#![allow(unused)]
#[macro_use]
extern crate slog;
#[macro_use]
extern crate markets;

use std::io::{self, prelude::*};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::*;
use pretty_toa::ThousandsSep;
use structopt::StructOpt;
use serde::{Serialize, Deserialize};
use slog::Drain;
use chrono::{DateTime, Utc, NaiveDateTime};
use markets::crypto::{Exchange, Ticker, Side, Currency};
use pipelines::encoding;
use pipelines::windows::WeightedMeanWindow;

macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
    eprintln!($fmt, $($args)*);
    std::process::exit(1);
}}}

const PROGRESS_EVERY: usize = 1024 * 1024 * 16;
const ONE_SECOND: u64 = 1_000_000_000;
const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;
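
/// Command-line options, parsed with structopt.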
#[derive(Debug, StructOpt)]
#[structopt(rename_all = "kebab-case")]
struct Opt {
    /// Path to file with binary trades data
    #[structopt(short = "f", long = "input-file")]
    #[structopt(parse(from_os_str))]
    input_path: PathBuf,

    /// Where to save the query results (CSV output)
    #[structopt(short = "o", long = "output-path")]
    #[structopt(parse(from_os_str))]
    output_path: PathBuf,
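
    /// Run the rolling-window ("hard mode") query instead of the hourly easy-mode query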
  36. #[structopt(short = "z", long = "hard-mode")]
  37. hard_mode: bool,
  38. }
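
/// Converts a nanosecond unix timestamp into a chrono `DateTime<Utc>`.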
fn nanos_to_utc(nanos: u64) -> DateTime<Utc> {
    const ONE_SECOND: u64 = 1_000_000_000;
    let sec: i64 = (nanos / ONE_SECOND) as i64;
    let nsec: u32 = (nanos % ONE_SECOND) as u32;
    let naive = NaiveDateTime::from_timestamp(sec, nsec);
    DateTime::from_utc(naive, Utc)
}
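
/// Rows-per-second throughput; returns 0.0 for an empty count or a sub-microsecond span.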
fn per_sec(n: usize, span: Duration) -> f64 {
    if n == 0 || span < Duration::from_micros(1) { return 0.0 }
    let s: f64 = span.as_nanos() as f64 / 1e9f64;
    n as f64 / s
}
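
/// Converts a chrono `DateTime<Utc>` back into a nanosecond unix timestamp.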
fn nanos(utc: DateTime<Utc>) -> u64 {
    (utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64)
}
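
/// Easy mode: streams the memory-mapped records in hourly buckets and writes one CSV
/// row per hour with the BMEX/GDAX ratio of volume-weighted average BTC/USD prices.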
fn easy_query<W>(
    data: &memmap::Mmap,
    mut wtr: W,
    logger: &slog::Logger,
) -> Result<usize, String>
    where W: Write
{
    let logger = logger.new(o!("easy-mode" => "whatever, man"));
    info!(logger, "beginning easy mode");

    let n_records = data.len() / encoding::SERIALIZED_SIZE;

    let mut n = 0;
    let mut n_written = 0;
    let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);
    let mut row_buffer: Vec<u8> = Vec::with_capacity(512);

    writeln!(&mut wtr, "time,ratio,bmex,gdax")
        .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

    assert!(n_records > 0);
    let first = encoding::PackedTradeData::new(records.next().unwrap());
    n += 1;

    let mut cur_hour = first.time() - first.time() % ONE_HOUR;
    let mut next_hour = cur_hour + ONE_HOUR;

    let mut bmex_total = 0.0;
    let mut bmex_amount = 0.0;
    let mut n_bmex = 0;
    let mut gdax_total = 0.0;
    let mut gdax_amount = 0.0;
    let mut n_gdax = 0;
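
    // Each record's packed (exchange, base, quote, side) bytes are compared as a single
    // i32: MASK zeroes out the side byte, and the two constants below hold the packed
    // (exchange, base, quote) values for BMEX BTC/USD and GDAX BTC/USD. The hot loop can
    // then update its accumulators branchlessly via 0.0/1.0 multipliers.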
    const MASK         : i32 = i32::from_le_bytes([ 255, 255, 255, 0 ]);
    const BMEX_BTC_USD : i32 = i32::from_le_bytes([ e!(bmex) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);
    const GDAX_BTC_USD : i32 = i32::from_le_bytes([ e!(gdax) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);

    macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
        ($trade:ident) => {{
            let meta_sans_side: i32 = $trade.meta_i32() & MASK;
            let is_bmex_btc_usd: f64 = (meta_sans_side == BMEX_BTC_USD) as u8 as f64;
            let is_gdax_btc_usd: f64 = (meta_sans_side == GDAX_BTC_USD) as u8 as f64;
            let amount = $trade.amount();
            let total = $trade.price() * amount;

            bmex_total  += is_bmex_btc_usd * total;
            bmex_amount += is_bmex_btc_usd * amount;
            n_bmex      += is_bmex_btc_usd as usize * 1;

            gdax_total  += is_gdax_btc_usd * total;
            gdax_amount += is_gdax_btc_usd * amount;
            n_gdax      += is_gdax_btc_usd as usize * 1;

            //match ($trade.exch(), $trade.base(), $trade.quote()) {
            //    (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
            //        n_bmex += 1;
            //    }
            //    (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
            //        gdax_total += $trade.price() * $trade.amount();
            //        gdax_amount += $trade.amount();
            //        n_gdax += 1;
            //    }
            //
            //    _ => {}
            //}
        }}
    }
    update!(first);

    for record in records {
        n += 1;
        let trade = encoding::PackedTradeData::new(record);
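
        // crossed an hour boundary: emit a CSV row for the hour just completed,
        // then reset the accumulators for the new hour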
        if trade.time() > next_hour {
            row_buffer.clear();
            itoa::write(&mut row_buffer, cur_hour).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            if n_bmex == 0 || n_gdax == 0 {
                row_buffer.write(",NaN,NaN,NaN\n".as_bytes()).unwrap();
            } else {
                let bmex_wt_avg = bmex_total / bmex_amount;
                let gdax_wt_avg = gdax_total / gdax_amount;
                let ratio = bmex_wt_avg / gdax_wt_avg;
                row_buffer.push(b',');
                dtoa::write(&mut row_buffer, ratio).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
                row_buffer.push(b',');
                dtoa::write(&mut row_buffer, bmex_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
                row_buffer.push(b',');
                dtoa::write(&mut row_buffer, gdax_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
                row_buffer.push(b'\n');
            }
            wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;
            n_written += 1;

            bmex_total = 0.0;
            bmex_amount = 0.0;
            gdax_total = 0.0;
            gdax_amount = 0.0;
            n_bmex = 0;
            n_gdax = 0;

            cur_hour = next_hour;
            next_hour += ONE_HOUR;

            // if we are skipping hours in between the last and current row, we
            // need to write a NaN row for the hours that had no data
            while next_hour <= trade.time() {
                writeln!(&mut wtr, "{},NaN,NaN,NaN", cur_hour)
                    .map_err(|e| format!("writing output row failed: {}", e))?;
                n_written += 1;
                cur_hour = next_hour;
                next_hour += ONE_HOUR;
            }
        }

        update!(trade);

        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
            );
        }
    }

    info!(logger, "finished with easy query");

    Ok(n)
}
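
/// Hard mode: maintains trailing 5/15/60-minute volume-weighted mean windows for BMEX
/// and GDAX BTC/USD trades and, for each ten-second bucket, writes the BMEX/GDAX ratio
/// of the window means as a CSV row.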
fn hard_query<W>(
    data: &memmap::Mmap,
    mut wtr: W,
    logger: &slog::Logger,
) -> Result<usize, String>
    where W: Write
{
    let logger = logger.new(o!("hard-mode" => "challenge accepted"));
    info!(logger, "beginning hard mode");

    let n_records = data.len() / encoding::SERIALIZED_SIZE;

    let mut n = 0;
    let mut n_written = 0;
    let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);

    // pull out first row to initialize query calculations
    assert!(n_records > 0);
    let first = encoding::PackedTradeData::new(records.next().unwrap());
    n += 1;

    let mut cur_bucket = first.time() - (first.time() % (ONE_SECOND * 10)) + ONE_SECOND * 10;
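
    /// Holds one value of type `T` per lookback horizon: 5, 15, and 60 minutes.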
    #[derive(Default, Clone)]
    struct Lookbacks<T> {
        pub p5: T,
        pub p15: T,
        pub p60: T,
    }

    let mut ratios: Lookbacks<f64> = Default::default();

    let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
        Lookbacks {
            p5:  WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
            p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
            p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
        };

    let mut gdax_windows = bmex_windows.clone();

    let mut row_buffer: Vec<u8> = Vec::with_capacity(512);

    macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
        ($trade:ident) => {{
            match ($trade.exch(), $trade.base(), $trade.quote()) {
                (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
                    bmex_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
                    bmex_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
                    bmex_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
                }

                (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
                    gdax_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
                    gdax_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
                    gdax_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
                }

                _ => {}
            }
        }}
    }
  213. writeln!(&mut wtr, "time,r5,r15,r60")
  214. .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
  215. update!(first);
  216. for record in records {
  217. n += 1;
  218. let trade = encoding::PackedTradeData::new(record);
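
        // reached the end of a ten-second bucket: evict expired trades from every
        // window, recompute the ratios, and write one CSV row for the bucket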
        if trade.time() > cur_bucket {
            debug!(logger, "about to purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time(),
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );

            bmex_windows.p5 .purge(cur_bucket);
            bmex_windows.p15.purge(cur_bucket);
            bmex_windows.p60.purge(cur_bucket);
            gdax_windows.p5 .purge(cur_bucket);
            gdax_windows.p15.purge(cur_bucket);
            gdax_windows.p60.purge(cur_bucket);

            debug!(logger, "finished purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time(),
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );

            ratios.p5  = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
            ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
            ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();

            //row_buffers.iter_mut().for_each(|x| x.clear());
            row_buffer.clear();
            itoa::write(&mut row_buffer, cur_bucket).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p5 ).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p15).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p60).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b'\n');
            wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;
            n_written += 1;

            cur_bucket += ONE_SECOND * 10;
        }

        update!(trade);

        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating hard query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
                "ratios.p5" => ratios.p5,
                "ratios.p15" => ratios.p15,
                "ratios.p60" => ratios.p60,
            );
        }
    }

    info!(logger, "finished with hard query");

    Ok(n)
}
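
/// Parses CLI options, memory-maps the input file, and dispatches to the easy or hard
/// query depending on `--hard-mode`.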
fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
    let Opt { input_path, output_path, hard_mode } = Opt::from_args();

    info!(logger, "beginning to count";
        "input_path" => %input_path.display(),
    );

    if ! input_path.exists() { return Err(format!("--input-file path does not exist: {}", input_path.display())) }

    let input_file =
        fs::OpenOptions::new()
            .read(true)
            .open(input_path)
            .map_err(|e| e.to_string())?;

    let file_length = input_file.metadata().unwrap().len();

    if file_length % encoding::SERIALIZED_SIZE as u64 != 0 || file_length == 0 {
        return Err(format!("file is empty or its length is not a multiple of record size: {}", file_length))
    }

    let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE;

    info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep());

    let data: memmap::Mmap = unsafe {
        memmap::Mmap::map(&input_file)
            .map_err(|e| format!("creating Mmap failed: {}", e))?
    };

    info!(logger, "opening output file for writing");

    let wtr = fs::File::create(&output_path)
        .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
    let wtr = io::BufWriter::new(wtr);

    if hard_mode {
        hard_query(&data, wtr, &logger)
    } else {
        easy_query(&data, wtr, &logger)
    }
}
fn main() {
    let start = Instant::now();

    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
    let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
    let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));

    match run(start, &logger) {
        Ok(n) => {
            let took = Instant::now() - start;
            info!(logger, "finished in {:?}", took;
                "n rows" => %n.thousands_sep(),
                "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
            );
        }

        Err(e) => {
            crit!(logger, "run failed: {:?}", e);
            eprintln!("\n\nError: {}", e);
            std::thread::sleep(Duration::from_millis(100));
            std::process::exit(1);
        }
    }
}