You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

586 lines
19KB

  1. #![allow(unused_imports)]
  2. #[macro_use]
  3. extern crate slog;
  4. #[macro_use]
  5. extern crate markets;
  6. use std::path::PathBuf;
  7. use std::time::*;
  8. use std::io::{self, prelude::*};
  9. use std::fs;
  10. use std::f64::NAN;
  11. use structopt::StructOpt;
  12. use serde::{Serialize, Deserialize};
  13. use slog::Drain;
  14. use pretty_toa::ThousandsSep;
  15. use markets::crypto::{Exchange, Ticker, Side};
  16. use pipelines::windows::WeightedMeanWindow;
  17. // equivalent to panic! but without the ugly 'thread main panicked' yada yada
  18. macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
  19. eprintln!($fmt, $($args)*);
  20. std::process::exit(1);
  21. }}}
/// Emit a progress log line every this many parsed CSV rows.
const PROGRESS_EVERY: usize = 1024 * 1024 * 2;
/// One second expressed in nanoseconds — trade timestamps are unix nanos.
const ONE_SECOND: u64 = 1_000_000_000;
/// One hour in nanoseconds; the bucket width for the default (non-hard-mode) query.
const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;
// Command-line options, parsed via `Opt::from_args()` in `run`.
// NOTE: field additions below use `//` comments deliberately — structopt
// turns `///` doc comments into --help text, which would change CLI output.
#[derive(Debug, StructOpt)]
struct Opt {
    /// Path to CSV file with trades data
    #[structopt(short = "f", long = "trades-csv")]
    #[structopt(parse(from_os_str))]
    trades_csv: PathBuf,
    /// Where to save the query results (CSV output)
    #[structopt(short = "o", long = "output-path")]
    #[structopt(parse(from_os_str))]
    output_path: PathBuf,
    // When set, `run` dispatches to the 10-second-bucket `hard_mode` query
    // instead of the default hourly query.
    #[structopt(short = "z", long = "hard-mode")]
    hard_mode: bool,
}
// One row of the input trades CSV, deserialized by serde from either a
// StringRecord (`hard_mode`) or ByteRecord (`run`), or built manually by the
// `manual_deserialize_*` helpers.
#[derive(Deserialize, Debug)]
struct Trade {
    /// Time of trade in unix nanoseconds
    pub time: u64,
    /// Exchange where trade executed
    pub exch: Exchange,
    /// Currency rate of trade (base/quote)
    pub ticker: Ticker,
    /// Price of trade, in quote denomination
    pub price: f64,
    /// Size/Volume of trade, in base denomination
    pub amount: f64,
    // Server receipt time in unix nanoseconds; the manual deserializers set
    // this to 0, and `run` treats 0 as "not present" when sanity-checking
    // the server/exchange clock difference.
    pub server_time: u64,
}
  52. fn per_sec(n: usize, span: Duration) -> f64 {
  53. if n == 0 || span < Duration::from_micros(1) { return 0.0 }
  54. let s: f64 = span.as_nanos() as f64 / 1e9f64;
  55. n as f64 / s
  56. }
/// Hand-rolled deserialization of a `Trade` from a raw `csv::ByteRecord`,
/// bypassing serde for speed (see `fast_parse_bytes` for the benchmark that
/// uses it).
///
/// Assumes a fixed column order — time=0, amount=1, exch=2, price=3,
/// ticker=6 — which callers must verify against the header row (as
/// `fast_parse_bytes` does). `server_time` is not parsed; it is always 0.
///
/// Returns a static error message if a column is missing, fails to parse,
/// or names an exchange/ticker outside the supported set.
#[allow(dead_code)]
#[inline(always)]
fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result<Trade, &'static str> {
    // integer parse straight from ascii bytes (atoi) — no utf8 validation
    let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?)
        .ok_or("parsing time failed")?;
    // lexical parses floats directly from the byte slice
    let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
        .map_err(|_| "parsing amount failed")?;
    // e!(..) expands to the matching markets::crypto::Exchange value
    let exch = match row.get(2).ok_or("no exch")? {
        b"bmex" => e!(bmex),
        b"bnce" => e!(bnce),
        b"btfx" => e!(btfx),
        b"gdax" => e!(gdax),
        b"okex" => e!(okex),
        b"bits" => e!(bits),
        b"plnx" => e!(plnx),
        b"krkn" => e!(krkn),
        _ => return Err("illegal exch"),
    };
    let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
        .map_err(|_| "parsing price failed")?;
    // t!(..) expands to the matching markets::crypto::Ticker value
    let ticker = match row.get(6).ok_or("no ticker")? {
        b"btc_usd" => t!(btc-usd),
        b"eth_usd" => t!(eth-usd),
        b"ltc_usd" => t!(ltc-usd),
        b"etc_usd" => t!(etc-usd),
        b"bch_usd" => t!(bch-usd),
        b"xmr_usd" => t!(xmr-usd),
        b"usdt_usd" => t!(usdt-usd),
        _ => return Err("illegal ticker"),
    };
    Ok(Trade { time, amount, exch, price, ticker,
        // not present in the parsed columns; 0 marks "missing"
        server_time: 0,
    })
}
/// `&str`-based twin of `manual_deserialize_bytes`: builds a `Trade` from a
/// `csv::StringRecord` without serde.
///
/// Same fixed column-order assumption (time=0, amount=1, exch=2, price=3,
/// ticker=6) and the same "always 0" `server_time`; only the input record
/// type differs (utf8-validated fields instead of raw bytes).
#[allow(dead_code)]
#[inline(always)]
fn manual_deserialize_str(row: &csv::StringRecord) -> Result<Trade, &'static str> {
    // atoi wants bytes, so drop back down from &str
    let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?.as_bytes())
        .ok_or("parsing time failed")?;
    let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
        .map_err(|_| "parsing amount failed")?;
    // e!(..) expands to the matching markets::crypto::Exchange value
    let exch = match row.get(2).ok_or("no exch")? {
        "bmex" => e!(bmex),
        "bnce" => e!(bnce),
        "btfx" => e!(btfx),
        "gdax" => e!(gdax),
        "okex" => e!(okex),
        "bits" => e!(bits),
        "plnx" => e!(plnx),
        "krkn" => e!(krkn),
        _ => return Err("illegal exch"),
    };
    let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
        .map_err(|_| "parsing price failed")?;
    // t!(..) expands to the matching markets::crypto::Ticker value
    let ticker = match row.get(6).ok_or("no ticker")? {
        "btc_usd" => t!(btc-usd),
        "eth_usd" => t!(eth-usd),
        "ltc_usd" => t!(ltc-usd),
        "etc_usd" => t!(etc-usd),
        "bch_usd" => t!(bch-usd),
        "xmr_usd" => t!(xmr-usd),
        "usdt_usd" => t!(usdt-usd),
        _ => return Err("illegal ticker"),
    };
    Ok(Trade { time, amount, exch, price, ticker,
        // not present in the parsed columns; 0 marks "missing"
        server_time: 0,
    })
}
/// Example of code used in discussion of increasing CSV parsing performance
///
/// Parses every row of `rdr` into a `Trade` via `manual_deserialize_bytes`
/// (reusing a single `ByteRecord` allocation across rows), asserts the data
/// is sorted by time, and returns the number of rows read. Produces no query
/// output — this exists to benchmark the parsing path alone.
#[allow(dead_code)]
fn fast_parse_bytes<R: Read>(mut rdr: csv::Reader<R>) -> Result<usize, String> {
    // our data is ascii, so parsing with the slightly faster ByteRecord is fine
    let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
    let mut row = csv::ByteRecord::new();
    // manual_deserialize_bytes assumes the column order of the CSV,
    // so here we verify that it actually matches that assumption
    assert_eq!(headers.get(0), Some(&b"time"[..]));
    assert_eq!(headers.get(1), Some(&b"amount"[..]));
    assert_eq!(headers.get(2), Some(&b"exch"[..]));
    assert_eq!(headers.get(3), Some(&b"price"[..]));
    assert_eq!(headers.get(6), Some(&b"ticker"[..]));
    let mut n = 0;
    let mut last_time = 0;
    while rdr.read_byte_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        let trade: Trade = manual_deserialize_bytes(&row)
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;
        // the queries downstream rely on time-sorted input; enforce it here too
        assert!(trade.time >= last_time);
        last_time = trade.time;
        n += 1;
    }
    Ok(n)
}
/// The --hard-mode query: for each 10-second bucket, the ratio of the bitmex
/// vs. gdax volume-weighted-average btc/usd price over trailing 5-, 15- and
/// 60-minute windows. Writes `time,r5,r15,r60` rows to `wtr` and returns the
/// number of input rows parsed.
///
/// NOTE(review): unlike the hourly query in `run`, exactly one output row is
/// written per bucket boundary crossing observed in the input — a data gap
/// longer than 10 seconds does not produce filler rows for the skipped
/// buckets. Confirm that is intended.
fn hard_mode<R, W>(
    mut rdr: csv::Reader<R>,
    mut wtr: csv::Writer<W>,
    logger: &slog::Logger,
) -> Result<usize, String>
    where R: Read,
          W: Write
{
    let logger = logger.new(o!("hard-mode" => "challenge accepted"));
    info!(logger, "beginning hard mode");
    let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
    let mut row = csv::StringRecord::new();
    // pull out first row to initialize query calculations
    rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
    let trade: Trade = row.deserialize(Some(&headers))
        .map_err(|e| {
            format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
        })?;
    // first bucket boundary: the first trade's time rounded UP to the next
    // 10-second mark
    let mut cur_bucket = trade.time - (trade.time % (ONE_SECOND * 10)) + ONE_SECOND * 10;
    // let mut next_bucket = cur_bucket + ONE_SECOND * 10;
    // let mut last_price: f64 = NAN;
    // one value of T per trailing lookback horizon (5/15/60 minutes)
    #[derive(Default, Clone)]
    struct Lookbacks<T> {
        pub p5: T,
        pub p15: T,
        pub p60: T,
    }
    // most recently computed bmex:gdax ratios, one per horizon
    let mut ratios: Lookbacks<f64> = Default::default();
    // rolling weighted-mean windows sized in nanoseconds
    let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
        Lookbacks {
            p5: WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
            p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
            p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
        };
    let mut gdax_windows = bmex_windows.clone();
    //let mut row_buffers: [Vec<u8>; 4] = [Vec::with_capacity(32), Vec::with_capacity(32), Vec::with_capacity(32), Vec::with_capacity(32)];
    // scratch buffer reused for serializing each output row without allocating
    let mut row_buffer: Vec<u8> = Vec::with_capacity(512);
    macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
        ($trade:ident) => {{
            match $trade.exch {
                e!(bmex) => {
                    bmex_windows.p5 .push($trade.time, $trade.price, $trade.amount);
                    bmex_windows.p15.push($trade.time, $trade.price, $trade.amount);
                    bmex_windows.p60.push($trade.time, $trade.price, $trade.amount);
                    // last_price = $trade.price;
                }
                e!(gdax) => {
                    gdax_windows.p5 .push($trade.time, $trade.price, $trade.amount);
                    gdax_windows.p15.push($trade.time, $trade.price, $trade.amount);
                    gdax_windows.p60.push($trade.time, $trade.price, $trade.amount);
                    // last_price = $trade.price;
                }
                // trades on other exchanges do not feed the windows
                _ => {}
            }
        }}
    }
    wtr.write_record(&[
        "time",
        "r5",
        "r15",
        "r60",
    ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
    // after the header, drop to the raw underlying writer: output rows are
    // serialized by hand with itoa/dtoa below instead of going through
    // csv::Writer again
    let mut wtr = wtr.into_inner().map_err(|e| format!("csv::Writer::into_inner failed: {}", e))?;
    // the first row was consumed above to seed cur_bucket, so feed it to the
    // windows here; this query tracks btc/usd only
    if trade.ticker == t!(btc-usd) { update!(trade); }
    let mut n = 0;
    let mut n_written = 0;
    while rdr.read_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        n += 1;
        let trade: Trade = row.deserialize(Some(&headers))
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;
        if trade.time > cur_bucket {
            // crossed a bucket boundary: evict expired window entries, compute
            // the ratios as of this bucket, and emit one output row
            debug!(logger, "about to purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time,
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );
            bmex_windows.p5 .purge(trade.time);
            bmex_windows.p15.purge(trade.time);
            bmex_windows.p60.purge(trade.time);
            gdax_windows.p5 .purge(trade.time);
            gdax_windows.p15.purge(trade.time);
            gdax_windows.p60.purge(trade.time);
            debug!(logger, "finished purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time,
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );
            // bitmex weighted mean over gdax weighted mean, per horizon
            ratios.p5  = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
            ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
            ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();
            //row_buffers.iter_mut().for_each(|x| x.clear());
            // serialize `cur_bucket,r5,r15,r60\n` into the reused buffer
            // (itoa/dtoa are faster than format! for number-to-text)
            row_buffer.clear();
            itoa::write(&mut row_buffer, cur_bucket).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p5 ).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p15).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p60).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b'\n');
            wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;
            /*
            wtr.write_record(&row_buffers[..]).map_err(|e| {
                format!("writing csv row failed: {}", e)
            })?;
            */
            /*
            wtr.write_record(&[
                &format!("{}", cur_bucket),
                &format!("{}", ratios.p5),
                &format!("{}", ratios.p15),
                &format!("{}", ratios.p60),
            ]).map_err(|e| {
                format!("writing csv row failed: {}", e)
            })?;
            */
            n_written += 1;
            cur_bucket += ONE_SECOND * 10;
        }
        // feed this trade into the windows AFTER the bucket it may have closed
        if trade.ticker == t!(btc-usd) { update!(trade); }
        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating hard query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
                "ratios.p5" => ratios.p5,
                "ratios.p15" => ratios.p15,
                "ratios.p60" => ratios.p60,
            );
        }
    }
    info!(logger, "finished with hard query");
    Ok(n)
}
  300. fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
  301. let opt = Opt::from_args();
  302. info!(logger, "initializing...";
  303. "trades-csv" => %opt.trades_csv.display(),
  304. "output-path" => %opt.output_path.display()
  305. );
  306. if ! opt.trades_csv.exists() {
  307. error!(logger, "path does not exist: {}", opt.trades_csv.display());
  308. fatal!("Error: path does not exist: {}", opt.trades_csv.display());
  309. }
  310. debug!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());
  311. let rdr = fs::File::open(&opt.trades_csv)
  312. .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, opt.trades_csv.display()))?;
  313. let rdr = io::BufReader::new(rdr);
  314. let mut rdr = csv::Reader::from_reader(rdr);
  315. // initializing --output-path CSV
  316. let wtr = fs::File::create(&opt.output_path)
  317. .map_err(|e| format!("creating output csv file failed: {} (tried to create {})", e, opt.output_path.display()))?;
  318. let wtr = io::BufWriter::new(wtr);
  319. let mut wtr = csv::Writer::from_writer(wtr);
  320. if opt.hard_mode { return hard_mode(rdr, wtr, &logger) }
  321. wtr.write_record(&[
  322. "time",
  323. "ratio",
  324. "bmex",
  325. "gdax",
  326. ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
  327. //let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  328. //let mut row = csv::StringRecord::new();
  329. let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  330. let mut row = csv::ByteRecord::new();
  331. // pull out first row to initialize query calculations
  332. //rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
  333. rdr.read_byte_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
  334. let trade: Trade = row.deserialize(Some(&headers))
  335. .map_err(|e| {
  336. format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
  337. })?;
  338. let mut cur_hour = trade.time - trade.time % ONE_HOUR;
  339. let mut next_hour = cur_hour + ONE_HOUR;
  340. let mut bmex_total = if trade.exch == e!(bmex) { trade.price * trade.amount } else { 0.0 };
  341. let mut bmex_amt = if trade.exch == e!(bmex) { trade.amount } else { 0.0 };
  342. let mut n_bmex = 0;
  343. let mut gdax_total = if trade.exch == e!(gdax) { trade.price * trade.amount } else { 0.0 };
  344. let mut gdax_amt = if trade.exch == e!(gdax) { trade.amount } else { 0.0 };
  345. let mut n_gdax = 0;
  346. let mut n = 0;
  347. let mut n_written = 0;
  348. let mut last_time = 0;
  349. // while rdr.read_record(&mut row)
  350. // .map_err(|e| {
  351. // format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  352. // })?
  353. // {
  354. while rdr.read_byte_record(&mut row)
  355. .map_err(|e| {
  356. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  357. })?
  358. {
  359. let trade: Trade = row.deserialize(Some(&headers))
  360. .map_err(|e| {
  361. format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
  362. })?;
  363. n += 1;
  364. if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
  365. info!(logger, "parsing csv file";
  366. "n rows" => %n.thousands_sep(),
  367. "n written" => %n_written.thousands_sep(),
  368. "elapsed" => ?(Instant::now() - start),
  369. );
  370. }
  371. if trade.server_time != 0 {
  372. let diff: i64 = (trade.server_time as i64 - trade.time as i64) / 1000 / 1000;
  373. assert!(diff >= std::i32::MIN as i64, "diff = {}, trade = {:?}", diff, trade);
  374. assert!(diff <= std::i32::MAX as i64, "diff = {}, trade = {:?}", diff, trade);
  375. }
  376. // verify data is sorted by time
  377. assert!(trade.time >= last_time);
  378. last_time = trade.time;
  379. if trade.ticker != t!(btc-usd) { continue }
  380. if trade.time >= next_hour {
  381. // `trade` is past the last hour bucket, so finalize/write last
  382. // hour results, and reset state for this hour
  383. if n_bmex == 0 || n_gdax == 0 {
  384. wtr.write_record(&[
  385. &format!("{}", cur_hour),
  386. "NaN",
  387. "NaN",
  388. "NaN",
  389. ]).map_err(|e| format!("writing output row failed: {}", e))?;
  390. } else {
  391. let bmex_wt_avg = bmex_total / bmex_amt;
  392. let gdax_wt_avg = gdax_total / gdax_amt;
  393. let ratio = bmex_wt_avg / gdax_wt_avg;
  394. wtr.write_record(&[
  395. &format!("{}", cur_hour),
  396. &format!("{}", ratio),
  397. &format!("{}", bmex_wt_avg),
  398. &format!("{}", gdax_wt_avg),
  399. ]).map_err(|e| format!("writing output row failed: {}", e))?;
  400. }
  401. n_written += 1;
  402. // reset state
  403. bmex_total = 0.0;
  404. bmex_amt = 0.0;
  405. gdax_total = 0.0;
  406. gdax_amt = 0.0;
  407. n_bmex = 0;
  408. n_gdax = 0;
  409. cur_hour = next_hour;
  410. next_hour += ONE_HOUR;
  411. // if we are skipping hours in between the last and current row, we
  412. // need to write a NaN row for the hours that had no data
  413. while next_hour <= trade.time {
  414. wtr.write_record(&[
  415. &format!("{}", cur_hour),
  416. "NaN",
  417. "NaN",
  418. "NaN",
  419. ]).map_err(|e| format!("writing output row failed: {}", e))?;
  420. n_written += 1;
  421. cur_hour = next_hour;
  422. next_hour += ONE_HOUR;
  423. }
  424. }
  425. match trade.exch {
  426. e!(bmex) => {
  427. bmex_total += trade.price * trade.amount;
  428. bmex_amt += trade.amount;
  429. n_bmex += 1;
  430. }
  431. e!(gdax) => {
  432. gdax_total += trade.price * trade.amount;
  433. gdax_amt += trade.amount;
  434. n_gdax += 1;
  435. }
  436. _ => {}
  437. }
  438. if cfg!(debug_assertions) && n > PROGRESS_EVERY {
  439. warn!(logger, "debug mode: exiting early";
  440. "n rows" => %n.thousands_sep(),
  441. "n written" => %n_written.thousands_sep(),
  442. "elapsed" => ?(Instant::now() - start),
  443. );
  444. break
  445. }
  446. }
  447. // intentionally skipping handling the partial hour here
  448. info!(logger, "finished parsing CSV/calculating query. closing output file");
  449. drop(wtr);
  450. Ok(n)
  451. }
/// Entry point: builds the async terminal logger, runs the query, and logs a
/// final summary (row count + throughput) or the error.
fn main() {
    let start = Instant::now();
    // terminal logger drained through a large async channel so log calls
    // don't block the hot parsing loop
    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
    let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
    let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
    match run(start, &logger) {
        Ok(n) => {
            let took = Instant::now() - start;
            // millisecond precision is plenty for the human-readable summary
            let took_secs = took.as_millis() as f64 / 1000.0;
            let took_str = format!("{}min, {:.1}sec", took.as_secs() / 60, took_secs % 60.0);
            info!(logger, "finished in {}", took_str;
                "n rows" => %n.thousands_sep(),
                "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
            );
        }
        Err(e) => {
            crit!(logger, "run failed: {:?}", e);
            eprintln!("\n\nError: {}", e);
            // brief sleep gives the async log thread a chance to flush the
            // crit! line before the process exits nonzero
            std::thread::sleep(Duration::from_millis(100));
            std::process::exit(1);
        }
    }
}