#![allow(unused)] use std::thread::{self, JoinHandle}; use std::sync::mpsc::{Sender, channel}; use std::fmt; use std::time::{Instant, Duration}; use chrono::{self, DateTime, Utc}; use pub_sub::PubSub; use sloggers::types::Severity; //use windows::{DurationWindow, Incremental, Window}; use money::{Ticker, Side, Exchange}; use super::file_logger; use influx::{self, OwnedMeasurement, OwnedValue}; use self::windows::Incremental; pub type Nanos = u64; pub const SECOND: u64 = 1e9 as u64; pub const MINUTE: u64 = SECOND * 60; pub const HOUR: u64 = MINUTE * 60; pub const MILLISECOND: u64 = SECOND / 1000; pub const MICROSECOND: u64 = MILLISECOND / 1000; pub fn nanos(d: Duration) -> Nanos { d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) } pub fn dt_nanos(t: DateTime) -> i64 { (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64) } pub fn now() -> i64 { dt_nanos(Utc::now()) } pub fn tfmt(ns: Nanos) -> String { match ns { t if t <= MICROSECOND => { format!("{}ns", t) } t if t > MICROSECOND && t < MILLISECOND => { format!("{}u", t / MICROSECOND) } t if t > MILLISECOND && t < SECOND => { format!("{}ms", t / MILLISECOND) } t => { format!("{}.{}sec", t / SECOND, t / MILLISECOND) } } } pub fn tfmt_dur(d: Duration) -> String { tfmt(nanos(d)) } pub fn tfmt_dt(dt: DateTime) -> String { Utc::now().signed_duration_since(dt) .to_std() .map(|dur| { tfmt_dur(dur) }).unwrap_or("?".into()) } pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) -> fmt::Result { match ns { t if t <= MICROSECOND => { write!(f, "{}ns", t) } t if t > MICROSECOND && t < MILLISECOND => { write!(f, "{}u", t / MICROSECOND) } t if t > MILLISECOND && t < SECOND => { write!(f, "{}ms", t / MILLISECOND) } t => { write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND) } } } #[doc(hide)] mod windows { use super::*; use std::ops::{Div, Mul, Sub, SubAssign, AddAssign}; use std::collections::VecDeque; use num::Float; const INITIAL_CAPACITY: usize = 1000; #[derive(Clone, Debug)] pub struct Point //where T: Default { time: Instant, value: T } #[derive(Debug, Clone)] pub struct Window where T: Default { pub size: Duration, // window size mean: T, ps: T, psa: T, var: T, sum: T, count: u32, items: VecDeque>, } #[derive(Default)] pub struct DurationWindow { pub size: Duration, mean: Duration, sum: Duration, count: u32, items: VecDeque> } impl Point where T: Default + Copy { fn new(time: Instant, value: T) -> Self { Point { time, value } } fn value(&self) -> T { self.value } } impl Window where T: Default + Zero { pub fn new(size: Duration) -> Self { Window { size, mean: T::default(), psa: T::default(), ps: T::default(), sum: T::default(), count: 0, var: T::default(), items: VecDeque::with_capacity(INITIAL_CAPACITY), } } pub fn with_size_and_capacity(size: Duration, capacity: usize) -> Self { Window { size, mean: T::default(), psa: T::default(), ps: T::default(), sum: T::default(), count: 0, var: T::default(), items: VecDeque::with_capacity(capacity), } } } impl From for Window where T: Default + Zero { fn from(size: Duration) -> Self { Window::new(size) } } impl From for DurationWindow { fn from(size: Duration) -> Self { DurationWindow::new(size) } } pub trait Incremental { /// Purge expired items. /// #[inline] fn refresh(&mut self, t: Instant) -> &Self; /// Add a new item. /// #[inline] fn add(&mut self, time: Instant, value: T); /// Add a new item and purge expired items. /// #[inline] fn update(&mut self, time: Instant, value: T) { self.refresh(time); self.add(time, value); } } pub trait Zero { fn zero() -> Self; } pub trait One { fn one() -> Self; } macro_rules! zero { ($t:ty, $body:expr) => { impl Zero for $t { fn zero() -> $t { $body } } } } macro_rules! one { ($t:ty, $body:expr) => { impl One for $t { fn one() -> $t { $body } } } } zero!(f64, 0.0); zero!(f32, 0.0); zero!(u128, 0); zero!(i128, 0); zero!(u64, 0); zero!(i64, 0); zero!(i32, 0); zero!(u32, 0); zero!(u16, 0); one!(f64, 1.0); one!(f32, 1.0); one!(u128, 1); one!(i128, 1); one!(u64, 1); one!(i64, 1); one!(i32, 1); one!(u32, 1); one!(u16, 1); impl Incremental for Window where T: Default + AddAssign + SubAssign + From + Div + Mul + Sub + Copy { #[inline] fn refresh(&mut self, t: Instant) -> &Self { if !self.items.is_empty() { let (n_remove, sum, ps, count) = self.items.iter() .take_while(|x| t - x.time > self.size) .fold((0, self.sum, self.ps, self.count), |(n_remove, sum, ps, count), x| { (n_remove + 1, sum - x.value, ps - x.value * x.value, count - 1) }); self.sum = sum; self.ps = ps; self.count = count; for _ in 0..n_remove { self.items.pop_front(); } } if self.count > 0 { self.mean = self.sum / self.count.into(); self.psa = self.ps / self.count.into(); let c: T = self.count.into(); self.var = (self.psa * c - c * self.mean * self.mean) / c; } self } /// Creates `Point { time, value }` and pushes to `self.items`. /// #[inline] fn add(&mut self, time: Instant, value: T) { let p = Point::new(time, value); self.sum += p.value; self.ps += p.value * p.value; self.count += 1; self.items.push_back(p); } #[inline] fn update(&mut self, time: Instant, value: T) { self.add(time, value); self.refresh(time); } } impl Incremental for DurationWindow { #[inline] fn refresh(&mut self, t: Instant) -> &Self { if !self.items.is_empty() { let (n_remove, sum, count) = self.items.iter() .take_while(|x| t - x.time > self.size) .fold((0, self.sum, self.count), |(n_remove, sum, count), x| { (n_remove + 1, sum - x.value, count - 1) }); self.sum = sum; self.count = count; for _ in 0..n_remove { self.items.pop_front(); } } if self.count > 0 { self.mean = self.sum / self.count.into(); } self } #[inline] fn add(&mut self, time: Instant, value: Duration) { let p = Point::new(time, value); self.sum += p.value; self.count += 1; self.items.push_back(p); } } impl Window where T: Default + Copy { pub fn mean(&self) -> T { self.mean } pub fn var(&self) -> T { self.var } pub fn psa(&self) -> T { self.psa } pub fn ps(&self) -> T { self.ps } pub fn count(&self) -> u32 { self.count } pub fn len(&self) -> usize { self.items.len() } pub fn is_empty(&self) -> bool { self.items.is_empty() } /// Returns the `Duration` between `t` and the first `Point` in `self.items`. /// /// If there are no items, returns `Duration { secs: 0, nanos: 0 }`. /// /// # Panics /// /// This function will panic if `t` is earlier than the first `Point`'s `Instant`. /// #[inline] pub fn elapsed(&self, t: Instant) -> Duration { self.items.front() .map(|p| { t - p.time }).unwrap_or_else(|| Duration::new(0, 0)) } } impl Window where T: Float + Default { #[inline] pub fn std(&self) -> T { self.var.sqrt() } } impl DurationWindow { pub fn new(size: Duration) -> Self { DurationWindow { size, ..Default::default() } } pub fn mean(&self) -> Duration { self.mean } pub fn count(&self) -> u32 { self.count } pub fn len(&self) -> usize { self.items.len() } pub fn is_empty(&self) -> bool { self.items.is_empty() } #[inline] pub fn nanos(d: Duration) -> u64 { d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) } #[inline] pub fn micros(d: Duration) -> Option { if d <= Duration::new(4_294, 967_295_000) { Some((d.as_secs() * 1_000_000) as u32 + d.subsec_nanos() / 1_000u32) } else { None } } #[inline] pub fn mean_nanos(&self) -> u64 { DurationWindow::nanos(self.mean()) } #[inline] pub fn max(&self) -> Option { self.items.iter() .map(|p| p.value) .max() } #[inline] pub fn max_nanos(&self) -> Option { self.max() .map(|x| DurationWindow::nanos(x)) } #[inline] pub fn first(&self) -> Option { self.items .front() .map(|pt| pt.value()) } /// Returns the `Duration` between `t` and the first `Point` in `self.items`. /// /// If there are no items, returns `Duration { secs: 0, nanos: 0 }`. /// /// # Panics /// /// This function will panic if `t` is earlier than the first `Point`'s `Instant`. /// #[inline] pub fn elapsed(&self, t: Instant) -> Duration { self.items.front() .map(|p| { t - p.time }).unwrap_or_else(|| Duration::new(0, 0)) } } } #[derive(Debug)] pub enum Latency { Ws(Exchange, Ticker, Duration), Http(Exchange, Duration), Trade(Exchange, Ticker, Duration), Terminate } #[derive(Debug)] pub enum ExperiencedLatency { GdaxWebsocket(Duration), GdaxHttpPublic(Duration), GdaxHttpPrivate(Duration), PlnxHttpPublic(Duration), PlnxHttpPrivate(Duration), PlnxOrderBook(Duration), KrknHttpPublic(Duration), KrknHttpPrivate(Duration), KrknTrade(Duration, &'static str, Option, Option), PlnxWs(Ticker), Terminate } #[derive(Debug, Clone)] pub struct Update { pub gdax_ws: Nanos, pub gdax_trade: Nanos, pub gdax_last: DateTime } impl Default for Update { fn default() -> Self { Update { gdax_ws: 0, gdax_trade: 0, gdax_last: Utc::now(), } } } #[derive(Debug, Clone)] pub struct LatencyUpdate { pub gdax_ws: Nanos, pub krkn_pub: Nanos, pub krkn_priv: Nanos, pub plnx_pub: Nanos, pub plnx_priv: Nanos, pub plnx_order: Nanos, pub krkn_trade_30_mean: Nanos, pub krkn_trade_30_max: Nanos, pub krkn_trade_300_mean: Nanos, pub krkn_trade_300_max: Nanos, pub plnx_last: DateTime, pub krkn_last: DateTime, pub plnx_ws_count: u64, } impl Default for LatencyUpdate { fn default() -> Self { LatencyUpdate { gdax_ws : 0, krkn_pub : 0, krkn_priv : 0, plnx_pub : 0, plnx_priv : 0, plnx_order : 0, krkn_trade_30_mean : 0, krkn_trade_30_max : 0, krkn_trade_300_mean : 0, krkn_trade_300_max : 0, plnx_ws_count : 0, plnx_last : Utc::now(), krkn_last : Utc::now(), } } } pub struct Manager { pub tx: Sender, pub channel: PubSub, thread: Option>, } pub struct LatencyManager { pub tx: Sender, pub channel: PubSub, thread: Option>, } /// returns a DateTime equal to now - `dur` /// pub fn dt_from_dur(dur: Duration) -> DateTime { let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64); Utc::now() - old_dur } struct Last { broadcast: Instant, plnx: Instant, krkn: Instant, gdax: Instant, } impl Default for Last { fn default() -> Self { Last { broadcast: Instant::now(), plnx: Instant::now(), krkn: Instant::now(), gdax: Instant::now(), } } } impl Manager { pub fn new(window: Duration, log_path: &'static str, measurements: Sender) -> Self { let (tx, rx) = channel(); let channel = PubSub::new(); let channel_copy = channel.clone(); let logger = file_logger(log_path, Severity::Info); info!(logger, "initializing"); let mut gdax_ws = windows::DurationWindow::new(window); let mut gdax_trade = windows::DurationWindow::new(window); let mut last = Last::default(); info!(logger, "entering loop"); let thread = Some(thread::spawn(move || { loop { let loop_time = Instant::now(); if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1)) { debug!(logger, "rcvd {:?}", msg); match msg { Latency::Ws(_, _, dur) => { gdax_ws.update(loop_time, dur); last.gdax = loop_time; } Latency::Trade(_, ticker, dur) => { gdax_trade.update(loop_time, dur); last.gdax = loop_time; let nanos = windows::DurationWindow::nanos(dur); measurements.send( OwnedMeasurement::new("gdax_trade_api") .add_tag("ticker", ticker.as_str()) .add_field("nanos", OwnedValue::Integer(nanos as i64)) .set_timestamp(influx::now())).unwrap(); } Latency::Terminate => break, _ => {} } } if loop_time - last.broadcast > Duration::from_millis(100) { debug!(logger, "initalizing broadcast"); let update = Update { gdax_ws: gdax_ws.refresh(loop_time).mean_nanos(), gdax_trade: gdax_trade.refresh(loop_time).mean_nanos(), gdax_last: dt_from_dur(loop_time - last.gdax) }; channel.send(update).unwrap(); last.broadcast = loop_time; debug!(logger, "sent broadcast"); } } debug!(logger, "latency manager terminating"); })); Manager { tx, channel: channel_copy, thread, } } } impl Drop for LatencyManager { fn drop(&mut self) { for _ in 0..100 { self.tx.send(ExperiencedLatency::Terminate).unwrap(); } if let Some(thread) = self.thread.take() { let _ = thread.join(); } } } impl Drop for Manager { fn drop(&mut self) { for _ in 0..100 { self.tx.send(Latency::Terminate).unwrap(); } if let Some(thread) = self.thread.take() { let _ = thread.join(); } } } impl LatencyManager { pub fn new(d: Duration) -> Self { let (tx, rx) = channel(); let tx_copy = tx.clone(); let channel = PubSub::new(); let channel_copy = channel.clone(); //let w = w.clone(); let thread = Some(thread::spawn(move || { let logger = file_logger("var/log/latency-manager.log", Severity::Info); info!(logger, "initializing DurationWindows"); let mut gdax_ws = windows::DurationWindow::new(d); let mut gdax_priv = windows::DurationWindow::new(d); let mut krkn_pub = windows::DurationWindow::new(d); let mut krkn_priv = windows::DurationWindow::new(d); let mut plnx_pub = windows::DurationWindow::new(d); let mut plnx_priv = windows::DurationWindow::new(d); let mut plnx_order = windows::DurationWindow::new(d); let mut plnx_ws_count: windows::Window = windows::Window::new(d); // yes I am intentionally breaking from the hard-typed duration // window ... that was a stupid idea // let mut krkn_trade_30 = windows::DurationWindow::new(Duration::from_secs(30)); let mut krkn_trade_300 = windows::DurationWindow::new(Duration::from_secs(300)); let mut last = Last::default(); thread::sleep(Duration::from_millis(1)); info!(logger, "entering loop"); loop { let loop_time = Instant::now(); if let Ok(msg) = rx.recv() { debug!(logger, "new msg: {:?}", msg); match msg { ExperiencedLatency::Terminate => { crit!(logger, "terminating"); break; } ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d), ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d), ExperiencedLatency::KrknHttpPublic(d) => { last.krkn = loop_time; krkn_pub.update(loop_time, d) } ExperiencedLatency::KrknHttpPrivate(d) => { last.krkn = loop_time; krkn_priv.update(loop_time, d) } ExperiencedLatency::PlnxHttpPublic(d) => { last.plnx = loop_time; plnx_pub.update(loop_time, d) } ExperiencedLatency::PlnxHttpPrivate(d) => { last.plnx = loop_time; plnx_priv.update(loop_time, d) } ExperiencedLatency::PlnxOrderBook(d) => { last.plnx = loop_time; plnx_order.update(loop_time, d) } ExperiencedLatency::PlnxWs(_) => { last.plnx = loop_time; plnx_ws_count.update(loop_time, 1_u32); } ExperiencedLatency::KrknTrade(d, cmd, _, _) => { debug!(logger, "new KrknTrade"; "cmd" => cmd); last.krkn = loop_time; krkn_trade_30.update(loop_time, d); krkn_trade_300.update(loop_time, d); } other => { warn!(logger, "unexpected msg: {:?}", other); } } } if loop_time - last.broadcast > Duration::from_millis(100) { debug!(logger, "initalizing broadcast"); // note - because we mutated the Window instances // above, we need a fresh Instant to avoid less than other // panic // krkn_trade_30.refresh(loop_time); krkn_trade_300.refresh(loop_time); let update = LatencyUpdate { gdax_ws: gdax_ws.refresh(loop_time).mean_nanos(), krkn_pub: krkn_pub.refresh(loop_time).mean_nanos(), krkn_priv: krkn_priv.refresh(loop_time).mean_nanos(), plnx_pub: plnx_pub.refresh(loop_time).mean_nanos(), plnx_priv: plnx_priv.refresh(loop_time).mean_nanos(), plnx_order: plnx_order.refresh(loop_time).mean_nanos(), krkn_trade_30_mean: krkn_trade_30.mean_nanos(), krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0), krkn_trade_300_mean: krkn_trade_300.mean_nanos(), krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0), plnx_last: dt_from_dur(loop_time - last.plnx), krkn_last: dt_from_dur(loop_time - last.krkn), plnx_ws_count: plnx_ws_count.refresh(loop_time).count() as u64, }; channel.send(update).unwrap(); last.broadcast = loop_time; debug!(logger, "sent broadcast"); } } crit!(logger, "goodbye"); })); LatencyManager { tx: tx_copy, channel: channel_copy, thread } } }