Browse Source

adds a krkn_last and plnx_last to Updates

master
Jonathan Strong 7 years ago
parent
commit
c57e2273fa
1 changed files with 121 additions and 26 deletions
  1. +121
    -26
      src/latency.rs

+ 121
- 26
src/latency.rs View File

@@ -5,7 +5,7 @@ use std::collections::VecDeque;
use std::fmt::{self, Display, Error as FmtError, Formatter, Write}; use std::fmt::{self, Display, Error as FmtError, Formatter, Write};
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};


use chrono::{DateTime, Utc, TimeZone};
use chrono::{self, DateTime, Utc, TimeZone};
use pub_sub::PubSub; use pub_sub::PubSub;
use zmq; use zmq;
use influent::measurement::{Measurement, Value}; use influent::measurement::{Measurement, Value};
@@ -58,6 +58,14 @@ pub fn tfmt_dur(d: Duration) -> String {
tfmt(nanos(d)) tfmt(nanos(d))
} }


pub fn tfmt_dt(dt: DateTime<Utc>) -> String {
Utc::now().signed_duration_since(dt)
.to_std()
.map(|dur| {
tfmt_dur(dur)
}).unwrap_or("?".into())
}



pub fn tfmt_write(ns: Nanos, f: &mut Formatter) { pub fn tfmt_write(ns: Nanos, f: &mut Formatter) {
match ns { match ns {
@@ -142,12 +150,11 @@ impl MeasurementWindow for WTen {
fn duration(&self) -> Duration { Duration::from_secs(10) } fn duration(&self) -> Duration { Duration::from_secs(10) }
} }


#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct LatencyUpdate<W> pub struct LatencyUpdate<W>
where W: MeasurementWindow where W: MeasurementWindow
{ {
pub gdax_ws: Nanos, pub gdax_ws: Nanos,
//pub gdax_ws_nolock: Nanos,
pub krkn_pub: Nanos, pub krkn_pub: Nanos,
pub krkn_priv: Nanos, pub krkn_priv: Nanos,
pub plnx_pub: Nanos, pub plnx_pub: Nanos,
@@ -159,11 +166,39 @@ pub struct LatencyUpdate<W>
pub krkn_trade_300_mean: Nanos, pub krkn_trade_300_mean: Nanos,
pub krkn_trade_300_max: Nanos, pub krkn_trade_300_max: Nanos,


pub plnx_last: DateTime<Utc>,
pub krkn_last: DateTime<Utc>,

//pub event_loop: Nanos, //pub event_loop: Nanos,


pub size: W, pub size: W,
} }


impl<W> Default for LatencyUpdate<W>
where W: MeasurementWindow + Default
{
fn default() -> Self {
LatencyUpdate {
gdax_ws: Nanos::default(),
krkn_pub: Nanos::default(),
krkn_priv: Nanos::default(),
plnx_pub: Nanos::default(),
plnx_priv: Nanos::default(),
plnx_order: Nanos::default(),
krkn_trade_30_mean: Nanos::default(),
krkn_trade_30_max: Nanos::default(),

krkn_trade_300_mean: Nanos::default(),
krkn_trade_300_max: Nanos::default(),

plnx_last: Utc::now(),
krkn_last: Utc::now(),

size: W::default()
}
}
}

impl<W> Display for LatencyUpdate<W> impl<W> Display for LatencyUpdate<W>
where W: MeasurementWindow where W: MeasurementWindow
{ {
@@ -217,6 +252,29 @@ pub struct LatencyManager<W>
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
} }


/// returns a DateTime equal to now - `dur`
///
pub fn dt_from_dur(dur: Duration) -> DateTime<Utc> {
let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64);
Utc::now() - old_dur
}

struct Last {
broadcast: Instant,
plnx: Instant,
krkn: Instant,
}

impl Default for Last {
fn default() -> Self {
Last {
broadcast: Instant::now(),
plnx: Instant::now(),
krkn: Instant::now(),
}
}
}

//impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> { //impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> {
impl LatencyManager<WTen> { impl LatencyManager<WTen> {
pub fn new(w: WTen) -> Self { pub fn new(w: WTen) -> Self {
@@ -227,6 +285,7 @@ impl LatencyManager<WTen> {
let w = w.clone(); let w = w.clone();


let thread = Some(thread::spawn(move || { let thread = Some(thread::spawn(move || {

let ctx = zmq::Context::new(); let ctx = zmq::Context::new();
let socket = influx::push(&ctx).unwrap(); let socket = influx::push(&ctx).unwrap();
let mut buf = String::with_capacity(4096); let mut buf = String::with_capacity(4096);
@@ -239,6 +298,7 @@ impl LatencyManager<WTen> {
let mut plnx_priv = DurationWindow::new(w.duration()); let mut plnx_priv = DurationWindow::new(w.duration());
let mut plnx_order = DurationWindow::new(w.duration()); let mut plnx_order = DurationWindow::new(w.duration());



// yes I am intentionally breaking from the hard-typed duration // yes I am intentionally breaking from the hard-typed duration
// window ... that was a stupid idea // window ... that was a stupid idea
// //
@@ -246,26 +306,55 @@ impl LatencyManager<WTen> {
let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300)); let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300));
//let mut gdax_ws_nolock = DurationWindow::new(w.duration()); //let mut gdax_ws_nolock = DurationWindow::new(w.duration());
//let mut event_loop = DurationWindow::new(w.duration()); //let mut event_loop = DurationWindow::new(w.duration());
let mut last_broadcast = Instant::now();

let mut last = Last::default();

thread::sleep_ms(1);

loop { loop {
let loop_time = Instant::now();

if let Ok(msg) = rx.recv() { if let Ok(msg) = rx.recv() {
match msg { match msg {
ExperiencedLatency::Terminate => { ExperiencedLatency::Terminate => {
//println!("latency manager terminating"); //println!("latency manager terminating");
break; break;
} }
ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(Instant::now(), d),
//ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(Instant::now(), d),
ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(Instant::now(), d),
ExperiencedLatency::KrknHttpPublic(d) => krkn_pub.update(Instant::now(), d),
ExperiencedLatency::KrknHttpPrivate(d) => krkn_priv.update(Instant::now(), d),
ExperiencedLatency::PlnxHttpPublic(d) => plnx_pub.update(Instant::now(), d),
ExperiencedLatency::PlnxHttpPrivate(d) => plnx_priv.update(Instant::now(), d),
ExperiencedLatency::PlnxOrderBook(d) => plnx_order.update(Instant::now(), d),

ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d),
//ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(loop_time, d),
ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d),

ExperiencedLatency::KrknHttpPublic(d) => {
last.krkn = loop_time;
krkn_pub.update(loop_time, d)
}

ExperiencedLatency::KrknHttpPrivate(d) => {
last.krkn = loop_time;
krkn_priv.update(loop_time, d)
}

ExperiencedLatency::PlnxHttpPublic(d) => {
last.plnx = loop_time;
plnx_pub.update(loop_time, d)
}

ExperiencedLatency::PlnxHttpPrivate(d) => {
last.plnx = loop_time;
plnx_priv.update(loop_time, d)
}

ExperiencedLatency::PlnxOrderBook(d) => {
last.plnx = loop_time;
plnx_order.update(loop_time, d)
}

ExperiencedLatency::KrknTrade(d) => { ExperiencedLatency::KrknTrade(d) => {
last.krkn = loop_time;
let n = DurationWindow::nanos(d); let n = DurationWindow::nanos(d);
krkn_trade_30.update(Instant::now(), d);
krkn_trade_300.update(Instant::now(), d);
krkn_trade_30.update(loop_time, d);
krkn_trade_300.update(loop_time, d);
let mut m = Measurement::new("krkn_trade_api"); let mut m = Measurement::new("krkn_trade_api");
m.add_field("nanos", Value::Integer(n as i64)); m.add_field("nanos", Value::Integer(n as i64));
m.set_timestamp(now()); m.set_timestamp(now());
@@ -278,18 +367,21 @@ impl LatencyManager<WTen> {
} }
} }


if Instant::now() - last_broadcast > Duration::from_millis(100) {
let now = Instant::now();
krkn_trade_30.refresh(&now);
krkn_trade_300.refresh(&now);
if loop_time - last.broadcast > Duration::from_millis(100) {
// note - because we mutated the Window instances
// above, we need a fresh Instant to avoid less than other
// panic
//
krkn_trade_30.refresh(&loop_time);
krkn_trade_300.refresh(&loop_time);
let update = LatencyUpdate { let update = LatencyUpdate {
gdax_ws: gdax_ws.refresh(&now).mean_nanos(),
//gdax_ws_nolock: gdax_ws_nolock.refresh(&now).mean_nanos(),
krkn_pub: krkn_pub.refresh(&now).mean_nanos(),
krkn_priv: krkn_priv.refresh(&now).mean_nanos(),
plnx_pub: plnx_pub.refresh(&now).mean_nanos(),
plnx_priv: plnx_priv.refresh(&now).mean_nanos(),
plnx_order: plnx_order.refresh(&now).mean_nanos(),
gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
//gdax_ws_nolock: gdax_ws_nolock.refresh(&loop_time).mean_nanos(),
krkn_pub: krkn_pub.refresh(&loop_time).mean_nanos(),
krkn_priv: krkn_priv.refresh(&loop_time).mean_nanos(),
plnx_pub: plnx_pub.refresh(&loop_time).mean_nanos(),
plnx_priv: plnx_priv.refresh(&loop_time).mean_nanos(),
plnx_order: plnx_order.refresh(&loop_time).mean_nanos(),


krkn_trade_30_mean: krkn_trade_30.mean_nanos(), krkn_trade_30_mean: krkn_trade_30.mean_nanos(),
krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0), krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0),
@@ -297,11 +389,14 @@ impl LatencyManager<WTen> {
krkn_trade_300_mean: krkn_trade_300.mean_nanos(), krkn_trade_300_mean: krkn_trade_300.mean_nanos(),
krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0), krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0),


plnx_last: dt_from_dur(loop_time - last.plnx),
krkn_last: dt_from_dur(loop_time - last.krkn),

//event_loop: event_loop.refresh(&now).mean_nanos(), //event_loop: event_loop.refresh(&now).mean_nanos(),
size: w.clone(), size: w.clone(),
}; };
channel.send(update); channel.send(update);
last_broadcast = now;
last.broadcast = loop_time;
} }
} }
})); }));


Loading…
Cancel
Save