From b27ad08c10dd77c14aa73c4f4098d7a1371c4fc1 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Wed, 30 Aug 2017 18:15:37 -0400 Subject: [PATCH] file loggers for latency / warnings to inspect wtf is going on there --- Cargo.toml | 1 + src/latency.rs | 20 ++++++++++++--- src/lib.rs | 17 ++++++++++--- src/warnings.rs | 66 +++++++++++++++++++++++++++---------------------- 4 files changed, 69 insertions(+), 35 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8f9609e..e72c8af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ hyper = "0.10" termion = "1.4.0" pub-sub = "2.0" slog = "2.0.6" +sloggers = "0.2" windows = { path = "../windows" } money = { path = "../money" } diff --git a/src/latency.rs b/src/latency.rs index 5f658d4..c167fdd 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -13,10 +13,12 @@ use influent::measurement::{Measurement, Value}; use windows::{DurationWindow, Incremental}; use money::{Ticker, Side}; +use super::file_logger; use influx; + pub type Nanos = u64; pub const SECOND: u64 = 1e9 as u64; @@ -287,11 +289,13 @@ impl LatencyManager { let w = w.clone(); let thread = Some(thread::spawn(move || { + let logger = file_logger("var/log/latency-manager.log"); + info!(logger, "initializing zmq"); let ctx = zmq::Context::new(); let socket = influx::push(&ctx).unwrap(); let mut buf = String::with_capacity(4096); - let w = w.clone(); + info!(logger, "initializing DurationWindows"); let mut gdax_ws = DurationWindow::new(w.duration()); let mut gdax_priv = DurationWindow::new(w.duration()); let mut krkn_pub = DurationWindow::new(w.duration()); @@ -313,13 +317,16 @@ impl LatencyManager { thread::sleep_ms(1); + info!(logger, "entering loop"); loop { let loop_time = Instant::now(); if let Ok(msg) = rx.recv() { + debug!(logger, "new msg: {:?}", msg); + match msg { ExperiencedLatency::Terminate => { - //println!("latency manager terminating"); + crit!(logger, "terminating"); break; } @@ -353,6 +360,8 @@ impl LatencyManager { } ExperiencedLatency::KrknTrade(d, cmd, ticker, side) => { + debug!(logger, "new KrknTrade"; + "cmd" => cmd); last.krkn = loop_time; let n = DurationWindow::nanos(d); krkn_trade_30.update(loop_time, d); @@ -374,11 +383,14 @@ impl LatencyManager { buf.clear(); } //ExperiencedLatency::EventLoop(d) => event_loop.update(Instant::now(), d), - other => {} + other => { + warn!(logger, "unexpected msg: {:?}", other); + } } } if loop_time - last.broadcast > Duration::from_millis(100) { + debug!(logger, "initalizing broadcast"); // note - because we mutated the Window instances // above, we need a fresh Instant to avoid less than other // panic @@ -408,8 +420,10 @@ impl LatencyManager { }; channel.send(update); last.broadcast = loop_time; + debug!(logger, "sent broadcast"); } } + crit!(logger, "goodbye"); })); LatencyManager { diff --git a/src/lib.rs b/src/lib.rs index 84d5fa0..72f05c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,21 +13,32 @@ extern crate chrono; extern crate hyper; extern crate termion; extern crate pub_sub; +extern crate sloggers; extern crate windows; +use std::sync::Arc; + use chrono::{DateTime, Utc}; +use sloggers::Build; +use sloggers::types::{Severity, TimeZone}; +use sloggers::file::FileLoggerBuilder; pub mod influx; pub mod warnings; pub mod latency; +//pub type FileLogger = slog::Logger>>; + /// converts a chrono::DateTime to an integer timestamp (ns) /// pub fn nanos(t: DateTime) -> u64 { (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64) } -// #[cfg(test)] -// mod tests { -// } +pub fn file_logger(path: &'static str) -> slog::Logger { + let mut builder = FileLoggerBuilder::new(path); + builder.level(Severity::Debug); + builder.timezone(TimeZone::Utc); + builder.build().unwrap() +} diff --git a/src/warnings.rs b/src/warnings.rs index 6a0623f..1b9a971 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -13,7 +13,7 @@ use termion::color::{self, Fg, Bg}; use influent::measurement::{Measurement, Value as InfluentValue}; use slog::{self, OwnedKVList, Drain, Key, KV}; -use super::nanos; +use super::{nanos, file_logger}; use influx; @@ -380,41 +380,49 @@ impl WarningsManager { let mut buf = String::with_capacity(4096); let ctx = zmq::Context::new(); let socket = influx::push(&ctx).unwrap(); - let thread = thread::spawn(move || { loop { - if let Ok(msg) = rx.recv() { - match msg { - Warning::Terminate => { - //println!("warnings manager terminating"); - break; - } - - Warning::Debug { msg, kv } => { - let mut meas = kv.meas(); - meas.add_field("msg", InfluentValue::String(msg.as_ref())); - meas.add_tag("category", "debug"); - influx::serialize(&meas, &mut buf); - socket.send_str(&buf, 0); - buf.clear(); - // and don't push to warnings - // bc it's debug - } + let thread = thread::spawn(move || { + let logger = file_logger("var/log/warnings-manager.log"); + info!(logger, "entering loop"); + loop { + if let Ok(msg) = rx.recv() { + match msg { + Warning::Terminate => { + crit!(logger, "terminating"); + break; + } - other => { - let rec = Record::new(other); - { - let m = rec.to_measurement(measurement_name); - influx::serialize(&m, &mut buf); + Warning::Debug { msg, kv } => { + debug!(logger, "new Warning::Debug arrived"; + "msg" => &msg); + let mut meas = kv.meas(); + meas.add_field("msg", InfluentValue::String(msg.as_ref())); + meas.add_tag("category", "debug"); + influx::serialize(&meas, &mut buf); socket.send_str(&buf, 0); buf.clear(); + // and don't push to warnings + // bc it's debug } - if let Ok(mut lock) = warnings.write() { - lock.push_front(rec); - lock.truncate(N_WARNINGS); + + other => { + debug!(logger, "new {} arrived", other.category_str(); + "msg" => other.category_str()); + let rec = Record::new(other); + { + let m = rec.to_measurement(measurement_name); + influx::serialize(&m, &mut buf); + socket.send_str(&buf, 0); + buf.clear(); + } + if let Ok(mut lock) = warnings.write() { + lock.push_front(rec); + lock.truncate(N_WARNINGS); + } } } } - } - } }); + } + }); WarningsManager { warnings: warnings_copy,