diff --git a/src/influx.rs b/src/influx.rs index b142178..24c7907 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -292,10 +292,10 @@ pub fn dur_nanos(d: ::std::time::Duration) -> i64 { /// incoming `Measurement`s that way *in addition* to the old socket/`String` /// method /// -pub fn writer_str_or_meas(warnings: Sender) -> (thread::JoinHandle<()>, Sender) { +pub fn writer_str_or_meas(log_path: &str, warnings: Sender) -> (thread::JoinHandle<()>, Sender) { let (tx, rx) = channel(); + let logger = file_logger(log_path); let thread = thread::spawn(move || { - let logger = file_logger("var/log/influx.log"); info!(logger, "initializing zmq"); let _ = fs::create_dir("/tmp/mm"); let ctx = zmq::Context::new(); diff --git a/src/latency.rs b/src/latency.rs index c167fdd..f45272c 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -2,7 +2,7 @@ use std::thread::{self, JoinHandle}; use std::sync::{Arc, Mutex, RwLock}; use std::sync::mpsc::{self, Sender, Receiver, channel}; use std::collections::VecDeque; -use std::fmt::{self, Display, Error as FmtError, Formatter, Write}; +use std::fmt::{self, Display, Write}; use std::time::{Instant, Duration}; use chrono::{self, DateTime, Utc, TimeZone}; @@ -11,10 +11,10 @@ use zmq; use influent::measurement::{Measurement, Value}; use windows::{DurationWindow, Incremental}; -use money::{Ticker, Side}; +use money::{Ticker, Side, ByExchange, Exchange}; use super::file_logger; -use influx; +use influx::{self, OwnedMeasurement, OwnedValue}; @@ -71,7 +71,7 @@ pub fn tfmt_dt(dt: DateTime) -> String { } -pub fn tfmt_write(ns: Nanos, f: &mut Formatter) { +pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) { match ns { t if t <= MICROSECOND => { write!(f, "{}ns", t); @@ -90,6 +90,14 @@ pub fn tfmt_write(ns: Nanos, f: &mut Formatter) { } } +#[derive(Debug)] +pub enum Latency { + Ws(Exchange, Ticker, Duration), + Http(Exchange, Duration), + Trade(Exchange, Ticker, Duration), + Terminate +} + #[derive(Debug)] pub enum ExperiencedLatency { @@ -154,6 +162,42 @@ impl MeasurementWindow for WTen { fn duration(&self) -> Duration { Duration::from_secs(10) } } + +#[derive(Debug, Clone)] +pub struct Update { + pub gdax_ws: Nanos, + pub gdax_trade: Nanos, + pub gdax_last: DateTime +} + +impl Default for Update { + fn default() -> Self { + Update { + gdax_ws: 0, + gdax_trade: 0, + gdax_last: Utc::now(), + } + } +} + +// impl Update { +// pub fn new(window: Duration) -> Self { +// Update { +// window, +// ws: OrderMap::new(), +// http: 0, +// trade: OrderMap::new(), +// } +// } +// } + +// #[derive(Clone)] +// pub struct Updates { +// pub gdax_5: Update, +// pub gdax_30: Update, +// pub last: ByExchange> +// } + #[derive(Debug, Clone)] pub struct LatencyUpdate where W: MeasurementWindow @@ -206,7 +250,7 @@ impl Default for LatencyUpdate impl Display for LatencyUpdate where W: MeasurementWindow { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, " gdax ws: "); tfmt_write(self.gdax_ws, f); write!(f, "\n krkn pub: "); @@ -248,6 +292,12 @@ impl LatencyUpdate { } +pub struct Manager { + pub tx: Sender, + pub channel: PubSub, + thread: Option>, +} + pub struct LatencyManager where W: MeasurementWindow + Clone + Send + Sync { @@ -267,6 +317,7 @@ struct Last { broadcast: Instant, plnx: Instant, krkn: Instant, + gdax: Instant, } impl Default for Last { @@ -275,10 +326,98 @@ impl Default for Last { broadcast: Instant::now(), plnx: Instant::now(), krkn: Instant::now(), + gdax: Instant::now(), + } + } +} + +impl Manager { + pub fn new(window: Duration, + log_path: &'static str, + measurements: Sender) -> Self { + + let (tx, rx) = channel(); + let tx_copy = tx.clone(); + let channel = PubSub::new(); + let channel_copy = channel.clone(); + let logger = file_logger(log_path); + + info!(logger, "initializing"); + + let mut gdax_ws = DurationWindow::new(window); + let mut gdax_trade = DurationWindow::new(window); + + let mut last = Last::default(); + + info!(logger, "entering loop"); + let mut terminate = false; + + let thread = Some(thread::spawn(move || { + loop { + + let loop_time = Instant::now(); + + rx.try_recv().map(|msg| { + debug!(logger, "rcvd {:?}", msg); + + match msg { + Latency::Ws(exch, ticker, dur) => { + // shortcut + gdax_ws.update(loop_time, dur); + last.gdax = loop_time; + } + + Latency::Trade(exch, ticker, dur) => { + //shorcut + gdax_trade.update(loop_time, dur); + last.gdax = loop_time; + let nanos = DurationWindow::nanos(dur); + measurements.send( + OwnedMeasurement::new("gdax_trade_api") + .add_string_tag("ticker", ticker.to_string()) + .add_field("nanos", OwnedValue::Integer(nanos as i64)) + .set_timestamp(influx::now())); + } + + Latency::Terminate => { + crit!(logger, "rcvd Terminate order"); + terminate = true; + } + + _ => {} + } + }); + + if loop_time - last.broadcast > Duration::from_millis(100) { + debug!(logger, "initalizing broadcast"); + + let update = Update { + gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(), + gdax_trade: gdax_trade.refresh(&loop_time).mean_nanos(), + gdax_last: dt_from_dur(loop_time - last.gdax) + }; + channel.send(update); + last.broadcast = loop_time; + debug!(logger, "sent broadcast"); + } else { + thread::sleep(Duration::from_millis(1) / 10); + } + + if terminate { break } + } + crit!(logger, "goodbye"); + })); + + Manager { + tx, + channel: channel_copy, + thread, } } } + + //impl LatencyManager { impl LatencyManager { pub fn new(w: WTen) -> Self { diff --git a/src/lib.rs b/src/lib.rs index 72f05c0..420d193 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,7 +36,7 @@ pub fn nanos(t: DateTime) -> u64 { (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64) } -pub fn file_logger(path: &'static str) -> slog::Logger { +pub fn file_logger(path: &str) -> slog::Logger { let mut builder = FileLoggerBuilder::new(path); builder.level(Severity::Debug); builder.timezone(TimeZone::Utc); diff --git a/src/warnings.rs b/src/warnings.rs index 1b9a971..57f6325 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -271,7 +271,7 @@ impl MeasurementRecord { values.serialize(record, &mut builder); } - pub fn meas<'a>(&'a self) -> Measurement<'a> { + pub fn to_measurement<'a>(&'a self, name: &'a str) -> Measurement<'a> { let fields: BTreeMap<&'a str, InfluentValue<'a>> = self.fields.iter() .map(|&(k, ref v)| { @@ -285,7 +285,7 @@ impl MeasurementRecord { }).collect(); Measurement { - key: "log", + key: name, timestamp: Some(nanos(Utc::now()) as i64), fields, tags, @@ -381,7 +381,8 @@ impl WarningsManager { let ctx = zmq::Context::new(); let socket = influx::push(&ctx).unwrap(); let thread = thread::spawn(move || { - let logger = file_logger("var/log/warnings-manager.log"); + let path = format!("var/log/warnings-manager-{}.log", measurement_name); + let logger = file_logger(&path); info!(logger, "entering loop"); loop { if let Ok(msg) = rx.recv() { @@ -394,7 +395,7 @@ impl WarningsManager { Warning::Debug { msg, kv } => { debug!(logger, "new Warning::Debug arrived"; "msg" => &msg); - let mut meas = kv.meas(); + let mut meas = kv.to_measurement(measurement_name); meas.add_field("msg", InfluentValue::String(msg.as_ref())); meas.add_tag("category", "debug"); influx::serialize(&meas, &mut buf);