From 31911bd037f1b4a20f2cfb7378f82075409ee148 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Wed, 15 Apr 2020 17:53:51 -0400 Subject: [PATCH] clean up csv --hard-mode code --- Cargo.toml | 2 + src/csv.rs | 113 +++++++++++++++++++++-------------------------------- 2 files changed, 47 insertions(+), 68 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c5d7478..421c946 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,8 @@ slog-term = "2" pretty_toa = "1" atoi = "0.3" lexical = "5.2" +itoa = "0.4.5" +dtoa = "0.4.5" chrono = { version = "0.4", features = ["serde"] } clap = "2" itertools-num = "0.1" diff --git a/src/csv.rs b/src/csv.rs index 70c1225..c5ed7b2 100644 --- a/src/csv.rs +++ b/src/csv.rs @@ -199,8 +199,8 @@ fn hard_mode( })?; let mut cur_bucket = trade.time - (trade.time % (ONE_SECOND * 10)) + ONE_SECOND * 10; - //let mut next_bucket = cur_bucket + ONE_SECOND * 10; - let mut last_price: f64 = NAN; + // let mut next_bucket = cur_bucket + ONE_SECOND * 10; + // let mut last_price: f64 = NAN; #[derive(Default, Clone)] struct Lookbacks { @@ -209,9 +209,6 @@ fn hard_mode( pub p60: T, } - let mut bprices: Lookbacks = Default::default(); - let mut gprices: Lookbacks = Default::default(); - let mut ratios: Lookbacks = Default::default(); let mut bmex_windows: Lookbacks = @@ -220,44 +217,30 @@ fn hard_mode( p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15), p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60), }; - let mut gdax_windows = bmex_windows.clone(); + let mut gdax_windows = bmex_windows.clone(); - #[inline(always)] - fn do_purge(windows: &mut Lookbacks, prices: &mut Lookbacks, time: u64) { - //if windows.p5.purge(time) { prices.p5 = windows.p5 .checked_weighted_mean().unwrap_or(NAN); } - //if windows.p15.purge(time) { prices.p15 = windows.p15.checked_weighted_mean().unwrap_or(NAN); } - //if windows.p60.purge(time) { prices.p60 = windows.p60.checked_weighted_mean().unwrap_or(NAN); } - windows.p5 .purge(time); - windows.p15.purge(time); - windows.p60.purge(time); - } + //let mut row_buffers: [Vec; 4] = [Vec::with_capacity(32), Vec::with_capacity(32), Vec::with_capacity(32), Vec::with_capacity(32)]; - #[allow(unused)] - #[inline(always)] - fn do_update(windows: &mut Lookbacks, prices: &mut Lookbacks, time: u64, price: f64, amount: f64) { - //prices.p5 = windows.p5 .update(time, price, amount).unwrap_or(NAN); - //prices.p15 = windows.p15.update(time, price, amount).unwrap_or(NAN); - //prices.p60 = windows.p60.update(time, price, amount).unwrap_or(NAN); + let mut row_buffer: Vec = Vec::with_capacity(512); - windows.p5 .push(time, price, amount); - windows.p15.push(time, price, amount); - windows.p60.push(time, price, amount); - } - macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body ($trade:ident) => {{ match $trade.exch { e!(bmex) => { - do_update(&mut bmex_windows, &mut bprices, $trade.time, $trade.price, $trade.amount); - //do_purge(&mut gdax_windows, &mut gprices, $trade.time); - last_price = $trade.price; + bmex_windows.p5 .push($trade.time, $trade.price, $trade.amount); + bmex_windows.p15.push($trade.time, $trade.price, $trade.amount); + bmex_windows.p60.push($trade.time, $trade.price, $trade.amount); + + // last_price = $trade.price; } e!(gdax) => { - do_update(&mut gdax_windows, &mut gprices, $trade.time, $trade.price, $trade.amount); - //do_purge(&mut bmex_windows, &mut bprices, $trade.time); - last_price = $trade.price; + gdax_windows.p5 .push($trade.time, $trade.price, $trade.amount); + gdax_windows.p15.push($trade.time, $trade.price, $trade.amount); + gdax_windows.p60.push($trade.time, $trade.price, $trade.amount); + + // last_price = $trade.price; } _ => {} @@ -267,25 +250,13 @@ fn hard_mode( wtr.write_record(&[ "time", - //"last", - //"bmex_5min", - //"gdax_5min", - //"n_bmex_p5", - //"n_gdax_p5", "r5", "r15", "r60", - //"n_bmex_p5", - //"n_bmex_p15", - //"n_bmex_p60", - //"n_gdax_p5", - //"n_gdax_p15", - //"n_gdax_p60", - //"gdax_p5_is_empty", - //"gdax_p5_checked_weighted_mean", - //"tradetime_minus_cur_bucket", ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?; + let mut wtr = wtr.into_inner().map_err(|e| format!("csv::Writer::into_inner failed: {}", e))?; + if trade.ticker == t!(btc-usd) { update!(trade); } let mut n = 0; @@ -313,8 +284,13 @@ fn hard_mode( "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(), ); - do_purge(&mut gdax_windows, &mut gprices, cur_bucket); - do_purge(&mut bmex_windows, &mut bprices, cur_bucket); + bmex_windows.p5 .purge(trade.time); + bmex_windows.p15.purge(trade.time); + bmex_windows.p60.purge(trade.time); + + gdax_windows.p5 .purge(trade.time); + gdax_windows.p15.purge(trade.time); + gdax_windows.p60.purge(trade.time); debug!(logger, "finished purge"; "n" => n, @@ -325,41 +301,42 @@ fn hard_mode( "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(), ); - ratios.p5 = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean(); ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean(); ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean(); - //ratios.p5 = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean(); - //ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean(); - //ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean(); + //row_buffers.iter_mut().for_each(|x| x.clear()); + row_buffer.clear(); + + itoa::write(&mut row_buffer, cur_bucket).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b','); + dtoa::write(&mut row_buffer, ratios.p5 ).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b','); + dtoa::write(&mut row_buffer, ratios.p15).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b','); + dtoa::write(&mut row_buffer, ratios.p60).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b'\n'); + + wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?; + /* + wtr.write_record(&row_buffers[..]).map_err(|e| { + format!("writing csv row failed: {}", e) + })?; + */ + + /* wtr.write_record(&[ &format!("{}", cur_bucket), - //&format!("{}", last_price), - //&format!("{}", bmex_windows.p5.checked_weighted_mean().unwrap_or(NAN)), - //&format!("{}", gdax_windows.p5.checked_weighted_mean().unwrap_or(NAN)), - //&format!("{}", bmex_windows.p5.len()), - //&format!("{}", gdax_windows.p5.len()), &format!("{}", ratios.p5), &format!("{}", ratios.p15), &format!("{}", ratios.p60), - //&format!("{}", bmex_windows.p15.len()), - //&format!("{}", gdax_windows.p60.len()), - //&format!("{}", gdax_windows.p15.len()), - //&format!("{}", gdax_windows.p15.len()), - //&format!("{}", bmex_windows.p60.len()), - //&format!("{}", bmex_windows.p5.is_empty()), - //&format!("{:?}", bmex_windows.p5.checked_weighted_mean()), - //&format!("{}", trade.time - cur_bucket), - ]).map_err(|e| { format!("writing csv row failed: {}", e) })?; + */ n_written += 1; cur_bucket += ONE_SECOND * 10; - //cur_bucket = next_bucket; - //next_bucket += ONE_SECOND * 10; } if trade.ticker == t!(btc-usd) { update!(trade); }