diff --git a/src/latency.rs b/src/latency.rs index 17803ba..034d00d 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -5,7 +5,7 @@ use std::collections::VecDeque; use std::fmt::{self, Display, Error as FmtError, Formatter, Write}; use std::time::{Instant, Duration}; -use chrono::{DateTime, Utc, TimeZone}; +use chrono::{self, DateTime, Utc, TimeZone}; use pub_sub::PubSub; use zmq; use influent::measurement::{Measurement, Value}; @@ -58,6 +58,14 @@ pub fn tfmt_dur(d: Duration) -> String { tfmt(nanos(d)) } +pub fn tfmt_dt(dt: DateTime) -> String { + Utc::now().signed_duration_since(dt) + .to_std() + .map(|dur| { + tfmt_dur(dur) + }).unwrap_or("?".into()) +} + pub fn tfmt_write(ns: Nanos, f: &mut Formatter) { match ns { @@ -142,12 +150,11 @@ impl MeasurementWindow for WTen { fn duration(&self) -> Duration { Duration::from_secs(10) } } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct LatencyUpdate where W: MeasurementWindow { pub gdax_ws: Nanos, - //pub gdax_ws_nolock: Nanos, pub krkn_pub: Nanos, pub krkn_priv: Nanos, pub plnx_pub: Nanos, @@ -159,11 +166,39 @@ pub struct LatencyUpdate pub krkn_trade_300_mean: Nanos, pub krkn_trade_300_max: Nanos, + pub plnx_last: DateTime, + pub krkn_last: DateTime, + //pub event_loop: Nanos, pub size: W, } +impl Default for LatencyUpdate + where W: MeasurementWindow + Default +{ + fn default() -> Self { + LatencyUpdate { + gdax_ws: Nanos::default(), + krkn_pub: Nanos::default(), + krkn_priv: Nanos::default(), + plnx_pub: Nanos::default(), + plnx_priv: Nanos::default(), + plnx_order: Nanos::default(), + krkn_trade_30_mean: Nanos::default(), + krkn_trade_30_max: Nanos::default(), + + krkn_trade_300_mean: Nanos::default(), + krkn_trade_300_max: Nanos::default(), + + plnx_last: Utc::now(), + krkn_last: Utc::now(), + + size: W::default() + } + } +} + impl Display for LatencyUpdate where W: MeasurementWindow { @@ -217,6 +252,29 @@ pub struct LatencyManager thread: Option>, } +/// returns a DateTime equal to now - `dur` +/// +pub fn dt_from_dur(dur: Duration) -> DateTime { + let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64); + Utc::now() - old_dur +} + +struct Last { + broadcast: Instant, + plnx: Instant, + krkn: Instant, +} + +impl Default for Last { + fn default() -> Self { + Last { + broadcast: Instant::now(), + plnx: Instant::now(), + krkn: Instant::now(), + } + } +} + //impl LatencyManager { impl LatencyManager { pub fn new(w: WTen) -> Self { @@ -227,6 +285,7 @@ impl LatencyManager { let w = w.clone(); let thread = Some(thread::spawn(move || { + let ctx = zmq::Context::new(); let socket = influx::push(&ctx).unwrap(); let mut buf = String::with_capacity(4096); @@ -239,6 +298,7 @@ impl LatencyManager { let mut plnx_priv = DurationWindow::new(w.duration()); let mut plnx_order = DurationWindow::new(w.duration()); + // yes I am intentionally breaking from the hard-typed duration // window ... that was a stupid idea // @@ -246,26 +306,55 @@ impl LatencyManager { let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300)); //let mut gdax_ws_nolock = DurationWindow::new(w.duration()); //let mut event_loop = DurationWindow::new(w.duration()); - let mut last_broadcast = Instant::now(); + + let mut last = Last::default(); + + thread::sleep_ms(1); + loop { + let loop_time = Instant::now(); + if let Ok(msg) = rx.recv() { match msg { ExperiencedLatency::Terminate => { //println!("latency manager terminating"); break; } - ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(Instant::now(), d), - //ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(Instant::now(), d), - ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(Instant::now(), d), - ExperiencedLatency::KrknHttpPublic(d) => krkn_pub.update(Instant::now(), d), - ExperiencedLatency::KrknHttpPrivate(d) => krkn_priv.update(Instant::now(), d), - ExperiencedLatency::PlnxHttpPublic(d) => plnx_pub.update(Instant::now(), d), - ExperiencedLatency::PlnxHttpPrivate(d) => plnx_priv.update(Instant::now(), d), - ExperiencedLatency::PlnxOrderBook(d) => plnx_order.update(Instant::now(), d), + + ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d), + //ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(loop_time, d), + ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d), + + ExperiencedLatency::KrknHttpPublic(d) => { + last.krkn = loop_time; + krkn_pub.update(loop_time, d) + } + + ExperiencedLatency::KrknHttpPrivate(d) => { + last.krkn = loop_time; + krkn_priv.update(loop_time, d) + } + + ExperiencedLatency::PlnxHttpPublic(d) => { + last.plnx = loop_time; + plnx_pub.update(loop_time, d) + } + + ExperiencedLatency::PlnxHttpPrivate(d) => { + last.plnx = loop_time; + plnx_priv.update(loop_time, d) + } + + ExperiencedLatency::PlnxOrderBook(d) => { + last.plnx = loop_time; + plnx_order.update(loop_time, d) + } + ExperiencedLatency::KrknTrade(d) => { + last.krkn = loop_time; let n = DurationWindow::nanos(d); - krkn_trade_30.update(Instant::now(), d); - krkn_trade_300.update(Instant::now(), d); + krkn_trade_30.update(loop_time, d); + krkn_trade_300.update(loop_time, d); let mut m = Measurement::new("krkn_trade_api"); m.add_field("nanos", Value::Integer(n as i64)); m.set_timestamp(now()); @@ -278,18 +367,21 @@ impl LatencyManager { } } - if Instant::now() - last_broadcast > Duration::from_millis(100) { - let now = Instant::now(); - krkn_trade_30.refresh(&now); - krkn_trade_300.refresh(&now); + if loop_time - last.broadcast > Duration::from_millis(100) { + // note - because we mutated the Window instances + // above, we need a fresh Instant to avoid less than other + // panic + // + krkn_trade_30.refresh(&loop_time); + krkn_trade_300.refresh(&loop_time); let update = LatencyUpdate { - gdax_ws: gdax_ws.refresh(&now).mean_nanos(), - //gdax_ws_nolock: gdax_ws_nolock.refresh(&now).mean_nanos(), - krkn_pub: krkn_pub.refresh(&now).mean_nanos(), - krkn_priv: krkn_priv.refresh(&now).mean_nanos(), - plnx_pub: plnx_pub.refresh(&now).mean_nanos(), - plnx_priv: plnx_priv.refresh(&now).mean_nanos(), - plnx_order: plnx_order.refresh(&now).mean_nanos(), + gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(), + //gdax_ws_nolock: gdax_ws_nolock.refresh(&loop_time).mean_nanos(), + krkn_pub: krkn_pub.refresh(&loop_time).mean_nanos(), + krkn_priv: krkn_priv.refresh(&loop_time).mean_nanos(), + plnx_pub: plnx_pub.refresh(&loop_time).mean_nanos(), + plnx_priv: plnx_priv.refresh(&loop_time).mean_nanos(), + plnx_order: plnx_order.refresh(&loop_time).mean_nanos(), krkn_trade_30_mean: krkn_trade_30.mean_nanos(), krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0), @@ -297,11 +389,14 @@ impl LatencyManager { krkn_trade_300_mean: krkn_trade_300.mean_nanos(), krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0), + plnx_last: dt_from_dur(loop_time - last.plnx), + krkn_last: dt_from_dur(loop_time - last.krkn), + //event_loop: event_loop.refresh(&now).mean_nanos(), size: w.clone(), }; channel.send(update); - last_broadcast = now; + last.broadcast = loop_time; } } }));