#![allow(unused)]

#[macro_use]
extern crate slog;
#[macro_use]
extern crate markets;

use std::io::{self, prelude::*};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::*;
use pretty_toa::ThousandsSep;
use structopt::StructOpt;
use serde::{Serialize, Deserialize};
use slog::Drain;
use chrono::{DateTime, Utc, NaiveDateTime};
use markets::crypto::{Exchange, Ticker, Side, Currency};
use pipelines::encoding;
use pipelines::windows::WeightedMeanWindow;

macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
    eprintln!($fmt, $($args)*);
    std::process::exit(1);
}}}

const PROGRESS_EVERY: usize = 1024 * 1024 * 16;
const ONE_SECOND: u64 = 1_000_000_000;
const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;

#[derive(Debug, StructOpt)]
#[structopt(rename_all = "kebab-case")]
struct Opt {
    /// Path to file with binary trades data
    #[structopt(short = "f", long = "input-file")]
    #[structopt(parse(from_os_str))]
    input_path: PathBuf,

    /// Where to save the query results (CSV output)
    #[structopt(short = "o", long = "output-path")]
    #[structopt(parse(from_os_str))]
    output_path: PathBuf,

    #[structopt(short = "z", long = "hard-mode")]
    hard_mode: bool,
}

fn nanos_to_utc(nanos: u64) -> DateTime<Utc> {
    const ONE_SECOND: u64 = 1_000_000_000;
    let sec: i64 = (nanos / ONE_SECOND) as i64;
    let nsec: u32 = (nanos % ONE_SECOND) as u32;
    let naive = NaiveDateTime::from_timestamp(sec, nsec);
    DateTime::from_utc(naive, Utc)
}

fn per_sec(n: usize, span: Duration) -> f64 {
    if n == 0 || span < Duration::from_micros(1) { return 0.0 }
    let s: f64 = span.as_nanos() as f64 / 1e9f64;
    n as f64 / s
}

fn nanos(utc: DateTime<Utc>) -> u64 {
    (utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64)
}

fn easy_query<W>(
    data: &memmap::Mmap,
    mut wtr: W,
    logger: &slog::Logger,
) -> Result<usize, String>
    where W: Write
{
    let logger = logger.new(o!("easy-mode" => "whatever, man"));

    info!(logger, "beginning easy mode");

    let n_records = data.len() / encoding::SERIALIZED_SIZE;

    let mut n = 0;
    let mut n_written = 0;
    let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);
    let mut row_buffer: Vec<u8> = Vec::with_capacity(512);

    writeln!(&mut wtr, "time,ratio,bmex,gdax")
        .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

    assert!(n_records > 0);
    let first = encoding::PackedTradeData::new(records.next().unwrap());
    n += 1;

    let mut cur_hour = first.time() - first.time() % ONE_HOUR;
    let mut next_hour = cur_hour + ONE_HOUR;

    let mut bmex_total = 0.0;
    let mut bmex_amount = 0.0;
    let mut n_bmex = 0;

    let mut gdax_total = 0.0;
    let mut gdax_amount = 0.0;
    let mut n_gdax = 0;

    const MASK          : i32 = i32::from_le_bytes([255, 255, 255, 0]);
    const BMEX_BTC_USD  : i32 = i32::from_le_bytes([e!(bmex) as u8, c!(btc) as u8, c!(usd) as u8, 0]);
    const GDAX_BTC_USD  : i32 = i32::from_le_bytes([e!(gdax) as u8, c!(btc) as u8, c!(usd) as u8, 0]);
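    // Classify each trade with a single i32 comparison instead of matching
    // exchange, base, and quote separately: `meta_i32()` packs those fields
    // into the low three little-endian bytes, and `MASK` zeroes the final
    // byte (the trade side, hence `meta_sans_side` below), so trades from
    // both sides compare equal to the packed constants above.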
    // in macro to avoid repeating code once outside loop, and again in loop body
    macro_rules! update {
        ($trade:ident) => {{
            let meta_sans_side: i32 = $trade.meta_i32() & MASK;
            let amount = $trade.amount();
            let total = $trade.price() * amount;

            if meta_sans_side == BMEX_BTC_USD {
                bmex_total += total;
                bmex_amount += amount;
                n_bmex += 1;
            } else if meta_sans_side == GDAX_BTC_USD {
                gdax_total += total;
                gdax_amount += amount;
                n_gdax += 1;
            }

            /*
            let is_bmex_btc_usd: f64 = (meta_sans_side == BMEX_BTC_USD) as u8 as f64;
            let is_gdax_btc_usd: f64 = (meta_sans_side == GDAX_BTC_USD) as u8 as f64;

            bmex_total  += is_bmex_btc_usd * total;
            bmex_amount += is_bmex_btc_usd * amount;
            n_bmex      += is_bmex_btc_usd as usize * 1;

            gdax_total  += is_gdax_btc_usd * total;
            gdax_amount += is_gdax_btc_usd * amount;
            n_gdax      += is_gdax_btc_usd as usize * 1;
            */

            //match ($trade.exch(), $trade.base(), $trade.quote()) {
            //    (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
            //        n_bmex += 1;
            //    }
            //
            //    (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
            //        gdax_total += $trade.price() * $trade.amount();
            //        gdax_amount += $trade.amount();
            //        n_gdax += 1;
            //    }
            //
            //    _ => {}
            //}
        }}
    }

    update!(first);

    for record in records {
        n += 1;

        let trade = encoding::PackedTradeData::new(record);

        if trade.time() > next_hour {
            row_buffer.clear();

            itoa::write(&mut row_buffer, cur_hour)
                .map_err(|e| format!("serializing number to buffer failed: {}", e))?;

            if n_bmex == 0 || n_gdax == 0 {
                row_buffer.write_all(",NaN,NaN,NaN\n".as_bytes()).unwrap();
            } else {
                let bmex_wt_avg = bmex_total / bmex_amount;
                let gdax_wt_avg = gdax_total / gdax_amount;
                let ratio = bmex_wt_avg / gdax_wt_avg;

                row_buffer.push(b',');
                dtoa::write(&mut row_buffer, ratio)
                    .map_err(|e| format!("serializing number to buffer failed: {}", e))?;
                row_buffer.push(b',');
                dtoa::write(&mut row_buffer, bmex_wt_avg)
                    .map_err(|e| format!("serializing number to buffer failed: {}", e))?;
                row_buffer.push(b',');
                dtoa::write(&mut row_buffer, gdax_wt_avg)
                    .map_err(|e| format!("serializing number to buffer failed: {}", e))?;
                row_buffer.push(b'\n');
            }

            wtr.write_all(&row_buffer[..])
                .map_err(|e| format!("writing row failed: {}", e))?;
            n_written += 1;

            bmex_total = 0.0;
            bmex_amount = 0.0;
            gdax_total = 0.0;
            gdax_amount = 0.0;
            n_bmex = 0;
            n_gdax = 0;

            cur_hour = next_hour;
            next_hour += ONE_HOUR;

            // if we are skipping hours in between the last and current row, we
            // need to write a NaN row for the hours that had no data
            while next_hour <= trade.time() {
                writeln!(&mut wtr, "{},NaN,NaN,NaN", cur_hour)
                    .map_err(|e| format!("writing output row failed: {}", e))?;
                n_written += 1;
                cur_hour = next_hour;
                next_hour += ONE_HOUR;
            }
        }

        update!(trade);

        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
            );
        }
    }

    info!(logger, "finished with easy query");

    Ok(n)
}
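/// Hard mode: rather than resetting per-hour accumulators, maintain sliding
/// 5-, 15-, and 60-minute `WeightedMeanWindow`s for each exchange, and emit
/// the BMEX/GDAX weighted-mean ratio for all three lookbacks every 10 seconds.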
fn hard_query<W>(
    data: &memmap::Mmap,
    mut wtr: W,
    logger: &slog::Logger,
) -> Result<usize, String>
    where W: Write
{
    let logger = logger.new(o!("hard-mode" => "challenge accepted"));

    info!(logger, "beginning hard mode");

    let n_records = data.len() / encoding::SERIALIZED_SIZE;

    let mut n = 0;
    let mut n_written = 0;
    let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);

    // pull out first row to initialize query calculations
    assert!(n_records > 0);
    let first = encoding::PackedTradeData::new(records.next().unwrap());
    n += 1;

    let mut cur_bucket = first.time() - (first.time() % (ONE_SECOND * 10)) + ONE_SECOND * 10;

    #[derive(Default, Clone)]
    struct Lookbacks<T> {
        pub p5: T,
        pub p15: T,
        pub p60: T,
    }

    let mut ratios: Lookbacks<f64> = Default::default();

    let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
        Lookbacks {
            p5:  WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
            p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
            p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
        };

    let mut gdax_windows = bmex_windows.clone();

    let mut row_buffer: Vec<u8> = Vec::with_capacity(512);

    // in macro to avoid repeating code once outside loop, and again in loop body
    macro_rules! update {
        ($trade:ident) => {{
            match ($trade.exch(), $trade.base(), $trade.quote()) {
                (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
                    bmex_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
                    bmex_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
                    bmex_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
                }

                (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
                    gdax_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
                    gdax_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
                    gdax_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
                }

                _ => {}
            }
        }}
    }

    writeln!(&mut wtr, "time,r5,r15,r60")
        .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

    update!(first);

    for record in records {
        n += 1;

        let trade = encoding::PackedTradeData::new(record);

        if trade.time() > cur_bucket {
            debug!(logger, "about to purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time(),
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );

            bmex_windows.p5 .purge(cur_bucket);
            bmex_windows.p15.purge(cur_bucket);
            bmex_windows.p60.purge(cur_bucket);

            gdax_windows.p5 .purge(cur_bucket);
            gdax_windows.p15.purge(cur_bucket);
            gdax_windows.p60.purge(cur_bucket);

            debug!(logger, "finished purge";
                "n" => n,
                "n written" => n_written,
                "trade.time" => trade.time(),
                "cur_bucket" => cur_bucket,
                "gdax p5 len" => gdax_windows.p5.len(),
                "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
            );

            ratios.p5  = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
            ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
            ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();

            //row_buffers.iter_mut().for_each(|x| x.clear());
            row_buffer.clear();

            itoa::write(&mut row_buffer, cur_bucket)
                .map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p5)
                .map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p15)
                .map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b',');
            dtoa::write(&mut row_buffer, ratios.p60)
                .map_err(|e| format!("serializing number to buffer failed: {}", e))?;
            row_buffer.push(b'\n');

            wtr.write_all(&row_buffer[..])
                .map_err(|e| format!("writing row failed: {}", e))?;

            n_written += 1;
            cur_bucket += ONE_SECOND * 10;
        }

        update!(trade);

        if n % PROGRESS_EVERY == 0 {
            info!(logger, "calculating hard query";
                "n" => %n.thousands_sep(),
                "n_written" => %n_written.thousands_sep(),
                "ratios.p5" => ratios.p5,
                "ratios.p15" => ratios.p15,
                "ratios.p60" => ratios.p60,
            );
        }
    }

    info!(logger, "finished with hard query");

    Ok(n)
}
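/// Parses CLI options, validates and memory-maps the input file, and
/// dispatches to `easy_query` or `hard_query`. The `Mmap::map` call is
/// `unsafe` because the mapping's validity depends on no other process
/// truncating or modifying the file while we read it.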
fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
    let Opt { input_path, output_path, hard_mode } = Opt::from_args();

    info!(logger, "beginning to count";
        "input_path" => %input_path.display(),
    );

    if !input_path.exists() {
        return Err(format!("--input-file path does not exist: {}", input_path.display()))
    }

    let input_file = fs::OpenOptions::new()
        .read(true)
        .open(input_path)
        .map_err(|e| e.to_string())?;

    let file_length = input_file.metadata().unwrap().len();

    if file_length % encoding::SERIALIZED_SIZE as u64 != 0 || file_length == 0 {
        return Err(format!("file length is not a multiple of record size: {}", file_length))
    }

    let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE;

    info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep());

    let data: memmap::Mmap = unsafe {
        memmap::Mmap::map(&input_file)
            .map_err(|e| format!("creating Mmap failed: {}", e))?
    };

    info!(logger, "opening output file for writing");

    let wtr = fs::File::create(&output_path)
        .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;

    let wtr = io::BufWriter::new(wtr);

    if hard_mode {
        hard_query(&data, wtr, &logger)
    } else {
        easy_query(&data, wtr, &logger)
    }
}

fn main() {
    let start = Instant::now();

    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
    let drain = slog_async::Async::new(drain)
        .chan_size(1024 * 64)
        .thread_name("recv".into())
        .build()
        .fuse();
    let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));

    match run(start, &logger) {
        Ok(n) => {
            let took = Instant::now() - start;
            info!(logger, "finished in {:?}", took;
                "n rows" => %n.thousands_sep(),
                "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
            );
        }

        Err(e) => {
            crit!(logger, "run failed: {:?}", e);
            eprintln!("\n\nError: {}", e);
            std::thread::sleep(Duration::from_millis(100));
            std::process::exit(1);
        }
    }
}
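// A minimal sanity check, added here as an illustration (not part of the
// original pipeline): the nanosecond timestamp round-trip through
// `nanos_to_utc` / `nanos` should be lossless, and `per_sec` should report
// simple exact rates.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn nanos_round_trips_through_utc() {
        // arbitrary nanosecond timestamp (2019-01-01-ish, with sub-second part)
        let t: u64 = 1_546_300_800_123_456_789;
        assert_eq!(nanos(nanos_to_utc(t)), t);
    }

    #[test]
    fn per_sec_reports_simple_rates() {
        assert_eq!(per_sec(0, Duration::from_secs(1)), 0.0);
        assert_eq!(per_sec(10, Duration::from_secs(2)), 5.0);
    }
}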