You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

436 lines
14KB

  1. #![allow(unused)]
  2. #[macro_use]
  3. extern crate slog;
  4. #[macro_use]
  5. extern crate markets;
  6. use std::io::{self, prelude::*};
  7. use std::fs;
  8. use std::path::{Path, PathBuf};
  9. use std::time::*;
  10. use pretty_toa::ThousandsSep;
  11. use structopt::StructOpt;
  12. use serde::{Serialize, Deserialize};
  13. use slog::Drain;
  14. use chrono::{DateTime, Utc, NaiveDateTime};
  15. use markets::crypto::{Exchange, Ticker, Side, Currency};
  16. use pipelines::encoding;
  17. use pipelines::windows::WeightedMeanWindow;
// Prints a `format!`-style message to stderr and aborts the process with
// exit code 1.
// NOTE(review): not invoked anywhere in the visible portion of this file;
// the file-level `#![allow(unused)]` suppresses the dead-code warning.
macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
    eprintln!($fmt, $($args)*);
    std::process::exit(1);
}}}
  22. const PROGRESS_EVERY: usize = 1024 * 1024 * 16;
  23. const ONE_SECOND: u64 = 1_000_000_000;
  24. const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;
// Command-line options parsed by structopt/clap.
//
// NOTE: structopt renders `///` doc comments on fields as `--help` text,
// so the documentation added here uses plain `//` comments to leave the
// CLI help output byte-identical.
#[structopt(rename_all="kebab-case")]
#[derive(Debug, StructOpt)]
struct Opt {
    /// Path to file with binary trades data
    #[structopt(short = "f", long = "input-file")]
    #[structopt(parse(from_os_str))]
    input_path: PathBuf,
    /// Where to save the query results (CSV output)
    #[structopt(short = "o", long = "output-path")]
    #[structopt(parse(from_os_str))]
    output_path: PathBuf,
    // When set, `run` dispatches to `hard_query` (rolling windows sampled
    // every 10 seconds) instead of the default `easy_query` (hourly buckets).
    #[structopt(short = "z", long = "hard-mode")]
    hard_mode: bool,
}
  39. fn nanos_to_utc(nanos: u64) -> DateTime<Utc> {
  40. const ONE_SECOND: u64 = 1_000_000_000;
  41. let sec: i64 = (nanos / ONE_SECOND) as i64;
  42. let nsec: u32 = (nanos % ONE_SECOND) as u32;
  43. let naive = NaiveDateTime::from_timestamp(sec, nsec);
  44. DateTime::from_utc(naive, Utc)
  45. }
  46. fn per_sec(n: usize, span: Duration) -> f64 {
  47. if n == 0 || span < Duration::from_micros(1) { return 0.0 }
  48. let s: f64 = span.as_nanos() as f64 / 1e9f64;
  49. n as f64 / s
  50. }
  51. fn nanos(utc: DateTime<Utc>) -> u64 {
  52. (utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64)
  53. }
/// "Easy mode" query: streams fixed-size binary trade records from the
/// memory-mapped input and emits one CSV row per hour with the ratio of
/// the BitMEX BTC/USD volume-weighted average price to the GDAX BTC/USD
/// volume-weighted average price (`time,ratio,bmex,gdax`).
///
/// Returns the number of records processed, or an error string on any
/// serialization/write failure.
///
/// Panics (via `assert!`) if `data` contains zero complete records; the
/// caller (`run`) rejects empty files before calling this.
fn easy_query<W>(
    data: &memmap::Mmap,
    mut wtr: W,
    logger: &slog::Logger,
) -> Result<usize, String>
    where W: Write
{
    let logger = logger.new(o!("easy-mode" => "whatever, man"));
    info!(logger, "beginning easy mode");
    let n_records = data.len() / encoding::SERIALIZED_SIZE;
    let mut n = 0;          // records consumed so far
    let mut n_written = 0;  // CSV rows emitted so far
    let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);
    // Reused per-row scratch buffer; one allocation for the whole run.
    let mut row_buffer: Vec<u8> = Vec::with_capacity(512);
    writeln!(&mut wtr, "time,ratio,bmex,gdax")
        .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
    assert!(n_records > 0);
    // Pull the first record out of the iterator to initialize the hour
    // boundaries before entering the main loop.
    let first = encoding::PackedTradeData::new(records.next().unwrap());
    n += 1;
    // Hour bucket boundaries in nanoseconds since epoch, aligned down to
    // the hour containing the first trade.
    let mut cur_hour = first.time() - first.time() % ONE_HOUR;
    let mut next_hour = cur_hour + ONE_HOUR;
    // Per-hour accumulators per venue: sum(price*amount), sum(amount),
    // and a trade count (the count only gates the NaN row below).
    let mut bmex_total = 0.0;
    let mut bmex_amount = 0.0;
    let mut n_bmex = 0;
    let mut gdax_total = 0.0;
    let mut gdax_amount = 0.0;
    let mut n_gdax = 0;
    // The packed meta i32 carries one byte each for exchange, base
    // currency, quote currency, and side; MASK zeroes the side byte so a
    // single integer compare matches both buys and sells for a market.
    const MASK : i32 = i32::from_le_bytes([ 255, 255, 255, 0 ]);
    const BMEX_BTC_USD : i32 = i32::from_le_bytes([ e!(bmex) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);
    const GDAX_BTC_USD : i32 = i32::from_le_bytes([ e!(gdax) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);
    macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
        ($trade:ident) => {{
            let meta_sans_side: i32 = $trade.meta_i32() & MASK;
            let amount = $trade.amount();
            let total = $trade.price() * amount;
            if meta_sans_side == BMEX_BTC_USD {
                bmex_total += total;
                bmex_amount += amount;
                n_bmex += 1;
            } else if meta_sans_side == GDAX_BTC_USD {
                gdax_total += total;
                gdax_amount += amount;
                n_gdax += 1;
            }
            // NOTE(review): the blocks below are earlier branchless and
            // match-based versions of the same accumulation, kept for
            // reference (the match version appears incomplete).
            /*
            let is_bmex_btc_usd: f64 = (meta_sans_side == BMEX_BTC_USD) as u8 as f64;
            let is_gdax_btc_usd: f64 = (meta_sans_side == GDAX_BTC_USD) as u8 as f64;
            bmex_total += is_bmex_btc_usd * total;
            bmex_amount += is_bmex_btc_usd * amount;
            n_bmex += is_bmex_btc_usd as usize * 1;
            gdax_total += is_gdax_btc_usd * total;
            gdax_amount += is_gdax_btc_usd * amount;
            n_gdax += is_gdax_btc_usd as usize * 1;
            */
            //match ($trade.exch(), $trade.base(), $trade.quote()) {
            //    (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
            //        n_bmex += 1;
            //    }
            //    (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
            //        gdax_total += $trade.price() * $trade.amount();
            //        gdax_amount += $trade.amount();
            //        n_gdax += 1;
            //    }
            //
            //    _ => {}
            //}
        }}
    }
    update!(first);
    for record in records {
        n += 1;
        let trade = encoding::PackedTradeData::new(record);
        // Crossed an hour boundary: flush the completed hour as one row.
        if trade.time() > next_hour {
            row_buffer.clear();
            itoa::write(&mut row_buffer, cur_hour).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            if n_bmex == 0 || n_gdax == 0 {
                // One (or both) venues traded nothing this hour, so the
                // weighted averages / ratio are undefined.
                row_buffer.write(",NaN,NaN,NaN\n".as_bytes()).unwrap();
            } else {
                let bmex_wt_avg = bmex_total / bmex_amount;
                let gdax_wt_avg = gdax_total / gdax_amount;
                let ratio = bmex_wt_avg / gdax_wt_avg;
                row_buffer.push(b',');
                dtoa::write(&mut row_buffer, ratio).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
                row_buffer.push(b',');
                dtoa::write(&mut row_buffer, bmex_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
                row_buffer.push(b',');
                dtoa::write(&mut row_buffer, gdax_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
                row_buffer.push(b'\n');
            }
            wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;
            n_written += 1;
            // Reset accumulators for the next hour bucket.
            bmex_total = 0.0;
            bmex_amount = 0.0;
            gdax_total = 0.0;
            gdax_amount = 0.0;
            n_bmex = 0;
            n_gdax = 0;
            cur_hour = next_hour;
            next_hour += ONE_HOUR;
            // if we are skipping hours in between the last and current row, we
            // need to write a NaN row for the hours that had no data
            while next_hour <= trade.time() {
                writeln!(&mut wtr, "{},NaN,NaN,NaN", cur_hour)
                    .map_err(|e| format!("writing output row failed: {}", e))?;
                n_written += 1;
                cur_hour = next_hour;
                next_hour += ONE_HOUR;
            }
        }
        update!(trade);
        // Periodic progress logging (PROGRESS_EVERY is a power of two).
        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
            );
        }
    }
    // NOTE(review): the final partial hour accumulated when input ends is
    // never flushed to the output — confirm this truncation is intended.
    info!(logger, "finished with easy query");
    Ok(n)
}
/// "Hard mode" query: like `easy_query`, but instead of hourly buckets it
/// maintains rolling 5/15/60-minute weighted-mean windows per venue and,
/// every 10 seconds of trade time, emits the BitMEX/GDAX weighted-mean
/// price ratio for each window as a CSV row (`time,r5,r15,r60`).
///
/// Returns the number of records processed, or an error string on any
/// serialization/write failure.
///
/// Panics (via `assert!`) if `data` contains zero complete records; the
/// caller (`run`) rejects empty files before calling this.
fn hard_query<W>(
    data: &memmap::Mmap,
    mut wtr: W,
    logger: &slog::Logger,
) -> Result<usize, String>
    where W: Write
{
    let logger = logger.new(o!("hard-mode" => "challenge accepted"));
    info!(logger, "beginning hard mode");
    let n_records = data.len() / encoding::SERIALIZED_SIZE;
    let mut n = 0;          // records consumed so far
    let mut n_written = 0;  // CSV rows emitted so far
    let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);
    // pull out first row to initialize query calculations
    assert!(n_records > 0);
    let first = encoding::PackedTradeData::new(records.next().unwrap());
    n += 1;
    // First output boundary: the next 10-second mark strictly after the
    // first trade's (bucket-aligned) time, in nanoseconds since epoch.
    let mut cur_bucket = first.time() - (first.time() % (ONE_SECOND * 10)) + ONE_SECOND * 10;
    // One value per lookback horizon (5, 15, and 60 minutes).
    #[derive(Default, Clone)]
    struct Lookbacks<T> {
        pub p5: T,
        pub p15: T,
        pub p60: T,
    }
    let mut ratios: Lookbacks<f64> = Default::default();
    let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
        Lookbacks {
            p5: WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
            p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
            p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
        };
    let mut gdax_windows = bmex_windows.clone();
    // Reused per-row scratch buffer; one allocation for the whole run.
    let mut row_buffer: Vec<u8> = Vec::with_capacity(512);
    macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
        ($trade:ident) => {{
            match ($trade.exch(), $trade.base(), $trade.quote()) {
                (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
                    bmex_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
                    bmex_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
                    bmex_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
                }
                (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
                    gdax_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
                    gdax_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
                    gdax_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
                }
                _ => {}
            }
        }}
    }
    writeln!(&mut wtr, "time,r5,r15,r60")
        .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
    update!(first);
    for record in records {
        n += 1;
        let trade = encoding::PackedTradeData::new(record);
        // Crossed a 10-second boundary: expire stale window entries,
        // compute the ratios as of `cur_bucket`, and emit one row.
        // NOTE(review): only one bucket is emitted per incoming record,
        // so a gap in the data longer than 10s produces its rows lazily
        // over the following records (no NaN gap-fill as in easy mode) —
        // confirm this is the intended output shape.
        if trade.time() > cur_bucket {
            debug!(logger, "about to purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time(),
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );
            // Drop entries older than each window's span relative to the
            // bucket boundary.
            bmex_windows.p5 .purge(cur_bucket);
            bmex_windows.p15.purge(cur_bucket);
            bmex_windows.p60.purge(cur_bucket);
            gdax_windows.p5 .purge(cur_bucket);
            gdax_windows.p15.purge(cur_bucket);
            gdax_windows.p60.purge(cur_bucket);
            debug!(logger, "finished purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time(),
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );
            ratios.p5 = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
            ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
            ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();
            //row_buffers.iter_mut().for_each(|x| x.clear());
            row_buffer.clear();
            itoa::write(&mut row_buffer, cur_bucket).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p5 ).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p15).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p60).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b'\n');
            wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;
            n_written += 1;
            cur_bucket += ONE_SECOND * 10;
        }
        update!(trade);
        // Periodic progress logging (PROGRESS_EVERY is a power of two).
        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating hard query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
                "ratios.p5" => ratios.p5,
                "ratios.p15" => ratios.p15,
                "ratios.p60" => ratios.p60,
            );
        }
    }
    info!(logger, "finished with hard query");
    Ok(n)
}
  284. fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
  285. let Opt { input_path, output_path, hard_mode } = Opt::from_args();
  286. info!(logger, "beginning to count";
  287. "input_path" => %input_path.display(),
  288. );
  289. if ! input_path.exists() { return Err(format!("--input-file path does not exist: {}", input_path.display())) }
  290. let input_file =
  291. fs::OpenOptions::new()
  292. .read(true)
  293. .open(input_path)
  294. .map_err(|e| e.to_string())?;
  295. let file_length = input_file.metadata().unwrap().len();
  296. if file_length % encoding::SERIALIZED_SIZE as u64 != 0 || file_length == 0 {
  297. return Err(format!("file length is not a multiple of record size: {}", file_length))
  298. }
  299. let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE;
  300. info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep());
  301. let data: memmap::Mmap = unsafe {
  302. memmap::Mmap::map(&input_file)
  303. .map_err(|e| {
  304. format!("creating Mmap failed: {}", e)
  305. })?
  306. };
  307. info!(logger, "opening output file for writing");
  308. let wtr = fs::File::create(&output_path)
  309. .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
  310. let wtr = io::BufWriter::new(wtr);
  311. if hard_mode {
  312. hard_query(&data, wtr, &logger)
  313. } else {
  314. easy_query(&data, wtr, &logger)
  315. }
  316. }
/// Entry point: builds a colored terminal slog logger behind an async
/// drain, runs the query, and on success reports row count and
/// throughput; on failure logs the error and exits with code 1.
fn main() {
    let start = Instant::now();
    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
    // Async drain with a large channel — presumably sized so bursts of
    // progress logging do not block the query loops; confirm if tuning.
    let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
    let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
    match run(start, &logger) {
        Ok(n) => {
            let took = Instant::now() - start;
            // rows/sec is rounded to two decimal places for display.
            info!(logger, "finished in {:?}", took;
                "n rows" => %n.thousands_sep(),
                "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
            );
        }
        Err(e) => {
            crit!(logger, "run failed: {:?}", e);
            eprintln!("\n\nError: {}", e);
            // Brief sleep gives the async logging thread a chance to
            // flush before the process exits.
            std::thread::sleep(Duration::from_millis(100));
            std::process::exit(1);
        }
    }
}