Browse Source

middle of the night debugging

I copied several Window structs from windows crate to avoid a cyclic
dependency. The rationale was, latency mod is on its way out anyway.
master
Jonathan Strong 6 years ago
parent
commit
0713744d12
3 changed files with 373 additions and 16 deletions
  1. +2
    -1
      Cargo.toml
  2. +369
    -14
      src/latency.rs
  3. +2
    -1
      src/lib.rs

+ 2
- 1
Cargo.toml View File

@@ -26,12 +26,13 @@ uuid = { version = "0.5", features = ["serde", "v4"] }
hdrhistogram = "6" hdrhistogram = "6"
slog-async = "2" slog-async = "2"
smallvec = "0.6" smallvec = "0.6"
num = "0.1"


sloggers = { path = "../sloggers" } sloggers = { path = "../sloggers" }


decimal = { path = "../decimal", version = "2" } decimal = { path = "../decimal", version = "2" }


windows = { path = "../windows", version = "0.2" }
#windows = { path = "../windows", version = "0.2" }
money = { path = "../money", version = "0.2" } money = { path = "../money", version = "0.2" }
pubsub = { path = "../pubsub" } pubsub = { path = "../pubsub" }




+ 369
- 14
src/latency.rs View File

@@ -7,12 +7,14 @@ use chrono::{self, DateTime, Utc};
use pub_sub::PubSub; use pub_sub::PubSub;
use sloggers::types::Severity; use sloggers::types::Severity;


use windows::{DurationWindow, Incremental, Window};
//use windows::{DurationWindow, Incremental, Window};
use money::{Ticker, Side, Exchange}; use money::{Ticker, Side, Exchange};


use super::file_logger; use super::file_logger;
use influx::{self, OwnedMeasurement, OwnedValue}; use influx::{self, OwnedMeasurement, OwnedValue};


use self::windows::Incremental;

pub type Nanos = u64; pub type Nanos = u64;


pub const SECOND: u64 = 1e9 as u64; pub const SECOND: u64 = 1e9 as u64;
@@ -83,6 +85,359 @@ pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) -> fmt::Result {
} }
} }


#[doc(hide)]
mod windows {
use super::*;
use std::ops::{Div, Mul, Sub, SubAssign, AddAssign};
use std::collections::VecDeque;
use num::Float;

const INITIAL_CAPACITY: usize = 1000;

#[derive(Clone, Debug)]
pub struct Point<T>
//where T: Default
{
time: Instant,
value: T
}

#[derive(Debug, Clone)]
pub struct Window<T>
where T: Default
{
pub size: Duration, // window size
mean: T,
ps: T,
psa: T,
var: T,
sum: T,
count: u32,
items: VecDeque<Point<T>>,
}

#[derive(Default)]
pub struct DurationWindow {
pub size: Duration,
mean: Duration,
sum: Duration,
count: u32,
items: VecDeque<Point<Duration>>
}

impl<T> Point<T>
where T: Default + Copy
{
fn new(time: Instant, value: T) -> Self {
Point { time, value }
}

fn value(&self) -> T {
self.value
}
}

impl<T> Window<T>
where T: Default + Zero
{
pub fn new(size: Duration) -> Self {
Window {
size,
mean: T::default(),
psa: T::default(),
ps: T::default(),
sum: T::default(),
count: 0,
var: T::default(),
items: VecDeque::with_capacity(INITIAL_CAPACITY),
}
}

pub fn with_size_and_capacity(size: Duration, capacity: usize) -> Self {
Window {
size,
mean: T::default(),
psa: T::default(),
ps: T::default(),
sum: T::default(),
count: 0,
var: T::default(),
items: VecDeque::with_capacity(capacity),
}
}
}

impl<T> From<Duration> for Window<T>
where T: Default + Zero
{
fn from(size: Duration) -> Self {
Window::new(size)
}
}

impl From<Duration> for DurationWindow {
fn from(size: Duration) -> Self {
DurationWindow::new(size)
}
}

pub trait Incremental<T> {
/// Purge expired items.
///
#[inline]
fn refresh(&mut self, t: Instant) -> &Self;

/// Add a new item.
///
#[inline]
fn add(&mut self, time: Instant, value: T);

/// Add a new item and purge expired items.
///
#[inline]
fn update(&mut self, time: Instant, value: T) {
self.refresh(time);
self.add(time, value);
}
}

pub trait Zero {
fn zero() -> Self;
}

pub trait One {
fn one() -> Self;
}

macro_rules! zero {
($t:ty, $body:expr) => {

impl Zero for $t {
fn zero() -> $t { $body }
}
}
}

macro_rules! one {
($t:ty, $body:expr) => {

impl One for $t {
fn one() -> $t { $body }
}
}
}

zero!(f64, 0.0);
zero!(f32, 0.0);
zero!(u128, 0);
zero!(i128, 0);
zero!(u64, 0);
zero!(i64, 0);
zero!(i32, 0);
zero!(u32, 0);
zero!(u16, 0);
one!(f64, 1.0);
one!(f32, 1.0);
one!(u128, 1);
one!(i128, 1);
one!(u64, 1);
one!(i64, 1);
one!(i32, 1);
one!(u32, 1);
one!(u16, 1);

impl<T> Incremental<T> for Window<T>
where T: Default + AddAssign<T> + SubAssign<T> + From<u32> + Div<Output = T> +
Mul<Output = T> + Sub<Output = T> + Copy
{
#[inline]
fn refresh(&mut self, t: Instant) -> &Self {
if !self.items.is_empty() {
let (n_remove, sum, ps, count) =
self.items.iter()
.take_while(|x| t - x.time > self.size)
.fold((0, self.sum, self.ps, self.count), |(n_remove, sum, ps, count), x| {
(n_remove + 1, sum - x.value, ps - x.value * x.value, count - 1)
});
self.sum = sum;
self.ps = ps;
self.count = count;
for _ in 0..n_remove {
self.items.pop_front();
}
}

if self.count > 0 {
self.mean = self.sum / self.count.into();
self.psa = self.ps / self.count.into();
let c: T = self.count.into();
self.var = (self.psa * c - c * self.mean * self.mean) / c;
}
self
}

/// Creates `Point { time, value }` and pushes to `self.items`.
///
#[inline]
fn add(&mut self, time: Instant, value: T) {
let p = Point::new(time, value);
self.sum += p.value;
self.ps += p.value * p.value;
self.count += 1;
self.items.push_back(p);
}

#[inline]
fn update(&mut self, time: Instant, value: T) {
self.add(time, value);
self.refresh(time);
}
}

impl Incremental<Duration> for DurationWindow {
#[inline]
fn refresh(&mut self, t: Instant) -> &Self {
if !self.items.is_empty() {
let (n_remove, sum, count) =
self.items.iter()
.take_while(|x| t - x.time > self.size)
.fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
(n_remove + 1, sum - x.value, count - 1)
});
self.sum = sum;
self.count = count;
for _ in 0..n_remove {
self.items.pop_front();
}
}

if self.count > 0 {
self.mean = self.sum / self.count.into();
}

self
}

#[inline]
fn add(&mut self, time: Instant, value: Duration) {
let p = Point::new(time, value);
self.sum += p.value;
self.count += 1;
self.items.push_back(p);
}
}


impl<T> Window<T>
where T: Default + Copy
{
pub fn mean(&self) -> T { self.mean }
pub fn var(&self) -> T { self.var }
pub fn psa(&self) -> T { self.psa }
pub fn ps(&self) -> T { self.ps }
pub fn count(&self) -> u32 { self.count }
pub fn len(&self) -> usize { self.items.len() }
pub fn is_empty(&self) -> bool { self.items.is_empty() }

/// Returns the `Duration` between `t` and the first `Point` in `self.items`.
///
/// If there are no items, returns `Duration { secs: 0, nanos: 0 }`.
///
/// # Panics
///
/// This function will panic if `t` is earlier than the first `Point`'s `Instant`.
///
#[inline]
pub fn elapsed(&self, t: Instant) -> Duration {
self.items.front()
.map(|p| {
t - p.time
}).unwrap_or_else(|| Duration::new(0, 0))
}
}

impl<T> Window<T>
where T: Float + Default
{
#[inline]
pub fn std(&self) -> T { self.var.sqrt() }
}

impl DurationWindow {
pub fn new(size: Duration) -> Self { DurationWindow { size, ..Default::default() } }
pub fn mean(&self) -> Duration { self.mean }
pub fn count(&self) -> u32 { self.count }
pub fn len(&self) -> usize { self.items.len() }
pub fn is_empty(&self) -> bool { self.items.is_empty() }

#[inline]
pub fn nanos(d: Duration) -> u64 { d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) }

/// Returns number of microseconds as `u32` if `d <= Duration::new(4_294, 967_295_000)`.
///
/// Any duration above ~4,295 seconds as micros is larger than `u32::MAX`. 4,295 seconds
/// is about 71.5 minutes.
///
/// # Examples
///
/// ```
/// use windows::DurationWindow;
/// use std::time::Duration;
///
/// assert_eq!(DurationWindow::micros(Duration::new(1, 0)), Some(1_000_000));
/// assert_eq!(DurationWindow::micros(Duration::new(4_295, 0)), None);
/// ```
///
#[inline]
pub fn micros(d: Duration) -> Option<u32> {
if d <= Duration::new(4_294, 967_295_000) {
Some((d.as_secs() * 1_000_000) as u32 + d.subsec_nanos() / 1_000u32)
} else {
None
}
}

#[inline]
pub fn mean_nanos(&self) -> u64 { DurationWindow::nanos(self.mean()) }

#[inline]
pub fn max(&self) -> Option<Duration> {
self.items.iter()
.map(|p| p.value)
.max()
}

#[inline]
pub fn max_nanos(&self) -> Option<u64> {
self.max()
.map(|x| DurationWindow::nanos(x))
}

#[inline]
pub fn first(&self) -> Option<Duration> {
self.items
.front()
.map(|pt| pt.value())
}

/// Returns the `Duration` between `t` and the first `Point` in `self.items`.
///
/// If there are no items, returns `Duration { secs: 0, nanos: 0 }`.
///
/// # Panics
///
/// This function will panic if `t` is earlier than the first `Point`'s `Instant`.
///
#[inline]
pub fn elapsed(&self, t: Instant) -> Duration {
self.items.front()
.map(|p| {
t - p.time
}).unwrap_or_else(|| Duration::new(0, 0))
}
}
}

#[derive(Debug)] #[derive(Debug)]
pub enum Latency { pub enum Latency {
Ws(Exchange, Ticker, Duration), Ws(Exchange, Ticker, Duration),
@@ -215,8 +570,8 @@ impl Manager {
info!(logger, "initializing"); info!(logger, "initializing");
let mut gdax_ws = DurationWindow::new(window);
let mut gdax_trade = DurationWindow::new(window);
let mut gdax_ws = windows::DurationWindow::new(window);
let mut gdax_trade = windows::DurationWindow::new(window);


let mut last = Last::default(); let mut last = Last::default();


@@ -239,7 +594,7 @@ impl Manager {
Latency::Trade(_, ticker, dur) => { Latency::Trade(_, ticker, dur) => {
gdax_trade.update(loop_time, dur); gdax_trade.update(loop_time, dur);
last.gdax = loop_time; last.gdax = loop_time;
let nanos = DurationWindow::nanos(dur);
let nanos = windows::DurationWindow::nanos(dur);
measurements.send( measurements.send(
OwnedMeasurement::new("gdax_trade_api") OwnedMeasurement::new("gdax_trade_api")
.add_tag("ticker", ticker.as_str()) .add_tag("ticker", ticker.as_str())
@@ -309,20 +664,20 @@ impl LatencyManager {
info!(logger, "initializing zmq"); info!(logger, "initializing zmq");


info!(logger, "initializing DurationWindows"); info!(logger, "initializing DurationWindows");
let mut gdax_ws = DurationWindow::new(d);
let mut gdax_priv = DurationWindow::new(d);
let mut krkn_pub = DurationWindow::new(d);
let mut krkn_priv = DurationWindow::new(d);
let mut plnx_pub = DurationWindow::new(d);
let mut plnx_priv = DurationWindow::new(d);
let mut plnx_order = DurationWindow::new(d);
let mut plnx_ws_count: Window<u32> = Window::new(d);
let mut gdax_ws = windows::DurationWindow::new(d);
let mut gdax_priv = windows::DurationWindow::new(d);
let mut krkn_pub = windows::DurationWindow::new(d);
let mut krkn_priv = windows::DurationWindow::new(d);
let mut plnx_pub = windows::DurationWindow::new(d);
let mut plnx_priv = windows::DurationWindow::new(d);
let mut plnx_order = windows::DurationWindow::new(d);
let mut plnx_ws_count: windows::Window<u32> = windows::Window::new(d);


// yes I am intentionally breaking from the hard-typed duration // yes I am intentionally breaking from the hard-typed duration
// window ... that was a stupid idea // window ... that was a stupid idea
// //
let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30));
let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300));
let mut krkn_trade_30 = windows::DurationWindow::new(Duration::from_secs(30));
let mut krkn_trade_300 = windows::DurationWindow::new(Duration::from_secs(300));


let mut last = Last::default(); let mut last = Last::default();




+ 2
- 1
src/lib.rs View File

@@ -23,8 +23,9 @@ extern crate decimal;
extern crate uuid; extern crate uuid;
extern crate hdrhistogram; extern crate hdrhistogram;
extern crate smallvec; extern crate smallvec;
extern crate num;


extern crate windows;
//extern crate windows;
extern crate pubsub as pub_sub; extern crate pubsub as pub_sub;


use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};


Loading…
Cancel
Save