#![allow(unused_imports)]

#[macro_use]
extern crate slog;
#[macro_use]
extern crate markets;

use std::path::PathBuf;
use std::time::*;
use std::io::{self, prelude::*};
use std::fs;
use std::f64::NAN;
use structopt::StructOpt;
use serde::{Serialize, Deserialize};
use slog::Drain;
use pretty_toa::ThousandsSep;
use markets::crypto::{Exchange, Ticker, Side};
use pipelines::windows::WeightedMeanWindow;

// equivalent to panic! but without the ugly 'thread main panicked' yada yada
macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
    eprintln!($fmt, $($args)*);
    std::process::exit(1);
}}}
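//
// fatal! takes the same arguments as println!/panic!, e.g. (mirroring the call
// made in run() below):
//
//     fatal!("Error: path does not exist: {}", opt.trades_csv.display());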

const PROGRESS_EVERY: usize = 1024 * 1024;
const ONE_SECOND: u64 = 1_000_000_000;
const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;
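// Timestamps in the trades data are unix epoch nanoseconds, so ONE_SECOND and
// ONE_HOUR are expressed in nanoseconds as well. PROGRESS_EVERY is the number of
// parsed rows between progress log messages.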

#[derive(Debug, StructOpt)]
struct Opt {
    /// Path to CSV file with trades data
    #[structopt(short = "f", long = "trades-csv")]
    #[structopt(parse(from_os_str))]
    trades_csv: PathBuf,

    /// Where to save the query results (CSV output)
    #[structopt(short = "o", long = "output-path")]
    #[structopt(parse(from_os_str))]
    output_path: PathBuf,

    #[structopt(short = "z", long = "hard-mode")]
    hard_mode: bool,
}

#[derive(Deserialize, Debug)]
struct Trade {
    /// Time of trade in unix nanoseconds
    pub time: u64,
    /// Exchange where trade executed
    pub exch: Exchange,
    /// Currency rate of trade (base/quote)
    pub ticker: Ticker,
    /// Price of trade, in quote denomination
    pub price: f64,
    /// Size/Volume of trade, in base denomination
    pub amount: f64,
    pub server_time: u64,
}
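// Note on input columns: run() and hard_mode() deserialize rows by header name via
// serde, while the manual parsers below index columns by position, with
// fast_parse_bytes asserting that index 0 = time, 1 = amount, 2 = exch, 3 = price,
// and 6 = ticker. server_time is presumably a server-side timestamp in unix
// nanoseconds, with 0 meaning the field is absent (see the check in run()).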

fn per_sec(n: usize, span: Duration) -> f64 {
    if n == 0 || span < Duration::from_micros(1) { return 0.0 }
    let s: f64 = span.as_nanos() as f64 / 1e9f64;
    n as f64 / s
}
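// Worked example (illustrative values only):
//
//     per_sec(1_000_000, Duration::from_secs(2)) == 500_000.0  // 1M rows in 2s
//     per_sec(0, Duration::from_secs(2)) == 0.0                // guarded case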

#[allow(dead_code)]
#[inline(always)]
fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result<Trade, &'static str> {
    let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?)
        .ok_or("parsing time failed")?;

    let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
        .map_err(|_| "parsing amount failed")?;

    let exch = match row.get(2).ok_or("no exch")? {
        b"bmex" => e!(bmex),
        b"bnce" => e!(bnce),
        b"btfx" => e!(btfx),
        b"gdax" => e!(gdax),
        b"okex" => e!(okex),
        b"bits" => e!(bits),
        b"plnx" => e!(plnx),
        b"krkn" => e!(krkn),
        _ => return Err("illegal exch"),
    };

    let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
        .map_err(|_| "parsing price failed")?;

    let ticker = match row.get(6).ok_or("no ticker")? {
        b"btc_usd" => t!(btc-usd),
        b"eth_usd" => t!(eth-usd),
        b"ltc_usd" => t!(ltc-usd),
        b"etc_usd" => t!(etc-usd),
        b"bch_usd" => t!(bch-usd),
        b"xmr_usd" => t!(xmr-usd),
        b"usdt_usd" => t!(usdt-usd),
        _ => return Err("illegal ticker"),
    };

    Ok(Trade { time, amount, exch, price, ticker,
        server_time: 0,
    })
}

#[allow(dead_code)]
#[inline(always)]
fn manual_deserialize_str(row: &csv::StringRecord) -> Result<Trade, &'static str> {
    let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?.as_bytes())
        .ok_or("parsing time failed")?;

    let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
        .map_err(|_| "parsing amount failed")?;

    let exch = match row.get(2).ok_or("no exch")? {
        "bmex" => e!(bmex),
        "bnce" => e!(bnce),
        "btfx" => e!(btfx),
        "gdax" => e!(gdax),
        "okex" => e!(okex),
        "bits" => e!(bits),
        "plnx" => e!(plnx),
        "krkn" => e!(krkn),
        _ => return Err("illegal exch"),
    };

    let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
        .map_err(|_| "parsing price failed")?;

    let ticker = match row.get(6).ok_or("no ticker")? {
        "btc_usd" => t!(btc-usd),
        "eth_usd" => t!(eth-usd),
        "ltc_usd" => t!(ltc-usd),
        "etc_usd" => t!(etc-usd),
        "bch_usd" => t!(bch-usd),
        "xmr_usd" => t!(xmr-usd),
        "usdt_usd" => t!(usdt-usd),
        _ => return Err("illegal ticker"),
    };

    Ok(Trade { time, amount, exch, price, ticker,
        server_time: 0,
    })
}

/// Example of code used in discussion of increasing CSV parsing performance
#[allow(dead_code)]
fn fast_parse_bytes<R: Read>(mut rdr: csv::Reader<R>) -> Result<usize, String> {
    // our data is ascii, so parsing with the slightly faster ByteRecord is fine
    let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
    let mut row = csv::ByteRecord::new();

    // manual_deserialize_bytes assumes the column order of the CSV,
    // so here we verify that it actually matches that assumption
    assert_eq!(headers.get(0), Some(&b"time"[..]));
    assert_eq!(headers.get(1), Some(&b"amount"[..]));
    assert_eq!(headers.get(2), Some(&b"exch"[..]));
    assert_eq!(headers.get(3), Some(&b"price"[..]));
    assert_eq!(headers.get(6), Some(&b"ticker"[..]));

    let mut n = 0;
    let mut last_time = 0;

    while rdr.read_byte_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        let trade: Trade = manual_deserialize_bytes(&row)
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;

        assert!(trade.time >= last_time);
        last_time = trade.time;

        n += 1;
    }

    Ok(n)
}
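// Hypothetical usage sketch (fast_parse_bytes is not called anywhere in this
// program; "trades.csv" below is a placeholder path, not one the program uses):
//
//     let file = fs::File::open("trades.csv").map_err(|e| e.to_string())?;
//     let rdr = csv::Reader::from_reader(io::BufReader::new(file));
//     let n_rows = fast_parse_bytes(rdr)?;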

fn hard_mode<R, W>(
    mut rdr: csv::Reader<R>,
    mut wtr: csv::Writer<W>,
    logger: &slog::Logger,
) -> Result<usize, String>
    where R: Read,
          W: Write
{
    let logger = logger.new(o!("hard-mode" => "challenge accepted"));

    info!(logger, "beginning hard mode");

    let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
    let mut row = csv::StringRecord::new();

    // pull out first row to initialize query calculations
    rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
    let trade: Trade = row.deserialize(Some(&headers))
        .map_err(|e| {
            format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
        })?;

    let mut cur_bucket = trade.time - (trade.time % (ONE_SECOND * 10)) + ONE_SECOND * 10;
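    // cur_bucket (just computed) is the close of the 10-second bucket containing
    // the first trade: trade.time is rounded down to the nearest 10-second
    // boundary, then one bucket length (10 seconds, in nanoseconds) is added.
    // Illustrative example: a first trade 23.7s past a bucket-aligned origin
    // rounds down to 20s, so cur_bucket is the 30s boundary.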
    // let mut next_bucket = cur_bucket + ONE_SECOND * 10;
    // let mut last_price: f64 = NAN;

    #[derive(Default, Clone)]
    struct Lookbacks<T> {
        pub p5: T,
        pub p15: T,
        pub p60: T,
    }

    let mut ratios: Lookbacks<f64> = Default::default();

    let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
        Lookbacks {
            p5:  WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
            p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
            p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
        };

    let mut gdax_windows = bmex_windows.clone();
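    // One slot per trailing window: p5 = 5 minutes, p15 = 15 minutes, p60 = 60
    // minutes (window lengths are passed to WeightedMeanWindow in nanoseconds).
    // bmex_windows and gdax_windows accumulate weighted mean prices per exchange;
    // ratios.pN later holds bmex weighted mean / gdax weighted mean for window N.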
    //let mut row_buffers: [Vec<u8>; 4] = [Vec::with_capacity(32), Vec::with_capacity(32), Vec::with_capacity(32), Vec::with_capacity(32)];
    let mut row_buffer: Vec<u8> = Vec::with_capacity(512);

    macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
        ($trade:ident) => {{
            match $trade.exch {
                e!(bmex) => {
                    bmex_windows.p5 .push($trade.time, $trade.price, $trade.amount);
                    bmex_windows.p15.push($trade.time, $trade.price, $trade.amount);
                    bmex_windows.p60.push($trade.time, $trade.price, $trade.amount);
                    // last_price = $trade.price;
                }

                e!(gdax) => {
                    gdax_windows.p5 .push($trade.time, $trade.price, $trade.amount);
                    gdax_windows.p15.push($trade.time, $trade.price, $trade.amount);
                    gdax_windows.p60.push($trade.time, $trade.price, $trade.amount);
                    // last_price = $trade.price;
                }

                _ => {}
            }
        }}
    }
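    // update! is invoked twice: once for the already-parsed first row (just below,
    // after the output headers are written) and once per row inside the main loop,
    // in both cases only for btc_usd trades.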
    wtr.write_record(&[
        "time",
        "r5",
        "r15",
        "r60",
    ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

    let mut wtr = wtr.into_inner().map_err(|e| format!("csv::Writer::into_inner failed: {}", e))?;

    if trade.ticker == t!(btc-usd) { update!(trade); }

    let mut n = 0;
    let mut n_written = 0;

    while rdr.read_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        n += 1;

        let trade: Trade = row.deserialize(Some(&headers))
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;

        if trade.time > cur_bucket {
            debug!(logger, "about to purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time,
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );

            bmex_windows.p5 .purge(trade.time);
            bmex_windows.p15.purge(trade.time);
            bmex_windows.p60.purge(trade.time);

            gdax_windows.p5 .purge(trade.time);
            gdax_windows.p15.purge(trade.time);
            gdax_windows.p60.purge(trade.time);

            debug!(logger, "finished purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time,
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );

            ratios.p5  = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
            ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
            ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();

            //row_buffers.iter_mut().for_each(|x| x.clear());
            row_buffer.clear();

            itoa::write(&mut row_buffer, cur_bucket).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p5 ).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p15).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p60).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b'\n');

            wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;

            /*
            wtr.write_record(&row_buffers[..]).map_err(|e| {
                format!("writing csv row failed: {}", e)
            })?;
            */

            /*
            wtr.write_record(&[
                &format!("{}", cur_bucket),
                &format!("{}", ratios.p5),
                &format!("{}", ratios.p15),
                &format!("{}", ratios.p60),
            ]).map_err(|e| {
                format!("writing csv row failed: {}", e)
            })?;
            */

            n_written += 1;
            cur_bucket += ONE_SECOND * 10;
        }

        if trade.ticker == t!(btc-usd) { update!(trade); }

        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating hard query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
                "ratios.p5" => ratios.p5,
                "ratios.p15" => ratios.p15,
                "ratios.p60" => ratios.p60,
            );
        }
    }

    info!(logger, "finished with hard query");

    Ok(n)
}
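// hard_mode (above) emits an output row each time an incoming trade's timestamp
// passes the close of the current 10-second bucket. Columns are time,r5,r15,r60:
// time is the bucket close in unix nanoseconds, and rN is the ratio of the bmex
// weighted mean price to the gdax weighted mean price (weighted by trade amount,
// per WeightedMeanWindow) over the trailing N-minute window of btc_usd trades.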

fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
    let opt = Opt::from_args();

    info!(logger, "initializing...";
        "trades-csv" => %opt.trades_csv.display(),
        "output-path" => %opt.output_path.display()
    );

    if ! opt.trades_csv.exists() {
        error!(logger, "path does not exist: {}", opt.trades_csv.display());
        fatal!("Error: path does not exist: {}", opt.trades_csv.display());
    }

    debug!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());

    let rdr = fs::File::open(&opt.trades_csv)
        .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, opt.trades_csv.display()))?;
    let rdr = io::BufReader::new(rdr);
    let mut rdr = csv::Reader::from_reader(rdr);

    // initializing --output-path CSV
    let wtr = fs::File::create(&opt.output_path)
        .map_err(|e| format!("creating output csv file failed: {} (tried to create {})", e, opt.output_path.display()))?;
    let wtr = io::BufWriter::new(wtr);
    let mut wtr = csv::Writer::from_writer(wtr);

    if opt.hard_mode { return hard_mode(rdr, wtr, &logger) }

    wtr.write_record(&[
        "time",
        "ratio",
        "bmex",
        "gdax",
    ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

    let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
    let mut row = csv::StringRecord::new();

    // pull out first row to initialize query calculations
    rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
    let trade: Trade = row.deserialize(Some(&headers))
        .map_err(|e| {
            format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
        })?;

    let mut cur_hour = trade.time - trade.time % ONE_HOUR;
    let mut next_hour = cur_hour + ONE_HOUR;
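    // cur_hour is the start of the hour bucket containing the first trade
    // (trade.time truncated to the hour); next_hour is that bucket's close. An
    // output row is written whenever a later trade's timestamp reaches next_hour.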
    let mut bmex_total = if trade.exch == e!(bmex) { trade.price * trade.amount } else { 0.0 };
    let mut bmex_amt = if trade.exch == e!(bmex) { trade.amount } else { 0.0 };
    let mut n_bmex = 0;

    let mut gdax_total = if trade.exch == e!(gdax) { trade.price * trade.amount } else { 0.0 };
    let mut gdax_amt = if trade.exch == e!(gdax) { trade.amount } else { 0.0 };
    let mut n_gdax = 0;

    let mut n = 0;
    let mut n_written = 0;
    let mut last_time = 0;

    while rdr.read_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        let trade: Trade = row.deserialize(Some(&headers))
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;

        n += 1;

        if trade.server_time != 0 {
            let diff: i64 = (trade.server_time as i64 - trade.time as i64) / 1000 / 1000;
            assert!(diff >= std::i32::MIN as i64, "diff = {}, trade = {:?}", diff, trade);
            assert!(diff <= std::i32::MAX as i64, "diff = {}, trade = {:?}", diff, trade);
        }
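        // The check above applies only when server_time is present (non-zero): the
        // difference between server_time and time, converted from nanoseconds to
        // milliseconds, must fit in an i32 (roughly +/- 24.8 days), presumably as a
        // sanity check against wildly inconsistent timestamps.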

        // verify data is sorted by time
        assert!(trade.time >= last_time);
        last_time = trade.time;

        if trade.ticker != t!(btc-usd) { continue }

        if trade.time >= next_hour {
            // `trade` is past the last hour bucket, so finalize/write last
            // hour results, and reset state for this hour
            if n_bmex == 0 || n_gdax == 0 {
                wtr.write_record(&[
                    &format!("{}", cur_hour),
                    "NaN",
                    "NaN",
                    "NaN",
                ]).map_err(|e| format!("writing output row failed: {}", e))?;
            } else {
                let bmex_wt_avg = bmex_total / bmex_amt;
                let gdax_wt_avg = gdax_total / gdax_amt;
                let ratio = bmex_wt_avg / gdax_wt_avg;
                wtr.write_record(&[
                    &format!("{}", cur_hour),
                    &format!("{}", ratio),
                    &format!("{}", bmex_wt_avg),
                    &format!("{}", gdax_wt_avg),
                ]).map_err(|e| format!("writing output row failed: {}", e))?;
            }
            n_written += 1;

            // reset state
            bmex_total = 0.0;
            bmex_amt = 0.0;
            gdax_total = 0.0;
            gdax_amt = 0.0;
            n_bmex = 0;
            n_gdax = 0;

            cur_hour = next_hour;
            next_hour += ONE_HOUR;

            // if we are skipping hours in between the last and current row, we
            // need to write a NaN row for the hours that had no data
            while next_hour <= trade.time {
                wtr.write_record(&[
                    &format!("{}", cur_hour),
                    "NaN",
                    "NaN",
                    "NaN",
                ]).map_err(|e| format!("writing output row failed: {}", e))?;
                n_written += 1;
                cur_hour = next_hour;
                next_hour += ONE_HOUR;
            }
        }

        match trade.exch {
            e!(bmex) => {
                bmex_total += trade.price * trade.amount;
                bmex_amt += trade.amount;
                n_bmex += 1;
            }

            e!(gdax) => {
                gdax_total += trade.price * trade.amount;
                gdax_amt += trade.amount;
                n_gdax += 1;
            }

            _ => {}
        }

        if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
            info!(logger, "parsing csv file";
                "n rows" => %n.thousands_sep(),
                "n written" => %n_written.thousands_sep(),
                "elapsed" => ?(Instant::now() - start),
            );
        }

        if cfg!(debug_assertions) && n > PROGRESS_EVERY {
            warn!(logger, "debug mode: exiting early";
                "n rows" => %n.thousands_sep(),
                "n written" => %n_written.thousands_sep(),
                "elapsed" => ?(Instant::now() - start),
            );
            break
        }
    }

    // intentionally skipping handling the partial hour here

    info!(logger, "finished parsing CSV/calculating query. closing output file");

    drop(wtr);

    Ok(n)
}

fn main() {
    let start = Instant::now();

    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
    let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
    let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));

    match run(start, &logger) {
        Ok(n) => {
            let took = Instant::now() - start;
            let took_secs = took.as_millis() as f64 / 1000.0;
            let took_str = format!("{}min, {:.1}sec", took.as_secs() / 60, took_secs % 60.0);
            info!(logger, "finished in {}", took_str;
                "n rows" => %n.thousands_sep(),
  457. "rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(),
            );
        }

        Err(e) => {
            crit!(logger, "run failed: {:?}", e);
            eprintln!("\n\nError: {}", e);
            std::thread::sleep(Duration::from_millis(100));
            std::process::exit(1);
        }
    }
}