You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

586 lines
20KB

  1. #![allow(unused_imports)]
  2. #[macro_use]
  3. extern crate slog;
  4. #[macro_use]
  5. extern crate markets;
  6. use std::path::PathBuf;
  7. use std::time::*;
  8. use std::io::{self, prelude::*};
  9. use std::fs;
  10. use std::f64::NAN;
  11. use structopt::StructOpt;
  12. use serde::{Serialize, Deserialize};
  13. use slog::Drain;
  14. use pretty_toa::ThousandsSep;
  15. use markets::crypto::{Exchange, Ticker, Side};
  16. use pipelines::windows::WeightedMeanWindow;
  17. // equivalent to panic! but without the ugly 'thread main panicked' yada yada
  18. macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
  19. eprintln!($fmt, $($args)*);
  20. std::process::exit(1);
  21. }}}
  22. const PROGRESS_EVERY: usize = 1024 * 1024;
  23. const ONE_SECOND: u64 = 1_000_000_000;
  24. const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;
  25. #[derive(Debug, StructOpt)]
  26. struct Opt {
  27. /// Path to CSV file with trades data
  28. #[structopt(short = "f", long = "trades-csv")]
  29. #[structopt(parse(from_os_str))]
  30. trades_csv: PathBuf,
  31. /// Where to save the query results (CSV output)
  32. #[structopt(short = "o", long = "output-path")]
  33. #[structopt(parse(from_os_str))]
  34. output_path: PathBuf,
  35. #[structopt(short = "z", long = "hard-mode")]
  36. hard_mode: bool,
  37. }
/// One trade row from the input CSV.
///
/// Built either by serde, matching CSV columns by header name
/// (`row.deserialize(Some(&headers))`), or positionally by the
/// `manual_deserialize_*` helpers.
#[derive(Deserialize)]
struct Trade {
    /// Time of trade in unix nanoseconds
    pub time: u64,
    /// Exchange where trade executed
    pub exch: Exchange,
    /// Currency rate of trade (base/quote)
    pub ticker: Ticker,
    /// Price of trade, in quote denomination
    pub price: f64,
    /// Size/Volume of trade, in base denomination
    pub amount: f64,
}
  51. fn per_sec(n: usize, span: Duration) -> f64 {
  52. if n == 0 || span < Duration::from_micros(1) { return 0.0 }
  53. let s: f64 = span.as_nanos() as f64 / 1e9f64;
  54. n as f64 / s
  55. }
  56. #[allow(dead_code)]
  57. #[inline(always)]
  58. fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result<Trade, &'static str> {
  59. let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?)
  60. .ok_or("parsing time failed")?;
  61. let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
  62. .map_err(|_| "parsing amount failed")?;
  63. let exch = match row.get(2).ok_or("no exch")? {
  64. b"bmex" => e!(bmex),
  65. b"bnce" => e!(bnce),
  66. b"btfx" => e!(btfx),
  67. b"gdax" => e!(gdax),
  68. b"okex" => e!(okex),
  69. b"bits" => e!(bits),
  70. b"plnx" => e!(plnx),
  71. b"krkn" => e!(krkn),
  72. _ => return Err("illegal exch"),
  73. };
  74. let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
  75. .map_err(|_| "parsing price failed")?;
  76. let ticker = match row.get(6).ok_or("no ticker")? {
  77. b"btc_usd" => t!(btc-usd),
  78. b"eth_usd" => t!(eth-usd),
  79. b"ltc_usd" => t!(ltc-usd),
  80. b"etc_usd" => t!(etc-usd),
  81. b"bch_usd" => t!(bch-usd),
  82. b"xmr_usd" => t!(xmr-usd),
  83. b"usdt_usd" => t!(usdt-usd),
  84. _ => return Err("illegal ticker"),
  85. };
  86. Ok(Trade { time, amount, exch, price, ticker })
  87. }
  88. #[allow(dead_code)]
  89. #[inline(always)]
  90. fn manual_deserialize_str(row: &csv::StringRecord) -> Result<Trade, &'static str> {
  91. let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?.as_bytes())
  92. .ok_or("parsing time failed")?;
  93. let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
  94. .map_err(|_| "parsing amount failed")?;
  95. let exch = match row.get(2).ok_or("no exch")? {
  96. "bmex" => e!(bmex),
  97. "bnce" => e!(bnce),
  98. "btfx" => e!(btfx),
  99. "gdax" => e!(gdax),
  100. "okex" => e!(okex),
  101. "bits" => e!(bits),
  102. "plnx" => e!(plnx),
  103. "krkn" => e!(krkn),
  104. _ => return Err("illegal exch"),
  105. };
  106. let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
  107. .map_err(|_| "parsing price failed")?;
  108. let ticker = match row.get(6).ok_or("no ticker")? {
  109. "btc_usd" => t!(btc-usd),
  110. "eth_usd" => t!(eth-usd),
  111. "ltc_usd" => t!(ltc-usd),
  112. "etc_usd" => t!(etc-usd),
  113. "bch_usd" => t!(bch-usd),
  114. "xmr_usd" => t!(xmr-usd),
  115. "usdt_usd" => t!(usdt-usd),
  116. _ => return Err("illegal ticker"),
  117. };
  118. Ok(Trade { time, amount, exch, price, ticker })
  119. }
  120. /// Example of code used in discussion of increasing CSV parsing performance
  121. #[allow(dead_code)]
  122. fn fast_parse_bytes<R: Read>(mut rdr: csv::Reader<R>) -> Result<usize, String> {
  123. // our data is ascii, so parsing with the slightly faster ByteRecord is fine
  124. let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  125. let mut row = csv::ByteRecord::new();
  126. // manual_deserialize_bytes assumes the column order of the CSV,
  127. // so here we verify that it actually matches that assumption
  128. assert_eq!(headers.get(0), Some(&b"time"[..]));
  129. assert_eq!(headers.get(1), Some(&b"amount"[..]));
  130. assert_eq!(headers.get(2), Some(&b"exch"[..]));
  131. assert_eq!(headers.get(3), Some(&b"price"[..]));
  132. assert_eq!(headers.get(6), Some(&b"ticker"[..]));
  133. let mut n = 0;
  134. let mut last_time = 0;
  135. while rdr.read_byte_record(&mut row)
  136. .map_err(|e| {
  137. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  138. })?
  139. {
  140. let trade: Trade = manual_deserialize_bytes(&row)
  141. .map_err(|e| {
  142. format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
  143. })?;
  144. assert!(trade.time >= last_time);
  145. last_time = trade.time;
  146. n += 1;
  147. }
  148. Ok(n)
  149. }
  150. fn hard_mode<R, W>(
  151. mut rdr: csv::Reader<R>,
  152. mut wtr: csv::Writer<W>,
  153. logger: &slog::Logger,
  154. ) -> Result<usize, String>
  155. where R: Read,
  156. W: Write
  157. {
  158. let logger = logger.new(o!("hard-mode" => "challenge accepted"));
  159. info!(logger, "beginning hard mode");
  160. let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  161. let mut row = csv::StringRecord::new();
  162. // pull out first row to initialize query calculations
  163. rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
  164. let trade: Trade = row.deserialize(Some(&headers))
  165. .map_err(|e| {
  166. format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
  167. })?;
  168. let mut cur_bucket = trade.time - (trade.time % (ONE_SECOND * 10)) + ONE_SECOND * 10;
  169. //let mut next_bucket = cur_bucket + ONE_SECOND * 10;
  170. let mut last_price: f64 = NAN;
  171. #[derive(Default, Clone)]
  172. struct Lookbacks<T> {
  173. pub p5: T,
  174. pub p15: T,
  175. pub p60: T,
  176. }
  177. let mut bprices: Lookbacks<f64> = Default::default();
  178. let mut gprices: Lookbacks<f64> = Default::default();
  179. let mut ratios: Lookbacks<f64> = Default::default();
  180. let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
  181. Lookbacks {
  182. p5: WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
  183. p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
  184. p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
  185. };
  186. let mut gdax_windows = bmex_windows.clone();
  187. #[inline(always)]
  188. fn do_purge(windows: &mut Lookbacks<WeightedMeanWindow>, prices: &mut Lookbacks<f64>, time: u64) {
  189. //if windows.p5.purge(time) { prices.p5 = windows.p5 .checked_weighted_mean().unwrap_or(NAN); }
  190. //if windows.p15.purge(time) { prices.p15 = windows.p15.checked_weighted_mean().unwrap_or(NAN); }
  191. //if windows.p60.purge(time) { prices.p60 = windows.p60.checked_weighted_mean().unwrap_or(NAN); }
  192. windows.p5 .purge(time);
  193. windows.p15.purge(time);
  194. windows.p60.purge(time);
  195. }
  196. #[allow(unused)]
  197. #[inline(always)]
  198. fn do_update(windows: &mut Lookbacks<WeightedMeanWindow>, prices: &mut Lookbacks<f64>, time: u64, price: f64, amount: f64) {
  199. //prices.p5 = windows.p5 .update(time, price, amount).unwrap_or(NAN);
  200. //prices.p15 = windows.p15.update(time, price, amount).unwrap_or(NAN);
  201. //prices.p60 = windows.p60.update(time, price, amount).unwrap_or(NAN);
  202. windows.p5 .push(time, price, amount);
  203. windows.p15.push(time, price, amount);
  204. windows.p60.push(time, price, amount);
  205. }
  206. macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
  207. ($trade:ident) => {{
  208. match $trade.exch {
  209. e!(bmex) => {
  210. do_update(&mut bmex_windows, &mut bprices, $trade.time, $trade.price, $trade.amount);
  211. //do_purge(&mut gdax_windows, &mut gprices, $trade.time);
  212. last_price = $trade.price;
  213. }
  214. e!(gdax) => {
  215. do_update(&mut gdax_windows, &mut gprices, $trade.time, $trade.price, $trade.amount);
  216. //do_purge(&mut bmex_windows, &mut bprices, $trade.time);
  217. last_price = $trade.price;
  218. }
  219. _ => {}
  220. }
  221. }}
  222. }
  223. wtr.write_record(&[
  224. "time",
  225. //"last",
  226. //"bmex_5min",
  227. //"gdax_5min",
  228. //"n_bmex_p5",
  229. //"n_gdax_p5",
  230. "r5",
  231. "r15",
  232. "r60",
  233. //"n_bmex_p5",
  234. //"n_bmex_p15",
  235. //"n_bmex_p60",
  236. //"n_gdax_p5",
  237. //"n_gdax_p15",
  238. //"n_gdax_p60",
  239. //"gdax_p5_is_empty",
  240. //"gdax_p5_checked_weighted_mean",
  241. //"tradetime_minus_cur_bucket",
  242. ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
  243. if trade.ticker == t!(btc-usd) { update!(trade); }
  244. let mut n = 0;
  245. let mut n_written = 0;
  246. while rdr.read_record(&mut row)
  247. .map_err(|e| {
  248. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  249. })?
  250. {
  251. n += 1;
  252. let trade: Trade = row.deserialize(Some(&headers))
  253. .map_err(|e| {
  254. format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
  255. })?;
  256. if trade.time > cur_bucket {
  257. debug!(logger, "about to purge";
  258. "n" => n,
  259. "n written" => n_written,
  260. "trade.time" => trade.time,
  261. "cur_bucket" => cur_bucket,
  262. "gdax p5 len" => gdax_windows.p5.len(),
  263. "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
  264. );
  265. do_purge(&mut gdax_windows, &mut gprices, cur_bucket);
  266. do_purge(&mut bmex_windows, &mut bprices, cur_bucket);
  267. debug!(logger, "finished purge";
  268. "n" => n,
  269. "n written" => n_written,
  270. "trade.time" => trade.time,
  271. "cur_bucket" => cur_bucket,
  272. "gdax p5 len" => gdax_windows.p5.len(),
  273. "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
  274. );
  275. ratios.p5 = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
  276. ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
  277. ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();
  278. //ratios.p5 = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
  279. //ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
  280. //ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();
  281. wtr.write_record(&[
  282. &format!("{}", cur_bucket),
  283. //&format!("{}", last_price),
  284. //&format!("{}", bmex_windows.p5.checked_weighted_mean().unwrap_or(NAN)),
  285. //&format!("{}", gdax_windows.p5.checked_weighted_mean().unwrap_or(NAN)),
  286. //&format!("{}", bmex_windows.p5.len()),
  287. //&format!("{}", gdax_windows.p5.len()),
  288. &format!("{}", ratios.p5),
  289. &format!("{}", ratios.p15),
  290. &format!("{}", ratios.p60),
  291. //&format!("{}", bmex_windows.p15.len()),
  292. //&format!("{}", gdax_windows.p60.len()),
  293. //&format!("{}", gdax_windows.p15.len()),
  294. //&format!("{}", gdax_windows.p15.len()),
  295. //&format!("{}", bmex_windows.p60.len()),
  296. //&format!("{}", bmex_windows.p5.is_empty()),
  297. //&format!("{:?}", bmex_windows.p5.checked_weighted_mean()),
  298. //&format!("{}", trade.time - cur_bucket),
  299. ]).map_err(|e| {
  300. format!("writing csv row failed: {}", e)
  301. })?;
  302. n_written += 1;
  303. cur_bucket += ONE_SECOND * 10;
  304. //cur_bucket = next_bucket;
  305. //next_bucket += ONE_SECOND * 10;
  306. }
  307. if trade.ticker == t!(btc-usd) { update!(trade); }
  308. if n % PROGRESS_EVERY == 0 {
  309. info!(logger, "calculating hard query";
  310. "n" => %n.thousands_sep(),
  311. "n_written" => %n_written.thousands_sep(),
  312. "ratios.p5" => ratios.p5,
  313. "ratios.p15" => ratios.p15,
  314. "ratios.p60" => ratios.p60,
  315. );
  316. }
  317. }
  318. info!(logger, "finished with hard query");
  319. Ok(n)
  320. }
  321. fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
  322. let opt = Opt::from_args();
  323. info!(logger, "initializing...";
  324. "trades-csv" => %opt.trades_csv.display(),
  325. "output-path" => %opt.output_path.display()
  326. );
  327. if ! opt.trades_csv.exists() {
  328. error!(logger, "path does not exist: {}", opt.trades_csv.display());
  329. fatal!("Error: path does not exist: {}", opt.trades_csv.display());
  330. }
  331. debug!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());
  332. let rdr = fs::File::open(&opt.trades_csv)
  333. .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, opt.trades_csv.display()))?;
  334. let rdr = io::BufReader::new(rdr);
  335. let mut rdr = csv::Reader::from_reader(rdr);
  336. // initializing --output-path CSV
  337. let wtr = fs::File::create(&opt.output_path)
  338. .map_err(|e| format!("creating output csv file failed: {} (tried to create {})", e, opt.output_path.display()))?;
  339. let wtr = io::BufWriter::new(wtr);
  340. let mut wtr = csv::Writer::from_writer(wtr);
  341. if opt.hard_mode { return hard_mode(rdr, wtr, &logger) }
  342. wtr.write_record(&[
  343. "time",
  344. "ratio",
  345. "bmex",
  346. "gdax",
  347. ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
  348. let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  349. let mut row = csv::StringRecord::new();
  350. // pull out first row to initialize query calculations
  351. rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
  352. let trade: Trade = row.deserialize(Some(&headers))
  353. .map_err(|e| {
  354. format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
  355. })?;
  356. let mut cur_hour = trade.time - trade.time % ONE_HOUR;
  357. let mut next_hour = cur_hour + ONE_HOUR;
  358. let mut bmex_total = if trade.exch == e!(bmex) { trade.price * trade.amount } else { 0.0 };
  359. let mut bmex_amt = if trade.exch == e!(bmex) { trade.amount } else { 0.0 };
  360. let mut n_bmex = 0;
  361. let mut gdax_total = if trade.exch == e!(gdax) { trade.price * trade.amount } else { 0.0 };
  362. let mut gdax_amt = if trade.exch == e!(gdax) { trade.amount } else { 0.0 };
  363. let mut n_gdax = 0;
  364. let mut n = 0;
  365. let mut n_written = 0;
  366. let mut last_time = 0;
  367. while rdr.read_record(&mut row)
  368. .map_err(|e| {
  369. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  370. })?
  371. {
  372. let trade: Trade = row.deserialize(Some(&headers))
  373. .map_err(|e| {
  374. format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
  375. })?;
  376. n += 1;
  377. // verify data is sorted by time
  378. assert!(trade.time >= last_time);
  379. last_time = trade.time;
  380. if trade.ticker != t!(btc-usd) { continue }
  381. if trade.time >= next_hour {
  382. // `trade` is past the last hour bucket, so finalize/write last
  383. // hour results, and reset state for this hour
  384. if n_bmex == 0 || n_gdax == 0 {
  385. wtr.write_record(&[
  386. &format!("{}", cur_hour),
  387. "NaN",
  388. "NaN",
  389. "NaN",
  390. ]).map_err(|e| format!("writing output row failed: {}", e))?;
  391. } else {
  392. let bmex_wt_avg = bmex_total / bmex_amt;
  393. let gdax_wt_avg = gdax_total / gdax_amt;
  394. let ratio = bmex_wt_avg / gdax_wt_avg;
  395. wtr.write_record(&[
  396. &format!("{}", cur_hour),
  397. &format!("{}", ratio),
  398. &format!("{}", bmex_wt_avg),
  399. &format!("{}", gdax_wt_avg),
  400. ]).map_err(|e| format!("writing output row failed: {}", e))?;
  401. }
  402. n_written += 1;
  403. // reset state
  404. bmex_total = 0.0;
  405. bmex_amt = 0.0;
  406. gdax_total = 0.0;
  407. gdax_amt = 0.0;
  408. n_bmex = 0;
  409. n_gdax = 0;
  410. cur_hour = next_hour;
  411. next_hour += ONE_HOUR;
  412. // if we are skipping hours in between the last and current row, we
  413. // need to write a NaN row for the hours that had no data
  414. while next_hour <= trade.time {
  415. wtr.write_record(&[
  416. &format!("{}", cur_hour),
  417. "NaN",
  418. "NaN",
  419. "NaN",
  420. ]).map_err(|e| format!("writing output row failed: {}", e))?;
  421. n_written += 1;
  422. cur_hour = next_hour;
  423. next_hour += ONE_HOUR;
  424. }
  425. }
  426. match trade.exch {
  427. e!(bmex) => {
  428. bmex_total += trade.price * trade.amount;
  429. bmex_amt += trade.amount;
  430. n_bmex += 1;
  431. }
  432. e!(gdax) => {
  433. gdax_total += trade.price * trade.amount;
  434. gdax_amt += trade.amount;
  435. n_gdax += 1;
  436. }
  437. _ => {}
  438. }
  439. if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
  440. info!(logger, "parsing csv file";
  441. "n rows" => %n.thousands_sep(),
  442. "n written" => %n_written.thousands_sep(),
  443. "elapsed" => ?(Instant::now() - start),
  444. );
  445. }
  446. if cfg!(debug_assertions) && n > PROGRESS_EVERY {
  447. warn!(logger, "debug mode: exiting early";
  448. "n rows" => %n.thousands_sep(),
  449. "n written" => %n_written.thousands_sep(),
  450. "elapsed" => ?(Instant::now() - start),
  451. );
  452. break
  453. }
  454. }
  455. // intentionally skipping handling the partial hour here
  456. info!(logger, "finished parsing CSV/calculating query. closing output file");
  457. drop(wtr);
  458. Ok(n)
  459. }
  460. fn main() {
  461. let start = Instant::now();
  462. let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
  463. let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
  464. let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
  465. let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
  466. match run(start, &logger) {
  467. Ok(n) => {
  468. let took = Instant::now() - start;
  469. let took_secs = took.as_millis() as f64 / 1000.0;
  470. let took_str = format!("{}min, {:.1}sec", took.as_secs() / 60, took_secs % 60.0);
  471. info!(logger, "finished in {}", took_str;
  472. "n rows" => %n.thousands_sep(),
  473. "rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(),
  474. );
  475. }
  476. Err(e) => {
  477. crit!(logger, "run failed: {:?}", e);
  478. eprintln!("\n\nError: {}", e);
  479. std::thread::sleep(Duration::from_millis(100));
  480. std::process::exit(1);
  481. }
  482. }
  483. }