|
- #![allow(unused)]
-
- #[macro_use]
- extern crate slog;
- #[macro_use]
- extern crate markets;
-
- use std::io::{self, prelude::*};
- use std::fs;
- use std::path::{Path, PathBuf};
- use std::time::*;
- use pretty_toa::ThousandsSep;
- use structopt::StructOpt;
- use serde::{Serialize, Deserialize};
- use slog::Drain;
- use chrono::{DateTime, Utc, NaiveDateTime};
- use markets::crypto::{Exchange, Ticker, Side, Currency};
- use pipelines::encoding;
- use pipelines::windows::WeightedMeanWindow;
-
/// Prints a formatted message to stderr and terminates the process with exit status 1.
///
/// Accepts exactly the arguments `eprintln!` accepts: a format string optionally
/// followed by format arguments, so both `fatal!("msg")` and
/// `fatal!("bad value: {}", x)` compile. The expansion ends in
/// `std::process::exit(1)`, which never returns, so the macro can be used in any
/// expression position.
macro_rules! fatal {
    ($($args:tt)*) => {{
        eprintln!($($args)*);
        std::process::exit(1)
    }};
}
-
/// Number of processed records between two progress log lines (16 * 2^20).
const PROGRESS_EVERY: usize = 16 << 20;
/// Nanoseconds per second.
const ONE_SECOND: u64 = 1_000_000_000;
/// Nanoseconds per hour.
const ONE_HOUR: u64 = 60 * 60 * ONE_SECOND;
-
-
- #[structopt(rename_all="kebab-case")]
- #[derive(Debug, StructOpt)]
- struct Opt {
- /// Path to file with binary trades data
- #[structopt(short = "f", long = "input-file")]
- #[structopt(parse(from_os_str))]
- input_path: PathBuf,
-
- /// Where to save the query results (CSV output)
- #[structopt(short = "o", long = "output-path")]
- #[structopt(parse(from_os_str))]
- output_path: PathBuf,
-
- #[structopt(short = "z", long = "hard-mode")]
- hard_mode: bool,
- }
-
- fn nanos_to_utc(nanos: u64) -> DateTime<Utc> {
- const ONE_SECOND: u64 = 1_000_000_000;
- let sec: i64 = (nanos / ONE_SECOND) as i64;
- let nsec: u32 = (nanos % ONE_SECOND) as u32;
- let naive = NaiveDateTime::from_timestamp(sec, nsec);
- DateTime::from_utc(naive, Utc)
- }
-
/// Returns the average number of events per second for `n` events over `span`.
///
/// Yields `0.0` when `n` is zero or when `span` is shorter than one microsecond,
/// so the division never sees a zero (or near-zero) duration.
fn per_sec(n: usize, span: Duration) -> f64 {
    let too_short = span < Duration::from_micros(1);
    if n == 0 || too_short {
        return 0.0;
    }
    let elapsed_secs = span.as_nanos() as f64 / 1e9_f64;
    (n as f64) / elapsed_secs
}
-
- fn nanos(utc: DateTime<Utc>) -> u64 {
- (utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64)
- }
-
- fn easy_query<W>(
- data: &memmap::Mmap,
- mut wtr: W,
- logger: &slog::Logger,
- ) -> Result<usize, String>
- where W: Write
- {
- let logger = logger.new(o!("easy-mode" => "whatever, man"));
- info!(logger, "beginning easy mode");
-
- let n_records = data.len() / encoding::SERIALIZED_SIZE;
-
- let mut n = 0;
- let mut n_written = 0;
-
- let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);
-
- let mut row_buffer: Vec<u8> = Vec::with_capacity(512);
-
- writeln!(&mut wtr, "time,ratio,bmex,gdax")
- .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
-
- assert!(n_records > 0);
- let first = encoding::PackedTradeData::new(records.next().unwrap());
- n += 1;
-
- let mut cur_hour = first.time() - first.time() % ONE_HOUR;
- let mut next_hour = cur_hour + ONE_HOUR;
-
- let mut bmex_total = 0.0;
- let mut bmex_amount = 0.0;
- let mut n_bmex = 0;
-
- let mut gdax_total = 0.0;
- let mut gdax_amount = 0.0;
- let mut n_gdax = 0;
-
- const MASK : i32 = i32::from_le_bytes([ 255, 255, 255, 0 ]);
- const BMEX_BTC_USD : i32 = i32::from_le_bytes([ e!(bmex) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);
- const GDAX_BTC_USD : i32 = i32::from_le_bytes([ e!(gdax) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);
-
- macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
- ($trade:ident) => {{
-
- let meta_sans_side: i32 = $trade.meta_i32() & MASK;
-
- let amount = $trade.amount();
- let total = $trade.price() * amount;
-
- if meta_sans_side == BMEX_BTC_USD {
- bmex_total += total;
- bmex_amount += amount;
- n_bmex += 1;
- } else if meta_sans_side == GDAX_BTC_USD {
- gdax_total += total;
- gdax_amount += amount;
- n_gdax += 1;
- }
-
- /*
- let is_bmex_btc_usd: f64 = (meta_sans_side == BMEX_BTC_USD) as u8 as f64;
-
- let is_gdax_btc_usd: f64 = (meta_sans_side == GDAX_BTC_USD) as u8 as f64;
-
- bmex_total += is_bmex_btc_usd * total;
- bmex_amount += is_bmex_btc_usd * amount;
- n_bmex += is_bmex_btc_usd as usize * 1;
-
- gdax_total += is_gdax_btc_usd * total;
- gdax_amount += is_gdax_btc_usd * amount;
- n_gdax += is_gdax_btc_usd as usize * 1;
- */
-
-
- //match ($trade.exch(), $trade.base(), $trade.quote()) {
- // (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
- // n_bmex += 1;
- // }
-
- // (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
- // gdax_total += $trade.price() * $trade.amount();
- // gdax_amount += $trade.amount();
- // n_gdax += 1;
-
- // }
- //
- // _ => {}
- //}
- }}
- }
-
- update!(first);
-
- for record in records {
- n += 1;
-
- let trade = encoding::PackedTradeData::new(record);
-
- if trade.time() > next_hour {
- row_buffer.clear();
- itoa::write(&mut row_buffer, cur_hour).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
-
- if n_bmex == 0 || n_gdax == 0 {
- row_buffer.write(",NaN,NaN,NaN\n".as_bytes()).unwrap();
- } else {
- let bmex_wt_avg = bmex_total / bmex_amount;
- let gdax_wt_avg = gdax_total / gdax_amount;
- let ratio = bmex_wt_avg / gdax_wt_avg;
-
- row_buffer.push(b',');
- dtoa::write(&mut row_buffer, ratio).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
- row_buffer.push(b',');
- dtoa::write(&mut row_buffer, bmex_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
- row_buffer.push(b',');
- dtoa::write(&mut row_buffer, gdax_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
- row_buffer.push(b'\n');
- }
-
- wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;
- n_written += 1;
-
- bmex_total = 0.0;
- bmex_amount = 0.0;
- gdax_total = 0.0;
- gdax_amount = 0.0;
- n_bmex = 0;
- n_gdax = 0;
-
- cur_hour = next_hour;
- next_hour += ONE_HOUR;
-
- // if we are skipping hours in between the last and current row, we
- // need to write a NaN row for the hours that had no data
- while next_hour <= trade.time() {
- writeln!(&mut wtr, "{},NaN,NaN,NaN", cur_hour)
- .map_err(|e| format!("writing output row failed: {}", e))?;
-
- n_written += 1;
- cur_hour = next_hour;
- next_hour += ONE_HOUR;
- }
- }
-
- update!(trade);
-
- if n % PROGRESS_EVERY == 0 {
- info!(logger, "calculating query";
- "n" => %n.thousands_sep(),
- "n_written" => %n_written.thousands_sep(),
- );
- }
- }
- info!(logger, "finished with easy query");
- Ok(n)
-
- }
-
- fn hard_query<W>(
- data: &memmap::Mmap,
- mut wtr: W,
- logger: &slog::Logger,
- ) -> Result<usize, String>
- where W: Write
- {
- let logger = logger.new(o!("hard-mode" => "challenge accepted"));
- info!(logger, "beginning hard mode");
-
- let n_records = data.len() / encoding::SERIALIZED_SIZE;
-
- let mut n = 0;
- let mut n_written = 0;
-
- let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);
-
- // pull out first row to initialize query calculations
- assert!(n_records > 0);
- let first = encoding::PackedTradeData::new(records.next().unwrap());
- n += 1;
-
- let mut cur_bucket = first.time() - (first.time() % (ONE_SECOND * 10)) + ONE_SECOND * 10;
-
- #[derive(Default, Clone)]
- struct Lookbacks<T> {
- pub p5: T,
- pub p15: T,
- pub p60: T,
- }
-
- let mut ratios: Lookbacks<f64> = Default::default();
-
- let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
- Lookbacks {
- p5: WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
- p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
- p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
- };
-
- let mut gdax_windows = bmex_windows.clone();
-
- let mut row_buffer: Vec<u8> = Vec::with_capacity(512);
-
- macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
- ($trade:ident) => {{
- match ($trade.exch(), $trade.base(), $trade.quote()) {
- (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
- bmex_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
- bmex_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
- bmex_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
- }
-
- (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
- gdax_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
- gdax_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
- gdax_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
- }
-
- _ => {}
- }
- }}
- }
-
- writeln!(&mut wtr, "time,r5,r15,r60")
- .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
-
- update!(first);
-
- for record in records {
- n += 1;
-
- let trade = encoding::PackedTradeData::new(record);
-
- if trade.time() > cur_bucket {
- debug!(logger, "about to purge";
- "n" => n,
- "n written" => n_written,
- "trade.time" => trade.time(),
- "cur_bucket" => cur_bucket,
- "gdax p5 len" => gdax_windows.p5.len(),
- "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
- );
-
- bmex_windows.p5 .purge(cur_bucket);
- bmex_windows.p15.purge(cur_bucket);
- bmex_windows.p60.purge(cur_bucket);
-
- gdax_windows.p5 .purge(cur_bucket);
- gdax_windows.p15.purge(cur_bucket);
- gdax_windows.p60.purge(cur_bucket);
-
- debug!(logger, "finished purge";
- "n" => n,
- "n written" => n_written,
- "trade.time" => trade.time(),
- "cur_bucket" => cur_bucket,
- "gdax p5 len" => gdax_windows.p5.len(),
- "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
- );
-
- ratios.p5 = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
- ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
- ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();
-
- //row_buffers.iter_mut().for_each(|x| x.clear());
- row_buffer.clear();
-
- itoa::write(&mut row_buffer, cur_bucket).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
- row_buffer.push(b',');
- dtoa::write(&mut row_buffer, ratios.p5 ).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
- row_buffer.push(b',');
- dtoa::write(&mut row_buffer, ratios.p15).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
- row_buffer.push(b',');
- dtoa::write(&mut row_buffer, ratios.p60).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
- row_buffer.push(b'\n');
-
- wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;
-
- n_written += 1;
- cur_bucket += ONE_SECOND * 10;
- }
-
- update!(trade);
-
- if n % PROGRESS_EVERY == 0 {
- info!(logger, "calculating hard query";
- "n" => %n.thousands_sep(),
- "n_written" => %n_written.thousands_sep(),
- "ratios.p5" => ratios.p5,
- "ratios.p15" => ratios.p15,
- "ratios.p60" => ratios.p60,
- );
- }
- }
- info!(logger, "finished with hard query");
- Ok(n)
- }
-
-
-
- fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
- let Opt { input_path, output_path, hard_mode } = Opt::from_args();
-
- info!(logger, "beginning to count";
- "input_path" => %input_path.display(),
- );
-
- if ! input_path.exists() { return Err(format!("--input-file path does not exist: {}", input_path.display())) }
-
- let input_file =
- fs::OpenOptions::new()
- .read(true)
- .open(input_path)
- .map_err(|e| e.to_string())?;
-
- let file_length = input_file.metadata().unwrap().len();
-
- if file_length % encoding::SERIALIZED_SIZE as u64 != 0 || file_length == 0 {
- return Err(format!("file length is not a multiple of record size: {}", file_length))
- }
-
- let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE;
-
- info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep());
-
- let data: memmap::Mmap = unsafe {
- memmap::Mmap::map(&input_file)
- .map_err(|e| {
- format!("creating Mmap failed: {}", e)
- })?
- };
-
- info!(logger, "opening output file for writing");
-
- let wtr = fs::File::create(&output_path)
- .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
-
- let wtr = io::BufWriter::new(wtr);
-
- if hard_mode {
- hard_query(&data, wtr, &logger)
- } else {
- easy_query(&data, wtr, &logger)
- }
- }
-
- fn main() {
- let start = Instant::now();
-
- let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
- let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
- let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
- let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
-
- match run(start, &logger) {
- Ok(n) => {
- let took = Instant::now() - start;
- info!(logger, "finished in {:?}", took;
- "n rows" => %n.thousands_sep(),
- "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
- );
- }
-
- Err(e) => {
- crit!(logger, "run failed: {:?}", e);
- eprintln!("\n\nError: {}", e);
- std::thread::sleep(Duration::from_millis(100));
- std::process::exit(1);
- }
- }
-
- }
|