From ab2b22e0be7b45e4dd016cca4cdbd3e25b53900c Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Thu, 9 Apr 2020 08:06:17 -0400 Subject: [PATCH] working hard query csv --- src/csv.rs | 207 +++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/windows.rs | 98 +++++++++++++++++++++++ 3 files changed, 306 insertions(+) diff --git a/src/csv.rs b/src/csv.rs index 98aec41..50be1b4 100644 --- a/src/csv.rs +++ b/src/csv.rs @@ -9,11 +9,13 @@ use std::path::PathBuf; use std::time::*; use std::io::{self, prelude::*}; use std::fs; +use std::f64::NAN; use structopt::StructOpt; use serde::{Serialize, Deserialize}; use slog::Drain; use pretty_toa::ThousandsSep; use markets::crypto::{Exchange, Ticker, Side}; +use pipelines::windows::WeightedAvgWindow; // equivalent to panic! but without the ugly 'thread main panicked' yada yada @@ -38,6 +40,9 @@ struct Opt { #[structopt(short = "o", long = "output-path")] #[structopt(parse(from_os_str))] output_path: PathBuf, + + #[structopt(short = "z", long = "hard-mode")] + hard_mode: bool, } #[derive(Deserialize)] @@ -172,6 +177,206 @@ fn fast_parse_bytes(mut rdr: csv::Reader) -> Result { Ok(n) } +fn hard_mode( + mut rdr: csv::Reader, + mut wtr: csv::Writer, + logger: &slog::Logger, +) -> Result + where R: Read, + W: Write +{ + let logger = logger.new(o!("hard-mode" => "challenge accepted")); + info!(logger, "beginning hard mode"); + + let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); + let mut row = csv::StringRecord::new(); + + // pull out first row to initialize query calculations + rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?; + let trade: Trade = row.deserialize(Some(&headers)) + .map_err(|e| { + format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row) + })?; + + let mut cur_bucket = trade.time - (trade.time % (ONE_SECOND * 10)) + ONE_SECOND * 10; + //let mut next_bucket = cur_bucket + ONE_SECOND * 10; + let mut last_price: f64 = NAN; + + #[derive(Default, Clone)] + struct Lookbacks { + pub p5: T, + pub p15: T, + pub p60: T, + } + + let mut bprices: Lookbacks = Default::default(); + let mut gprices: Lookbacks = Default::default(); + + let mut ratios: Lookbacks = Default::default(); + + let mut bwindows: Lookbacks = + Lookbacks { + p5: WeightedAvgWindow::new(ONE_SECOND * 60 * 5 ), + p15: WeightedAvgWindow::new(ONE_SECOND * 60 * 15), + p60: WeightedAvgWindow::new(ONE_SECOND * 60 * 60), + }; + let mut gwindows = bwindows.clone(); + + + #[inline(always)] + fn do_purge(windows: &mut Lookbacks, prices: &mut Lookbacks, time: u64) { + if windows.p5.purge(time) { prices.p5 = windows.p5 .checked_wt_mean().unwrap_or(NAN); } + if windows.p15.purge(time) { prices.p15 = windows.p15.checked_wt_mean().unwrap_or(NAN); } + if windows.p60.purge(time) { prices.p60 = windows.p60.checked_wt_mean().unwrap_or(NAN); } + } + + #[inline(always)] + fn do_update(windows: &mut Lookbacks, prices: &mut Lookbacks, time: u64, price: f64, amount: f64) { + //prices.p5 = windows.p5 .update(time, price, amount).unwrap_or(NAN); + //prices.p15 = windows.p15.update(time, price, amount).unwrap_or(NAN); + //prices.p60 = windows.p60.update(time, price, amount).unwrap_or(NAN); + + windows.p5 .push(time, price, amount); + windows.p15.push(time, price, amount); + windows.p60.push(time, price, amount); + + } + + macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body + ($trade:ident) => {{ + match $trade.exch { + e!(bmex) => { + do_update(&mut bwindows, &mut bprices, $trade.time, $trade.price, $trade.amount); + //do_purge(&mut gwindows, &mut gprices, $trade.time); + last_price = $trade.price; + } + + e!(gdax) => { + do_update(&mut gwindows, &mut gprices, $trade.time, $trade.price, $trade.amount); + //do_purge(&mut bwindows, &mut bprices, $trade.time); + last_price = $trade.price; + } + + _ => {} + } + }} + } + + wtr.write_record(&[ + "time", + "last", + "bmex_5min", + "gdax_5min", + "n_bmex_p5", + "n_gdax_p5", + "r5", + "r15", + "r60", + //"n_bmex_p5", + //"n_bmex_p15", + //"n_bmex_p60", + //"n_gdax_p5", + //"n_gdax_p15", + //"n_gdax_p60", + //"gdax_p5_is_empty", + //"gdax_p5_checked_wt_mean", + //"tradetime_minus_cur_bucket", + ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?; + + if trade.ticker == t!(btc-usd) { update!(trade); } + + let mut n = 0; + let mut n_written = 0; + + while rdr.read_record(&mut row) + .map_err(|e| { + format!("reading row {} failed: {}", (n+1).thousands_sep(), e) + })? + { + n += 1; + + let trade: Trade = row.deserialize(Some(&headers)) + .map_err(|e| { + format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row) + })?; + + if trade.time > cur_bucket { + debug!(logger, "about to purge"; + "n" => n, + "n written" => n_written, + "trade.time" => trade.time, + "cur_bucket" => cur_bucket, + "gdax p5 len" => gwindows.p5.len(), + "gdax p5 wt avg" => gwindows.p5.wt_mean(), + ); + + do_purge(&mut gwindows, &mut gprices, cur_bucket); + do_purge(&mut bwindows, &mut bprices, cur_bucket); + + debug!(logger, "finished purge"; + "n" => n, + "n written" => n_written, + "trade.time" => trade.time, + "cur_bucket" => cur_bucket, + "gdax p5 len" => gwindows.p5.len(), + "gdax p5 wt avg" => gwindows.p5.wt_mean(), + ); + + + ratios.p5 = bwindows.p5 .checked_wt_mean().unwrap_or(NAN) / gwindows.p5 .checked_wt_mean().unwrap_or(NAN); + ratios.p15 = bwindows.p15.checked_wt_mean().unwrap_or(NAN) / gwindows.p15.checked_wt_mean().unwrap_or(NAN); + ratios.p60 = bwindows.p60.checked_wt_mean().unwrap_or(NAN) / gwindows.p60.checked_wt_mean().unwrap_or(NAN); + + //ratios.p5 = bwindows.p5 .wt_mean() / gwindows.p5 .wt_mean(); + //ratios.p15 = bwindows.p15.wt_mean() / gwindows.p15.wt_mean(); + //ratios.p60 = bwindows.p60.wt_mean() / gwindows.p60.wt_mean(); + + wtr.write_record(&[ + &format!("{}", cur_bucket), + &format!("{}", last_price), + &format!("{}", bwindows.p5.checked_wt_mean().unwrap_or(NAN)), + &format!("{}", gwindows.p5.checked_wt_mean().unwrap_or(NAN)), + &format!("{}", bwindows.p5.len()), + &format!("{}", gwindows.p5.len()), + &format!("{}", ratios.p5), + &format!("{}", ratios.p15), + &format!("{}", ratios.p60), + //&format!("{}", bwindows.p15.len()), + //&format!("{}", gwindows.p60.len()), + //&format!("{}", gwindows.p15.len()), + //&format!("{}", gwindows.p15.len()), + //&format!("{}", bwindows.p60.len()), + //&format!("{}", bwindows.p5.is_empty()), + //&format!("{:?}", bwindows.p5.checked_wt_mean()), + //&format!("{}", trade.time - cur_bucket), + + ]).map_err(|e| { + format!("writing csv row failed: {}", e) + })?; + n_written += 1; + cur_bucket += ONE_SECOND * 10; + //cur_bucket = next_bucket; + //next_bucket += ONE_SECOND * 10; + } + + if trade.ticker == t!(btc-usd) { update!(trade); } + + if n % PROGRESS_EVERY == 0 { + info!(logger, "calculating hard query"; + "n" => %n.thousands_sep(), + "n_written" => %n_written.thousands_sep(), + "ratios.p5" => ratios.p5, + "ratios.p15" => ratios.p15, + "ratios.p60" => ratios.p60, + ); + } + } + + info!(logger, "finished with hard query"); + + Ok(n) +} + fn run(start: Instant, logger: &slog::Logger) -> Result { let opt = Opt::from_args(); @@ -202,6 +407,8 @@ fn run(start: Instant, logger: &slog::Logger) -> Result { let mut wtr = csv::Writer::from_writer(wtr); + if opt.hard_mode { return hard_mode(rdr, wtr, &logger) } + wtr.write_record(&[ "time", "ratio", diff --git a/src/lib.rs b/src/lib.rs index 413e675..fb2aff3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod windows; #[allow(unused)] #[cfg(test)] diff --git a/src/windows.rs b/src/windows.rs index 5061c59..db391a2 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -1 +1,99 @@ use std::collections::VecDeque; + +#[derive(Debug, Clone)] +pub struct WeightedPoint { + pub time: u64, + /// value * weight. + /// + /// when purging expired items, do not subtract `wt_val * wt`, as `wt_val` + /// has already been multiplied by `wt`. Instead, simply substract `wt_val` + /// from `w_sum`. + pub wt_val: f64, + pub wt: f64, +} + +#[derive(Clone)] +pub struct WeightedAvgWindow { + size: u64, + items: VecDeque, + w_sum: f64, + sum_w: f64, + //w_mean: f64, +} + +impl WeightedAvgWindow { + pub fn new(size: u64) -> Self { + Self { + size, + items: Default::default(), + w_sum: 0.0, + sum_w: 0.0, + } + } + + /// Removes expired items and updates incremental calculations. + /// + /// Returns `true` if any items were removed. + pub fn purge(&mut self, time: u64) -> bool { + let mut n_remove = 0; + + { + let items = &self.items; + let w_sum = &mut self.w_sum; + let sum_w = &mut self.sum_w; + let size = self.size; + + for expired in items.iter().take_while(|x| time - x.time > size) { + *w_sum -= expired.wt_val; + *sum_w -= expired.wt; + n_remove += 1; + } + } + + for _ in 0..n_remove { self.items.pop_front(); } + + // when items is empty, set w_sum, sum_w to 0.0 + let zeroer: f64 = ( ! self.items.is_empty()) as u8 as f64; + self.w_sum *= zeroer; + self.sum_w *= zeroer; + + n_remove > 0 + } + + /// Add a new item, updating incremental calculations in the process. + pub fn push(&mut self, time: u64, val: f64, wt: f64) { + let wt_val: f64 = val * wt; + self.w_sum += wt_val; + self.sum_w += wt; + self.items.push_back(WeightedPoint { time, wt_val, wt }); + } + + /// Calculate the weighted mean from current state of incremental + /// accumulators. + /// + /// Note; this value is not cached. + pub fn wt_mean(&self) -> f64 { + self.w_sum / self.sum_w + } + + /// Checks whether items `is_empty` before trying to calculate. + /// Returns None if items is empty. + pub fn checked_wt_mean(&self) -> Option { + match self.is_empty() { + true => None, + false => Some(self.w_sum / self.sum_w), + } + } + + /// Purge, push and get `checked_wt_mean`, all in one convenient step. + pub fn update(&mut self, time: u64, val: f64, wt: f64) -> Option { + self.purge(time); + self.push(time, val, wt); + self.checked_wt_mean() + } + + pub fn len(&self) -> usize { self.items.len() } + + pub fn is_empty(&self) -> bool { self.items.is_empty() } +} +