#![allow(unused_imports)]

#[macro_use]
extern crate slog;
#[macro_use]
extern crate markets;

use std::path::PathBuf;
use std::time::*;
use std::io::{self, prelude::*};
use std::fs;
use std::f64::NAN;
use structopt::StructOpt;
use serde::{Serialize, Deserialize};
use slog::Drain;
use pretty_toa::ThousandsSep;
use markets::crypto::{Exchange, Ticker, Side};
use pipelines::windows::WeightedMeanWindow;

// equivalent to panic! but without the ugly 'thread main panicked' yada yada
macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
    eprintln!($fmt, $($args)*);
    std::process::exit(1);
}}}

const PROGRESS_EVERY: usize = 1024 * 1024;
const ONE_SECOND: u64 = 1_000_000_000;
const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;
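
// Input CSV is expected to provide (at least) these columns, in this order
// (see the header assertions in `fast_parse_bytes` and the `row.get(..)`
// indices in the manual deserializers below): time (col 0), amount (col 1),
// exch (col 2), price (col 3), ..., ticker (col 6).
//
// Output is a CSV with either `time,ratio,bmex,gdax` headers (default,
// hourly query) or `time,r5,r15,r60` headers (--hard-mode).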

#[derive(Debug, StructOpt)]
struct Opt {
    /// Path to CSV file with trades data
    #[structopt(short = "f", long = "trades-csv")]
    #[structopt(parse(from_os_str))]
    trades_csv: PathBuf,

    /// Where to save the query results (CSV output)
    #[structopt(short = "o", long = "output-path")]
    #[structopt(parse(from_os_str))]
    output_path: PathBuf,

    #[structopt(short = "z", long = "hard-mode")]
    hard_mode: bool,
}
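
/// One row of the trades CSV. In the default path rows are deserialized into
/// this struct via serde (`row.deserialize(Some(&headers))`); the
/// `manual_deserialize_*` functions below build it by hand from raw columns.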
#[derive(Deserialize)]
struct Trade {
    /// Time of trade in unix nanoseconds
    pub time: u64,
    /// Exchange where trade executed
    pub exch: Exchange,
    /// Currency rate of trade (base/quote)
    pub ticker: Ticker,
    /// Price of trade, in quote denomination
    pub price: f64,
    /// Size/volume of trade, in base denomination
    pub amount: f64,
}

fn per_sec(n: usize, span: Duration) -> f64 {
    if n == 0 || span < Duration::from_micros(1) { return 0.0 }
    let s: f64 = span.as_nanos() as f64 / 1e9f64;
    n as f64 / s
}
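
/// Hand-rolled row parser that skips serde entirely: fields are pulled out of
/// the `ByteRecord` by position (time=0, amount=1, exch=2, price=3, ticker=6)
/// and parsed with `atoi`/`lexical`. Assumes the column order verified in
/// `fast_parse_bytes`.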
#[allow(dead_code)]
#[inline(always)]
fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result<Trade, &'static str> {
    let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?)
        .ok_or("parsing time failed")?;

    let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
        .map_err(|_| "parsing amount failed")?;

    let exch = match row.get(2).ok_or("no exch")? {
        b"bmex" => e!(bmex),
        b"bnce" => e!(bnce),
        b"btfx" => e!(btfx),
        b"gdax" => e!(gdax),
        b"okex" => e!(okex),
        b"bits" => e!(bits),
        b"plnx" => e!(plnx),
        b"krkn" => e!(krkn),
        _ => return Err("illegal exch"),
    };

    let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
        .map_err(|_| "parsing price failed")?;

    let ticker = match row.get(6).ok_or("no ticker")? {
        b"btc_usd"  => t!(btc-usd),
        b"eth_usd"  => t!(eth-usd),
        b"ltc_usd"  => t!(ltc-usd),
        b"etc_usd"  => t!(etc-usd),
        b"bch_usd"  => t!(bch-usd),
        b"xmr_usd"  => t!(xmr-usd),
        b"usdt_usd" => t!(usdt-usd),
        _ => return Err("illegal ticker"),
    };

    Ok(Trade { time, amount, exch, price, ticker })
}
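
/// Same as `manual_deserialize_bytes`, but operating on a `csv::StringRecord`
/// row (i.e. after UTF-8 validation) instead of a raw `ByteRecord`.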
#[allow(dead_code)]
#[inline(always)]
fn manual_deserialize_str(row: &csv::StringRecord) -> Result<Trade, &'static str> {
    let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?.as_bytes())
        .ok_or("parsing time failed")?;

    let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
        .map_err(|_| "parsing amount failed")?;

    let exch = match row.get(2).ok_or("no exch")? {
        "bmex" => e!(bmex),
        "bnce" => e!(bnce),
        "btfx" => e!(btfx),
        "gdax" => e!(gdax),
        "okex" => e!(okex),
        "bits" => e!(bits),
        "plnx" => e!(plnx),
        "krkn" => e!(krkn),
        _ => return Err("illegal exch"),
    };

    let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
        .map_err(|_| "parsing price failed")?;

    let ticker = match row.get(6).ok_or("no ticker")? {
        "btc_usd"  => t!(btc-usd),
        "eth_usd"  => t!(eth-usd),
        "ltc_usd"  => t!(ltc-usd),
        "etc_usd"  => t!(etc-usd),
        "bch_usd"  => t!(bch-usd),
        "xmr_usd"  => t!(xmr-usd),
        "usdt_usd" => t!(usdt-usd),
        _ => return Err("illegal ticker"),
    };

    Ok(Trade { time, amount, exch, price, ticker })
}

/// Example of code used in discussion of increasing CSV parsing performance
#[allow(dead_code)]
fn fast_parse_bytes<R: Read>(mut rdr: csv::Reader<R>) -> Result<usize, String> {
    // our data is ascii, so parsing with the slightly faster ByteRecord is fine
    let headers: csv::ByteRecord = rdr.byte_headers()
        .map_err(|e| format!("failed to parse CSV headers: {}", e))?
        .clone();

    let mut row = csv::ByteRecord::new();

    // manual_deserialize_bytes assumes the column order of the CSV,
    // so here we verify that it actually matches that assumption
    assert_eq!(headers.get(0), Some(&b"time"[..]));
    assert_eq!(headers.get(1), Some(&b"amount"[..]));
    assert_eq!(headers.get(2), Some(&b"exch"[..]));
    assert_eq!(headers.get(3), Some(&b"price"[..]));
    assert_eq!(headers.get(6), Some(&b"ticker"[..]));

    let mut n = 0;
    let mut last_time = 0;

    while rdr.read_byte_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        let trade: Trade = manual_deserialize_bytes(&row)
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;

        assert!(trade.time >= last_time);
        last_time = trade.time;

        n += 1;
    }

    Ok(n)
}
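
/// The --hard-mode query: maintains amount-weighted mean price windows over
/// trailing 5/15/60-minute spans for bmex and gdax btc_usd trades (via
/// `WeightedMeanWindow` from the `pipelines` crate; as used here, each window
/// is pushed (time, price, amount) points and purged of entries older than
/// its span), and writes the bmex/gdax ratio for each window once every
/// 10-second bucket. After writing the headers, the `csv::Writer` is
/// unwrapped into its underlying writer so rows can be serialized manually
/// with `itoa`/`dtoa` into a reused buffer.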
fn hard_mode<R, W>(
    mut rdr: csv::Reader<R>,
    mut wtr: csv::Writer<W>,
    logger: &slog::Logger,
) -> Result<usize, String>
    where R: Read,
          W: Write
{
    let logger = logger.new(o!("hard-mode" => "challenge accepted"));
    info!(logger, "beginning hard mode");

    let headers: csv::StringRecord = rdr.headers()
        .map_err(|e| format!("failed to parse CSV headers: {}", e))?
        .clone();

    let mut row = csv::StringRecord::new();

    // pull out first row to initialize query calculations
    rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
    let trade: Trade = row.deserialize(Some(&headers))
        .map_err(|e| {
            format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
        })?;

    let mut cur_bucket = trade.time - (trade.time % (ONE_SECOND * 10)) + ONE_SECOND * 10;
    // let mut next_bucket = cur_bucket + ONE_SECOND * 10;
    // let mut last_price: f64 = NAN;

    #[derive(Default, Clone)]
    struct Lookbacks<T> {
        pub p5: T,
        pub p15: T,
        pub p60: T,
    }

    let mut ratios: Lookbacks<f64> = Default::default();

    let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
        Lookbacks {
            p5:  WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
            p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
            p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
        };
    let mut gdax_windows = bmex_windows.clone();

    //let mut row_buffers: [Vec<u8>; 4] = [Vec::with_capacity(32), Vec::with_capacity(32), Vec::with_capacity(32), Vec::with_capacity(32)];
    let mut row_buffer: Vec<u8> = Vec::with_capacity(512);

    // in macro to avoid repeating code once outside loop, and again in loop body
    macro_rules! update {
        ($trade:ident) => {{
            match $trade.exch {
                e!(bmex) => {
                    bmex_windows.p5 .push($trade.time, $trade.price, $trade.amount);
                    bmex_windows.p15.push($trade.time, $trade.price, $trade.amount);
                    bmex_windows.p60.push($trade.time, $trade.price, $trade.amount);
                    // last_price = $trade.price;
                }

                e!(gdax) => {
                    gdax_windows.p5 .push($trade.time, $trade.price, $trade.amount);
                    gdax_windows.p15.push($trade.time, $trade.price, $trade.amount);
                    gdax_windows.p60.push($trade.time, $trade.price, $trade.amount);
                    // last_price = $trade.price;
                }

                _ => {}
            }
        }}
    }

    wtr.write_record(&[
        "time",
        "r5",
        "r15",
        "r60",
    ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

    let mut wtr = wtr.into_inner().map_err(|e| format!("csv::Writer::into_inner failed: {}", e))?;

    if trade.ticker == t!(btc-usd) { update!(trade); }

    let mut n = 0;
    let mut n_written = 0;

    while rdr.read_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        n += 1;

        let trade: Trade = row.deserialize(Some(&headers))
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;

        if trade.time > cur_bucket {
            debug!(logger, "about to purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time,
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );

            bmex_windows.p5 .purge(trade.time);
            bmex_windows.p15.purge(trade.time);
            bmex_windows.p60.purge(trade.time);

            gdax_windows.p5 .purge(trade.time);
            gdax_windows.p15.purge(trade.time);
            gdax_windows.p60.purge(trade.time);

            debug!(logger, "finished purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time,
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );

            ratios.p5  = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
            ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
            ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();

            //row_buffers.iter_mut().for_each(|x| x.clear());
            row_buffer.clear();
            itoa::write(&mut row_buffer, cur_bucket).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p5 ).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p15).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p60).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b'\n');
            wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;

            /*
            wtr.write_record(&row_buffers[..]).map_err(|e| {
                format!("writing csv row failed: {}", e)
            })?;
            */

            /*
            wtr.write_record(&[
                &format!("{}", cur_bucket),
                &format!("{}", ratios.p5),
                &format!("{}", ratios.p15),
                &format!("{}", ratios.p60),
            ]).map_err(|e| {
                format!("writing csv row failed: {}", e)
            })?;
            */

            n_written += 1;
            cur_bucket += ONE_SECOND * 10;
        }

        if trade.ticker == t!(btc-usd) { update!(trade); }

        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating hard query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
                "ratios.p5" => ratios.p5,
                "ratios.p15" => ratios.p15,
                "ratios.p60" => ratios.p60,
            );
        }
    }

    info!(logger, "finished with hard query");

    Ok(n)
}
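
/// Default (non --hard-mode) query: for each hour bucket, compute the
/// volume-weighted average btc_usd price on bmex and on gdax, plus the ratio
/// between the two; hours where either exchange has no trades are written as
/// NaN rows. Also handles CLI parsing, opening the input CSV, and creating
/// the output CSV, and dispatches to `hard_mode` when -z/--hard-mode is set.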
fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
    let opt = Opt::from_args();

    info!(logger, "initializing...";
        "trades-csv" => %opt.trades_csv.display(),
        "output-path" => %opt.output_path.display()
    );

    if ! opt.trades_csv.exists() {
        error!(logger, "path does not exist: {}", opt.trades_csv.display());
        fatal!("Error: path does not exist: {}", opt.trades_csv.display());
    }

    debug!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());

    let rdr = fs::File::open(&opt.trades_csv)
        .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, opt.trades_csv.display()))?;
    let rdr = io::BufReader::new(rdr);
    let mut rdr = csv::Reader::from_reader(rdr);

    // initializing --output-path CSV
    let wtr = fs::File::create(&opt.output_path)
        .map_err(|e| format!("creating output csv file failed: {} (tried to create {})", e, opt.output_path.display()))?;
    let wtr = io::BufWriter::new(wtr);
    let mut wtr = csv::Writer::from_writer(wtr);

    if opt.hard_mode { return hard_mode(rdr, wtr, &logger) }

    wtr.write_record(&[
        "time",
        "ratio",
        "bmex",
        "gdax",
    ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

    let headers: csv::StringRecord = rdr.headers()
        .map_err(|e| format!("failed to parse CSV headers: {}", e))?
        .clone();

    let mut row = csv::StringRecord::new();

    // pull out first row to initialize query calculations
    rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
    let trade: Trade = row.deserialize(Some(&headers))
        .map_err(|e| {
            format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
        })?;

    let mut cur_hour = trade.time - trade.time % ONE_HOUR;
    let mut next_hour = cur_hour + ONE_HOUR;

    let mut bmex_total = if trade.exch == e!(bmex) { trade.price * trade.amount } else { 0.0 };
    let mut bmex_amt   = if trade.exch == e!(bmex) { trade.amount } else { 0.0 };
    let mut n_bmex = 0;

    let mut gdax_total = if trade.exch == e!(gdax) { trade.price * trade.amount } else { 0.0 };
    let mut gdax_amt   = if trade.exch == e!(gdax) { trade.amount } else { 0.0 };
    let mut n_gdax = 0;

    let mut n = 0;
    let mut n_written = 0;
    let mut last_time = 0;

    while rdr.read_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        let trade: Trade = row.deserialize(Some(&headers))
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;

        n += 1;

        // verify data is sorted by time
        assert!(trade.time >= last_time);
        last_time = trade.time;

        if trade.ticker != t!(btc-usd) { continue }

        if trade.time >= next_hour {
            // `trade` is past the last hour bucket, so finalize/write last
            // hour results, and reset state for this hour

            if n_bmex == 0 || n_gdax == 0 {
                wtr.write_record(&[
                    &format!("{}", cur_hour),
                    "NaN",
                    "NaN",
                    "NaN",
                ]).map_err(|e| format!("writing output row failed: {}", e))?;
            } else {
                let bmex_wt_avg = bmex_total / bmex_amt;
                let gdax_wt_avg = gdax_total / gdax_amt;
                let ratio = bmex_wt_avg / gdax_wt_avg;
                wtr.write_record(&[
                    &format!("{}", cur_hour),
                    &format!("{}", ratio),
                    &format!("{}", bmex_wt_avg),
                    &format!("{}", gdax_wt_avg),
                ]).map_err(|e| format!("writing output row failed: {}", e))?;
            }

            n_written += 1;

            // reset state
            bmex_total = 0.0;
            bmex_amt = 0.0;
            gdax_total = 0.0;
            gdax_amt = 0.0;
            n_bmex = 0;
            n_gdax = 0;

            cur_hour = next_hour;
            next_hour += ONE_HOUR;

            // if we are skipping hours in between the last and current row, we
            // need to write a NaN row for the hours that had no data
            while next_hour <= trade.time {
                wtr.write_record(&[
                    &format!("{}", cur_hour),
                    "NaN",
                    "NaN",
                    "NaN",
                ]).map_err(|e| format!("writing output row failed: {}", e))?;

                n_written += 1;

                cur_hour = next_hour;
                next_hour += ONE_HOUR;
            }
        }

        match trade.exch {
            e!(bmex) => {
                bmex_total += trade.price * trade.amount;
                bmex_amt += trade.amount;
                n_bmex += 1;
            }

            e!(gdax) => {
                gdax_total += trade.price * trade.amount;
                gdax_amt += trade.amount;
                n_gdax += 1;
            }

            _ => {}
        }

        if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
            info!(logger, "parsing csv file";
                "n rows" => %n.thousands_sep(),
                "n written" => %n_written.thousands_sep(),
                "elapsed" => ?(Instant::now() - start),
            );
        }

        if cfg!(debug_assertions) && n > PROGRESS_EVERY {
            warn!(logger, "debug mode: exiting early";
                "n rows" => %n.thousands_sep(),
                "n written" => %n_written.thousands_sep(),
                "elapsed" => ?(Instant::now() - start),
            );
            break
        }
    }

    // intentionally skipping handling the partial hour here

    info!(logger, "finished parsing CSV/calculating query. closing output file");
    drop(wtr);

    Ok(n)
}
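
/// Entry point: sets up an async slog terminal logger, runs the query, and
/// logs total rows, elapsed time, and throughput (or the error) at the end.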
fn main() {
    let start = Instant::now();

    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
    let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
    let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));

    match run(start, &logger) {
        Ok(n) => {
            let took = Instant::now() - start;
            let took_secs = took.as_millis() as f64 / 1000.0;
            let took_str = format!("{}min, {:.1}sec", took.as_secs() / 60, took_secs % 60.0);
            info!(logger, "finished in {}", took_str;
                "n rows" => %n.thousands_sep(),
                // round rows/sec to one decimal place for display
                "rows/sec" => &((per_sec(n, took) * 10.0).round() / 10.0).thousands_sep(),
            );
        }

        Err(e) => {
            crit!(logger, "run failed: {:?}", e);
            eprintln!("\n\nError: {}", e);
            std::thread::sleep(Duration::from_millis(100));
            std::process::exit(1);
        }
    }
}