You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

583 lines
20KB

  1. #![allow(unused_imports)]
  2. #[macro_use]
  3. extern crate slog;
  4. #[macro_use]
  5. extern crate markets;
  6. use std::path::PathBuf;
  7. use std::time::*;
  8. use std::io::{self, prelude::*};
  9. use std::fs;
  10. use std::f64::NAN;
  11. use structopt::StructOpt;
  12. use serde::{Serialize, Deserialize};
  13. use slog::Drain;
  14. use pretty_toa::ThousandsSep;
  15. use markets::crypto::{Exchange, Ticker, Side};
  16. use pipelines::windows::WeightedAvgWindow;
  17. // equivalent to panic! but without the ugly 'thread main panicked' yada yada
  18. macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
  19. eprintln!($fmt, $($args)*);
  20. std::process::exit(1);
  21. }}}
/// Emit a progress log line every this many parsed rows.
const PROGRESS_EVERY: usize = 1024 * 1024;
/// One second in nanoseconds (trade timestamps are unix nanos).
const ONE_SECOND: u64 = 1_000_000_000;
/// One hour in nanoseconds; bucket width for the default (non-hard-mode) query.
const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;
// Command-line options, parsed by structopt. NOTE: plain `//` comments are
// used for the additions here on purpose — `///` doc comments on structopt
// items become the generated --help text, which would change CLI behavior.
#[derive(Debug, StructOpt)]
struct Opt {
    /// Path to CSV file with trades data
    #[structopt(short = "f", long = "trades-csv")]
    #[structopt(parse(from_os_str))]
    trades_csv: PathBuf,
    /// Where to save the query results (CSV output)
    #[structopt(short = "o", long = "output-path")]
    #[structopt(parse(from_os_str))]
    output_path: PathBuf,
    // when set, run() delegates to hard_mode(): 10-second output buckets with
    // 5/15/60-minute weighted rolling windows instead of the hourly query
    #[structopt(short = "z", long = "hard-mode")]
    hard_mode: bool,
}
/// One trade record, deserialized from a CSV row. Field names must match the
/// CSV header names for serde-driven (header-keyed) deserialization; the
/// manual_deserialize_* fns below instead assume fixed column positions.
#[derive(Deserialize)]
struct Trade {
    /// Time of trade in unix nanoseconds
    pub time: u64,
    /// Exchange where trade executed
    pub exch: Exchange,
    /// Currency rate of trade (base/quote)
    pub ticker: Ticker,
    /// Price of trade, in quote denomination
    pub price: f64,
    /// Size/Volume of trade, in base denomination
    pub amount: f64,
}
  51. fn per_sec(n: usize, span: Duration) -> f64 {
  52. if n == 0 || span < Duration::from_micros(1) { return 0.0 }
  53. let s: f64 = span.as_nanos() as f64 / 1e9f64;
  54. n as f64 / s
  55. }
  56. #[allow(dead_code)]
  57. #[inline(always)]
  58. fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result<Trade, &'static str> {
  59. let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?)
  60. .ok_or("parsing time failed")?;
  61. let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
  62. .map_err(|_| "parsing amount failed")?;
  63. let exch = match row.get(2).ok_or("no exch")? {
  64. b"bmex" => e!(bmex),
  65. b"bnce" => e!(bnce),
  66. b"btfx" => e!(btfx),
  67. b"gdax" => e!(gdax),
  68. b"okex" => e!(okex),
  69. b"bits" => e!(bits),
  70. b"plnx" => e!(plnx),
  71. b"krkn" => e!(krkn),
  72. _ => return Err("illegal exch"),
  73. };
  74. let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
  75. .map_err(|_| "parsing price failed")?;
  76. let ticker = match row.get(6).ok_or("no ticker")? {
  77. b"btc_usd" => t!(btc-usd),
  78. b"eth_usd" => t!(eth-usd),
  79. b"ltc_usd" => t!(ltc-usd),
  80. b"etc_usd" => t!(etc-usd),
  81. b"bch_usd" => t!(bch-usd),
  82. b"xmr_usd" => t!(xmr-usd),
  83. b"usdt_usd" => t!(usdt-usd),
  84. _ => return Err("illegal ticker"),
  85. };
  86. Ok(Trade { time, amount, exch, price, ticker })
  87. }
  88. #[allow(dead_code)]
  89. #[inline(always)]
  90. fn manual_deserialize_str(row: &csv::StringRecord) -> Result<Trade, &'static str> {
  91. let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?.as_bytes())
  92. .ok_or("parsing time failed")?;
  93. let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
  94. .map_err(|_| "parsing amount failed")?;
  95. let exch = match row.get(2).ok_or("no exch")? {
  96. "bmex" => e!(bmex),
  97. "bnce" => e!(bnce),
  98. "btfx" => e!(btfx),
  99. "gdax" => e!(gdax),
  100. "okex" => e!(okex),
  101. "bits" => e!(bits),
  102. "plnx" => e!(plnx),
  103. "krkn" => e!(krkn),
  104. _ => return Err("illegal exch"),
  105. };
  106. let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
  107. .map_err(|_| "parsing price failed")?;
  108. let ticker = match row.get(6).ok_or("no ticker")? {
  109. "btc_usd" => t!(btc-usd),
  110. "eth_usd" => t!(eth-usd),
  111. "ltc_usd" => t!(ltc-usd),
  112. "etc_usd" => t!(etc-usd),
  113. "bch_usd" => t!(bch-usd),
  114. "xmr_usd" => t!(xmr-usd),
  115. "usdt_usd" => t!(usdt-usd),
  116. _ => return Err("illegal ticker"),
  117. };
  118. Ok(Trade { time, amount, exch, price, ticker })
  119. }
  120. /// Example of code used in discussion of increasing CSV parsing performance
  121. #[allow(dead_code)]
  122. fn fast_parse_bytes<R: Read>(mut rdr: csv::Reader<R>) -> Result<usize, String> {
  123. // our data is ascii, so parsing with the slightly faster ByteRecord is fine
  124. let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  125. let mut row = csv::ByteRecord::new();
  126. // manual_deserialize_bytes assumes the column order of the CSV,
  127. // so here we verify that it actually matches that assumption
  128. assert_eq!(headers.get(0), Some(&b"time"[..]));
  129. assert_eq!(headers.get(1), Some(&b"amount"[..]));
  130. assert_eq!(headers.get(2), Some(&b"exch"[..]));
  131. assert_eq!(headers.get(3), Some(&b"price"[..]));
  132. assert_eq!(headers.get(6), Some(&b"ticker"[..]));
  133. let mut n = 0;
  134. let mut last_time = 0;
  135. while rdr.read_byte_record(&mut row)
  136. .map_err(|e| {
  137. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  138. })?
  139. {
  140. let trade: Trade = manual_deserialize_bytes(&row)
  141. .map_err(|e| {
  142. format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
  143. })?;
  144. assert!(trade.time >= last_time);
  145. last_time = trade.time;
  146. n += 1;
  147. }
  148. Ok(n)
  149. }
/// "Hard mode" query: one output CSV row per 10-second bucket, containing the
/// last bmex/gdax btc_usd price seen, 5-minute volume-weighted average prices
/// and window sizes for both exchanges, and bmex/gdax weighted-average price
/// ratios over 5/15/60-minute lookbacks.
///
/// Input rows are read from `rdr` (assumed sorted by time); output is written
/// to `wtr`. Returns the number of data rows read after the first row (which
/// only seeds the state).
fn hard_mode<R, W>(
    mut rdr: csv::Reader<R>,
    mut wtr: csv::Writer<W>,
    logger: &slog::Logger,
) -> Result<usize, String>
    where R: Read,
          W: Write
{
    let logger = logger.new(o!("hard-mode" => "challenge accepted"));
    info!(logger, "beginning hard mode");
    let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
    let mut row = csv::StringRecord::new();
    // pull out first row to initialize query calculations
    // NOTE(review): the bool read_record returns (false at EOF) is ignored
    // here, so a header-only input surfaces as a deserialize error on the
    // empty row rather than a clearer "no data" message — confirm acceptable
    rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
    let trade: Trade = row.deserialize(Some(&headers))
        .map_err(|e| {
            format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
        })?;
    // first bucket boundary: the next 10-second mark strictly after the
    // first trade's timestamp
    let mut cur_bucket = trade.time - (trade.time % (ONE_SECOND * 10)) + ONE_SECOND * 10;
    //let mut next_bucket = cur_bucket + ONE_SECOND * 10;
    // most recent bmex/gdax btc_usd trade price (NaN until one is seen)
    let mut last_price: f64 = NAN;
    // one value per lookback horizon: 5, 15 and 60 minutes
    #[derive(Default, Clone)]
    struct Lookbacks<T> {
        pub p5: T,
        pub p15: T,
        pub p60: T,
    }
    // cached weighted-mean prices per horizon, refreshed by do_purge.
    // (output rows below read the windows directly, not these caches)
    let mut bprices: Lookbacks<f64> = Default::default();
    let mut gprices: Lookbacks<f64> = Default::default();
    // bmex/gdax weighted-mean price ratio per horizon
    let mut ratios: Lookbacks<f64> = Default::default();
    // rolling (time, price, amount) windows: b* = bmex, g* = gdax
    let mut bwindows: Lookbacks<WeightedAvgWindow> =
        Lookbacks {
            p5: WeightedAvgWindow::new(ONE_SECOND * 60 * 5 ),
            p15: WeightedAvgWindow::new(ONE_SECOND * 60 * 15),
            p60: WeightedAvgWindow::new(ONE_SECOND * 60 * 60),
        };
    let mut gwindows = bwindows.clone();
    // evict window entries older than each lookback relative to `time`; when
    // purge() reports something was evicted, refresh the cached mean
    // (NaN when the window emptied out)
    #[inline(always)]
    fn do_purge(windows: &mut Lookbacks<WeightedAvgWindow>, prices: &mut Lookbacks<f64>, time: u64) {
        if windows.p5.purge(time) { prices.p5 = windows.p5 .checked_wt_mean().unwrap_or(NAN); }
        if windows.p15.purge(time) { prices.p15 = windows.p15.checked_wt_mean().unwrap_or(NAN); }
        if windows.p60.purge(time) { prices.p60 = windows.p60.checked_wt_mean().unwrap_or(NAN); }
    }
    // push one trade into all three windows. `prices` is currently unused
    // here (only the commented-out update-style code read it); kept so the
    // signature mirrors do_purge
    #[inline(always)]
    fn do_update(windows: &mut Lookbacks<WeightedAvgWindow>, prices: &mut Lookbacks<f64>, time: u64, price: f64, amount: f64) {
        //prices.p5 = windows.p5 .update(time, price, amount).unwrap_or(NAN);
        //prices.p15 = windows.p15.update(time, price, amount).unwrap_or(NAN);
        //prices.p60 = windows.p60.update(time, price, amount).unwrap_or(NAN);
        windows.p5 .push(time, price, amount);
        windows.p15.push(time, price, amount);
        windows.p60.push(time, price, amount);
    }
    // route a trade to the windows of its exchange; other exchanges ignored
    macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
        ($trade:ident) => {{
            match $trade.exch {
                e!(bmex) => {
                    do_update(&mut bwindows, &mut bprices, $trade.time, $trade.price, $trade.amount);
                    //do_purge(&mut gwindows, &mut gprices, $trade.time);
                    last_price = $trade.price;
                }
                e!(gdax) => {
                    do_update(&mut gwindows, &mut gprices, $trade.time, $trade.price, $trade.amount);
                    //do_purge(&mut bwindows, &mut bprices, $trade.time);
                    last_price = $trade.price;
                }
                _ => {}
            }
        }}
    }
    wtr.write_record(&[
        "time",
        "last",
        "bmex_5min",
        "gdax_5min",
        "n_bmex_p5",
        "n_gdax_p5",
        "r5",
        "r15",
        "r60",
        //"n_bmex_p5",
        //"n_bmex_p15",
        //"n_bmex_p60",
        //"n_gdax_p5",
        //"n_gdax_p15",
        //"n_gdax_p60",
        //"gdax_p5_is_empty",
        //"gdax_p5_checked_wt_mean",
        //"tradetime_minus_cur_bucket",
    ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
    // seed the windows with the first row (same btc_usd filter as the loop)
    if trade.ticker == t!(btc-usd) { update!(trade); }
    let mut n = 0;
    let mut n_written = 0;
    while rdr.read_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        n += 1;
        let trade: Trade = row.deserialize(Some(&headers))
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;
        // trade crossed the current bucket boundary: purge windows up to the
        // boundary, emit the bucket's row, then advance one bucket.
        // NOTE(review): unlike run()'s gap-filling while-loop, this advances
        // cur_bucket by only 10s per incoming trade, so after a gap larger
        // than one bucket the emitted bucket timestamps lag trade.time until
        // enough rows arrive — confirm this is intended
        if trade.time > cur_bucket {
            debug!(logger, "about to purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time,
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gwindows.p5.len(),
                "gdax p5 wt avg" => gwindows.p5.wt_mean(),
            );
            do_purge(&mut gwindows, &mut gprices, cur_bucket);
            do_purge(&mut bwindows, &mut bprices, cur_bucket);
            debug!(logger, "finished purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time,
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gwindows.p5.len(),
                "gdax p5 wt avg" => gwindows.p5.wt_mean(),
            );
            // NaN whenever either side's window is empty
            ratios.p5 = bwindows.p5 .checked_wt_mean().unwrap_or(NAN) / gwindows.p5 .checked_wt_mean().unwrap_or(NAN);
            ratios.p15 = bwindows.p15.checked_wt_mean().unwrap_or(NAN) / gwindows.p15.checked_wt_mean().unwrap_or(NAN);
            ratios.p60 = bwindows.p60.checked_wt_mean().unwrap_or(NAN) / gwindows.p60.checked_wt_mean().unwrap_or(NAN);
            //ratios.p5 = bwindows.p5 .wt_mean() / gwindows.p5 .wt_mean();
            //ratios.p15 = bwindows.p15.wt_mean() / gwindows.p15.wt_mean();
            //ratios.p60 = bwindows.p60.wt_mean() / gwindows.p60.wt_mean();
            wtr.write_record(&[
                &format!("{}", cur_bucket),
                &format!("{}", last_price),
                &format!("{}", bwindows.p5.checked_wt_mean().unwrap_or(NAN)),
                &format!("{}", gwindows.p5.checked_wt_mean().unwrap_or(NAN)),
                &format!("{}", bwindows.p5.len()),
                &format!("{}", gwindows.p5.len()),
                &format!("{}", ratios.p5),
                &format!("{}", ratios.p15),
                &format!("{}", ratios.p60),
                //&format!("{}", bwindows.p15.len()),
                //&format!("{}", gwindows.p60.len()),
                //&format!("{}", gwindows.p15.len()),
                //&format!("{}", gwindows.p15.len()),
                //&format!("{}", bwindows.p60.len()),
                //&format!("{}", bwindows.p5.is_empty()),
                //&format!("{:?}", bwindows.p5.checked_wt_mean()),
                //&format!("{}", trade.time - cur_bucket),
            ]).map_err(|e| {
                format!("writing csv row failed: {}", e)
            })?;
            n_written += 1;
            cur_bucket += ONE_SECOND * 10;
            //cur_bucket = next_bucket;
            //next_bucket += ONE_SECOND * 10;
        }
        // only btc_usd trades participate in the windows
        if trade.ticker == t!(btc-usd) { update!(trade); }
        // periodic progress logging
        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating hard query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
                "ratios.p5" => ratios.p5,
                "ratios.p15" => ratios.p15,
                "ratios.p60" => ratios.p60,
            );
        }
    }
    info!(logger, "finished with hard query");
    Ok(n)
}
  317. fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
  318. let opt = Opt::from_args();
  319. info!(logger, "initializing...";
  320. "trades-csv" => %opt.trades_csv.display(),
  321. "output-path" => %opt.output_path.display()
  322. );
  323. if ! opt.trades_csv.exists() {
  324. error!(logger, "path does not exist: {}", opt.trades_csv.display());
  325. fatal!("Error: path does not exist: {}", opt.trades_csv.display());
  326. }
  327. debug!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());
  328. let rdr = fs::File::open(&opt.trades_csv)
  329. .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, opt.trades_csv.display()))?;
  330. let rdr = io::BufReader::new(rdr);
  331. let mut rdr = csv::Reader::from_reader(rdr);
  332. // initializing --output-path CSV
  333. let wtr = fs::File::create(&opt.output_path)
  334. .map_err(|e| format!("creating output csv file failed: {} (tried to create {})", e, opt.output_path.display()))?;
  335. let wtr = io::BufWriter::new(wtr);
  336. let mut wtr = csv::Writer::from_writer(wtr);
  337. if opt.hard_mode { return hard_mode(rdr, wtr, &logger) }
  338. wtr.write_record(&[
  339. "time",
  340. "ratio",
  341. "bmex",
  342. "gdax",
  343. ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
  344. let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  345. let mut row = csv::StringRecord::new();
  346. // pull out first row to initialize query calculations
  347. rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
  348. let trade: Trade = row.deserialize(Some(&headers))
  349. .map_err(|e| {
  350. format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
  351. })?;
  352. let mut cur_hour = trade.time - trade.time % ONE_HOUR;
  353. let mut next_hour = cur_hour + ONE_HOUR;
  354. let mut bmex_total = if trade.exch == e!(bmex) { trade.price * trade.amount } else { 0.0 };
  355. let mut bmex_amt = if trade.exch == e!(bmex) { trade.amount } else { 0.0 };
  356. let mut n_bmex = 0;
  357. let mut gdax_total = if trade.exch == e!(gdax) { trade.price * trade.amount } else { 0.0 };
  358. let mut gdax_amt = if trade.exch == e!(gdax) { trade.amount } else { 0.0 };
  359. let mut n_gdax = 0;
  360. let mut n = 0;
  361. let mut n_written = 0;
  362. let mut last_time = 0;
  363. while rdr.read_record(&mut row)
  364. .map_err(|e| {
  365. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  366. })?
  367. {
  368. let trade: Trade = row.deserialize(Some(&headers))
  369. .map_err(|e| {
  370. format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
  371. })?;
  372. n += 1;
  373. // verify data is sorted by time
  374. assert!(trade.time >= last_time);
  375. last_time = trade.time;
  376. if trade.ticker != t!(btc-usd) { continue }
  377. if trade.time >= next_hour {
  378. // `trade` is past the last hour bucket, so finalize/write last
  379. // hour results, and reset state for this hour
  380. if n_bmex == 0 || n_gdax == 0 {
  381. wtr.write_record(&[
  382. &format!("{}", cur_hour),
  383. "NaN",
  384. "NaN",
  385. "NaN",
  386. ]).map_err(|e| format!("writing output row failed: {}", e))?;
  387. } else {
  388. let bmex_wt_avg = bmex_total / bmex_amt;
  389. let gdax_wt_avg = gdax_total / gdax_amt;
  390. let ratio = bmex_wt_avg / gdax_wt_avg;
  391. wtr.write_record(&[
  392. &format!("{}", cur_hour),
  393. &format!("{}", ratio),
  394. &format!("{}", bmex_wt_avg),
  395. &format!("{}", gdax_wt_avg),
  396. ]).map_err(|e| format!("writing output row failed: {}", e))?;
  397. }
  398. n_written += 1;
  399. // reset state
  400. bmex_total = 0.0;
  401. bmex_amt = 0.0;
  402. gdax_total = 0.0;
  403. gdax_amt = 0.0;
  404. n_bmex = 0;
  405. n_gdax = 0;
  406. cur_hour = next_hour;
  407. next_hour += ONE_HOUR;
  408. // if we are skipping hours in between the last and current row, we
  409. // need to write a NaN row for the hours that had no data
  410. while next_hour <= trade.time {
  411. wtr.write_record(&[
  412. &format!("{}", cur_hour),
  413. "NaN",
  414. "NaN",
  415. "NaN",
  416. ]).map_err(|e| format!("writing output row failed: {}", e))?;
  417. n_written += 1;
  418. cur_hour = next_hour;
  419. next_hour += ONE_HOUR;
  420. }
  421. }
  422. match trade.exch {
  423. e!(bmex) => {
  424. bmex_total += trade.price * trade.amount;
  425. bmex_amt += trade.amount;
  426. n_bmex += 1;
  427. }
  428. e!(gdax) => {
  429. gdax_total += trade.price * trade.amount;
  430. gdax_amt += trade.amount;
  431. n_gdax += 1;
  432. }
  433. _ => {}
  434. }
  435. if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
  436. info!(logger, "parsing csv file";
  437. "n rows" => %n.thousands_sep(),
  438. "n written" => %n_written.thousands_sep(),
  439. "elapsed" => ?(Instant::now() - start),
  440. );
  441. }
  442. if cfg!(debug_assertions) && n > PROGRESS_EVERY {
  443. warn!(logger, "debug mode: exiting early";
  444. "n rows" => %n.thousands_sep(),
  445. "n written" => %n_written.thousands_sep(),
  446. "elapsed" => ?(Instant::now() - start),
  447. );
  448. break
  449. }
  450. }
  451. // intentionally skipping handling the partial hour here
  452. info!(logger, "finished parsing CSV/calculating query. closing output file");
  453. drop(wtr);
  454. Ok(n)
  455. }
  456. fn main() {
  457. let start = Instant::now();
  458. let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
  459. let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
  460. let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
  461. let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
  462. match run(start, &logger) {
  463. Ok(n) => {
  464. let took = Instant::now() - start;
  465. let took_secs = took.as_millis() as f64 / 1000.0;
  466. let took_str = format!("{}min, {:.1}sec", took.as_secs() / 60, took_secs % 60.0);
  467. info!(logger, "finished in {}", took_str;
  468. "n rows" => %n.thousands_sep(),
  469. "rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(),
  470. );
  471. }
  472. Err(e) => {
  473. crit!(logger, "run failed: {:?}", e);
  474. eprintln!("\n\nError: {}", e);
  475. std::thread::sleep(Duration::from_millis(100));
  476. std::process::exit(1);
  477. }
  478. }
  479. }