From 0713744d128c037685209741da4f296b8c68d7d1 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Thu, 12 Apr 2018 04:35:13 -0400 Subject: [PATCH] middle of the night debugging I copied several Window structs from windows crate to avoid a cyclic dependency. The rationale was, latency mod is on its way out anyway. --- Cargo.toml | 3 +- src/latency.rs | 383 +++++++++++++++++++++++++++++++++++++++++++++++-- src/lib.rs | 3 +- 3 files changed, 373 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2af1645..d8a8668 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,12 +26,13 @@ uuid = { version = "0.5", features = ["serde", "v4"] } hdrhistogram = "6" slog-async = "2" smallvec = "0.6" +num = "0.1" sloggers = { path = "../sloggers" } decimal = { path = "../decimal", version = "2" } -windows = { path = "../windows", version = "0.2" } +#windows = { path = "../windows", version = "0.2" } money = { path = "../money", version = "0.2" } pubsub = { path = "../pubsub" } diff --git a/src/latency.rs b/src/latency.rs index 5a387be..05b6726 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -7,12 +7,14 @@ use chrono::{self, DateTime, Utc}; use pub_sub::PubSub; use sloggers::types::Severity; -use windows::{DurationWindow, Incremental, Window}; +//use windows::{DurationWindow, Incremental, Window}; use money::{Ticker, Side, Exchange}; use super::file_logger; use influx::{self, OwnedMeasurement, OwnedValue}; +use self::windows::Incremental; + pub type Nanos = u64; pub const SECOND: u64 = 1e9 as u64; @@ -83,6 +85,359 @@ pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) -> fmt::Result { } } +#[doc(hide)] +mod windows { + use super::*; + use std::ops::{Div, Mul, Sub, SubAssign, AddAssign}; + use std::collections::VecDeque; + use num::Float; + + const INITIAL_CAPACITY: usize = 1000; + + #[derive(Clone, Debug)] + pub struct Point + //where T: Default + { + time: Instant, + value: T + } + + #[derive(Debug, Clone)] + pub struct Window + where T: Default + { + pub size: Duration, // window size + mean: T, + ps: T, + psa: T, + var: T, + sum: T, + count: u32, + items: VecDeque>, + } + + #[derive(Default)] + pub struct DurationWindow { + pub size: Duration, + mean: Duration, + sum: Duration, + count: u32, + items: VecDeque> + } + + impl Point + where T: Default + Copy + { + fn new(time: Instant, value: T) -> Self { + Point { time, value } + } + + fn value(&self) -> T { + self.value + } + } + + impl Window + where T: Default + Zero + { + pub fn new(size: Duration) -> Self { + Window { + size, + mean: T::default(), + psa: T::default(), + ps: T::default(), + sum: T::default(), + count: 0, + var: T::default(), + items: VecDeque::with_capacity(INITIAL_CAPACITY), + } + } + + pub fn with_size_and_capacity(size: Duration, capacity: usize) -> Self { + Window { + size, + mean: T::default(), + psa: T::default(), + ps: T::default(), + sum: T::default(), + count: 0, + var: T::default(), + items: VecDeque::with_capacity(capacity), + } + } + } + + impl From for Window + where T: Default + Zero + { + fn from(size: Duration) -> Self { + Window::new(size) + } + } + + impl From for DurationWindow { + fn from(size: Duration) -> Self { + DurationWindow::new(size) + } + } + + pub trait Incremental { + /// Purge expired items. + /// + #[inline] + fn refresh(&mut self, t: Instant) -> &Self; + + /// Add a new item. + /// + #[inline] + fn add(&mut self, time: Instant, value: T); + + /// Add a new item and purge expired items. + /// + #[inline] + fn update(&mut self, time: Instant, value: T) { + self.refresh(time); + self.add(time, value); + } + } + + pub trait Zero { + fn zero() -> Self; + } + + pub trait One { + fn one() -> Self; + } + + macro_rules! zero { + ($t:ty, $body:expr) => { + + impl Zero for $t { + fn zero() -> $t { $body } + } + } + } + + macro_rules! one { + ($t:ty, $body:expr) => { + + impl One for $t { + fn one() -> $t { $body } + } + } + } + + zero!(f64, 0.0); + zero!(f32, 0.0); + zero!(u128, 0); + zero!(i128, 0); + zero!(u64, 0); + zero!(i64, 0); + zero!(i32, 0); + zero!(u32, 0); + zero!(u16, 0); + one!(f64, 1.0); + one!(f32, 1.0); + one!(u128, 1); + one!(i128, 1); + one!(u64, 1); + one!(i64, 1); + one!(i32, 1); + one!(u32, 1); + one!(u16, 1); + + impl Incremental for Window + where T: Default + AddAssign + SubAssign + From + Div + + Mul + Sub + Copy + { + #[inline] + fn refresh(&mut self, t: Instant) -> &Self { + if !self.items.is_empty() { + let (n_remove, sum, ps, count) = + self.items.iter() + .take_while(|x| t - x.time > self.size) + .fold((0, self.sum, self.ps, self.count), |(n_remove, sum, ps, count), x| { + (n_remove + 1, sum - x.value, ps - x.value * x.value, count - 1) + }); + self.sum = sum; + self.ps = ps; + self.count = count; + for _ in 0..n_remove { + self.items.pop_front(); + } + } + + if self.count > 0 { + self.mean = self.sum / self.count.into(); + self.psa = self.ps / self.count.into(); + let c: T = self.count.into(); + self.var = (self.psa * c - c * self.mean * self.mean) / c; + } + self + } + + /// Creates `Point { time, value }` and pushes to `self.items`. + /// + #[inline] + fn add(&mut self, time: Instant, value: T) { + let p = Point::new(time, value); + self.sum += p.value; + self.ps += p.value * p.value; + self.count += 1; + self.items.push_back(p); + } + + #[inline] + fn update(&mut self, time: Instant, value: T) { + self.add(time, value); + self.refresh(time); + } + } + + impl Incremental for DurationWindow { + #[inline] + fn refresh(&mut self, t: Instant) -> &Self { + if !self.items.is_empty() { + let (n_remove, sum, count) = + self.items.iter() + .take_while(|x| t - x.time > self.size) + .fold((0, self.sum, self.count), |(n_remove, sum, count), x| { + (n_remove + 1, sum - x.value, count - 1) + }); + self.sum = sum; + self.count = count; + for _ in 0..n_remove { + self.items.pop_front(); + } + } + + if self.count > 0 { + self.mean = self.sum / self.count.into(); + } + + self + } + + #[inline] + fn add(&mut self, time: Instant, value: Duration) { + let p = Point::new(time, value); + self.sum += p.value; + self.count += 1; + self.items.push_back(p); + } + } + + + impl Window + where T: Default + Copy + { + pub fn mean(&self) -> T { self.mean } + pub fn var(&self) -> T { self.var } + pub fn psa(&self) -> T { self.psa } + pub fn ps(&self) -> T { self.ps } + pub fn count(&self) -> u32 { self.count } + pub fn len(&self) -> usize { self.items.len() } + pub fn is_empty(&self) -> bool { self.items.is_empty() } + + /// Returns the `Duration` between `t` and the first `Point` in `self.items`. + /// + /// If there are no items, returns `Duration { secs: 0, nanos: 0 }`. + /// + /// # Panics + /// + /// This function will panic if `t` is earlier than the first `Point`'s `Instant`. + /// + #[inline] + pub fn elapsed(&self, t: Instant) -> Duration { + self.items.front() + .map(|p| { + t - p.time + }).unwrap_or_else(|| Duration::new(0, 0)) + } + } + + impl Window + where T: Float + Default + { + #[inline] + pub fn std(&self) -> T { self.var.sqrt() } + } + + impl DurationWindow { + pub fn new(size: Duration) -> Self { DurationWindow { size, ..Default::default() } } + pub fn mean(&self) -> Duration { self.mean } + pub fn count(&self) -> u32 { self.count } + pub fn len(&self) -> usize { self.items.len() } + pub fn is_empty(&self) -> bool { self.items.is_empty() } + + #[inline] + pub fn nanos(d: Duration) -> u64 { d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) } + + /// Returns number of microseconds as `u32` if `d <= Duration::new(4_294, 967_295_000)`. + /// + /// Any duration above ~4,295 seconds as micros is larger than `u32::MAX`. 4,295 seconds + /// is about 71.5 minutes. + /// + /// # Examples + /// + /// ``` + /// use windows::DurationWindow; + /// use std::time::Duration; + /// + /// assert_eq!(DurationWindow::micros(Duration::new(1, 0)), Some(1_000_000)); + /// assert_eq!(DurationWindow::micros(Duration::new(4_295, 0)), None); + /// ``` + /// + #[inline] + pub fn micros(d: Duration) -> Option { + if d <= Duration::new(4_294, 967_295_000) { + Some((d.as_secs() * 1_000_000) as u32 + d.subsec_nanos() / 1_000u32) + } else { + None + } + } + + #[inline] + pub fn mean_nanos(&self) -> u64 { DurationWindow::nanos(self.mean()) } + + #[inline] + pub fn max(&self) -> Option { + self.items.iter() + .map(|p| p.value) + .max() + } + + #[inline] + pub fn max_nanos(&self) -> Option { + self.max() + .map(|x| DurationWindow::nanos(x)) + } + + #[inline] + pub fn first(&self) -> Option { + self.items + .front() + .map(|pt| pt.value()) + } + + /// Returns the `Duration` between `t` and the first `Point` in `self.items`. + /// + /// If there are no items, returns `Duration { secs: 0, nanos: 0 }`. + /// + /// # Panics + /// + /// This function will panic if `t` is earlier than the first `Point`'s `Instant`. + /// + #[inline] + pub fn elapsed(&self, t: Instant) -> Duration { + self.items.front() + .map(|p| { + t - p.time + }).unwrap_or_else(|| Duration::new(0, 0)) + } + } +} + #[derive(Debug)] pub enum Latency { Ws(Exchange, Ticker, Duration), @@ -215,8 +570,8 @@ impl Manager { info!(logger, "initializing"); - let mut gdax_ws = DurationWindow::new(window); - let mut gdax_trade = DurationWindow::new(window); + let mut gdax_ws = windows::DurationWindow::new(window); + let mut gdax_trade = windows::DurationWindow::new(window); let mut last = Last::default(); @@ -239,7 +594,7 @@ impl Manager { Latency::Trade(_, ticker, dur) => { gdax_trade.update(loop_time, dur); last.gdax = loop_time; - let nanos = DurationWindow::nanos(dur); + let nanos = windows::DurationWindow::nanos(dur); measurements.send( OwnedMeasurement::new("gdax_trade_api") .add_tag("ticker", ticker.as_str()) @@ -309,20 +664,20 @@ impl LatencyManager { info!(logger, "initializing zmq"); info!(logger, "initializing DurationWindows"); - let mut gdax_ws = DurationWindow::new(d); - let mut gdax_priv = DurationWindow::new(d); - let mut krkn_pub = DurationWindow::new(d); - let mut krkn_priv = DurationWindow::new(d); - let mut plnx_pub = DurationWindow::new(d); - let mut plnx_priv = DurationWindow::new(d); - let mut plnx_order = DurationWindow::new(d); - let mut plnx_ws_count: Window = Window::new(d); + let mut gdax_ws = windows::DurationWindow::new(d); + let mut gdax_priv = windows::DurationWindow::new(d); + let mut krkn_pub = windows::DurationWindow::new(d); + let mut krkn_priv = windows::DurationWindow::new(d); + let mut plnx_pub = windows::DurationWindow::new(d); + let mut plnx_priv = windows::DurationWindow::new(d); + let mut plnx_order = windows::DurationWindow::new(d); + let mut plnx_ws_count: windows::Window = windows::Window::new(d); // yes I am intentionally breaking from the hard-typed duration // window ... that was a stupid idea // - let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30)); - let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300)); + let mut krkn_trade_30 = windows::DurationWindow::new(Duration::from_secs(30)); + let mut krkn_trade_300 = windows::DurationWindow::new(Duration::from_secs(300)); let mut last = Last::default(); diff --git a/src/lib.rs b/src/lib.rs index 73096c9..59fb10b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,8 +23,9 @@ extern crate decimal; extern crate uuid; extern crate hdrhistogram; extern crate smallvec; +extern crate num; -extern crate windows; +//extern crate windows; extern crate pubsub as pub_sub; use chrono::{DateTime, Utc};