diff --git a/Cargo.toml b/Cargo.toml index 688226b..69f70b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "logging" -version = "0.2.3" +version = "0.3.0" authors = ["Jonathan Strong "] [dependencies] diff --git a/examples/zmq-logger.rs b/examples/zmq-logger.rs index eb62fba..8e47498 100644 --- a/examples/zmq-logger.rs +++ b/examples/zmq-logger.rs @@ -1,3 +1,4 @@ +#![allow(unused_imports)] #[macro_use] extern crate slog; extern crate logging; extern crate slog_term; diff --git a/src/influx.rs b/src/influx.rs index 088492c..2d029d1 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -1,13 +1,12 @@ //! Utilities to efficiently send data to influx //! -use std::iter::FromIterator; -use std::io::{Write, Read}; -use std::sync::mpsc::{Sender, Receiver, channel, SendError}; +use std::io::Read; +use std::sync::mpsc::{Sender, channel, SendError}; use std::thread; -use std::fs::{self, OpenOptions}; +use std::fs; use std::time::Duration; -use std::hash::{Hash, BuildHasherDefault}; +use std::hash::BuildHasherDefault; use hyper::status::StatusCode; use hyper::client::response::Response; @@ -15,15 +14,14 @@ use hyper::Url; use hyper::client::Client; use influent::measurement::{Measurement, Value}; use zmq; -use chrono::{DateTime, Utc, TimeZone}; +#[allow(unused_imports)] +use chrono::{DateTime, Utc}; use sloggers::types::Severity; use ordermap::OrderMap; use fnv::FnvHasher; use decimal::d128; use uuid::Uuid; -use money::Ticker; - use super::{nanos, file_logger}; use warnings::Warning; @@ -211,6 +209,7 @@ impl InfluxWriter { self.tx.clone() } + #[allow(unused_assignments)] pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u8) -> Self { let (kill_switch, terminate) = channel(); let (tx, rx) = channel(); @@ -219,8 +218,12 @@ impl InfluxWriter { debug!(logger, "initializing url"; "DB_HOST" => host, "DB_NAME" => db); + + #[cfg(not(test))] let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse"); + #[cfg(not(test))] let client = Client::new(); + debug!(logger, "initializing buffers"); let mut meas_buf = String::with_capacity(32 * 32 * 32); let mut buf = String::with_capacity(32 * 32 * 32); @@ -264,7 +267,7 @@ impl InfluxWriter { Ok(mut resp) => { let mut server_resp = String::with_capacity(1024); - resp.read_to_string(&mut server_resp); //.unwrap_or(0); + let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); error!(logger, "influx server error"; "status" => resp.status.to_string(), "body" => server_resp); @@ -285,7 +288,7 @@ impl InfluxWriter { loop { rcvd_msg = false; - rx.recv_timeout(Duration::from_millis(10)) + let _ = rx.recv_timeout(Duration::from_millis(10)) .map(|mut meas: OwnedMeasurement| { // if we didn't set the timestamp, it would end up // being whenever we accumulated `BUFFER_SIZE` messages, @@ -473,8 +476,6 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { add_tag(line, key, value); } - let mut fields = measurement.fields.iter(); - let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| { if is_first { line.push_str(" "); } else { line.push_str(","); } line.push_str(&escape_tag(key)); @@ -543,7 +544,7 @@ pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { Ok(Response { status, .. }) if status == StatusCode::NoContent => {} Ok(mut resp) => { - resp.read_to_string(&mut server_resp); //.unwrap_or(0); + let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); let _ = warnings.send( Warning::Error( format!("Influx server: {}", server_resp))); @@ -582,9 +583,6 @@ pub struct OwnedMeasurement { pub timestamp: Option, pub fields: Map<&'static str, OwnedValue>, pub tags: Map<&'static str, &'static str>, - //pub n_tags: usize, - //pub n_fields: usize, - //pub string_tags: HashMap<&'static str, String> } impl OwnedMeasurement { @@ -594,9 +592,6 @@ impl OwnedMeasurement { timestamp: None, tags: new_map(n_tags), fields: new_map(n_fields), - //n_tags, - //n_fields, - //string_tags: HashMap::new() } } @@ -609,11 +604,6 @@ impl OwnedMeasurement { self } - // pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self { - // self.string_tags.insert(key, value); - // self - // } - pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self { self.fields.insert(key, value); self @@ -630,6 +620,7 @@ impl OwnedMeasurement { } } +#[allow(unused_imports, unused_variables)] #[cfg(test)] mod tests { use super::*; @@ -735,7 +726,7 @@ mod tests { int [ seven => { 1 + 2 } ], time [ 1 ] ); - thread::sleep_ms(10); + thread::sleep(Duration::from_millis(10)); let meas: OwnedMeasurement = rx.try_recv().unwrap(); assert_eq!(meas.key, "test_measurement"); assert_eq!(meas.tags.get("one"), Some(&"a")); @@ -759,7 +750,7 @@ mod tests { time[t] ); - thread::sleep_ms(10); + thread::sleep(Duration::from_millis(10)); let meas: OwnedMeasurement = rx.try_recv().unwrap(); assert_eq!(meas.key, "test_measurement"); assert_eq!(meas.tags.get("one"), Some(&"a")); @@ -783,7 +774,7 @@ mod tests { time[1] ); - thread::sleep_ms(10); + thread::sleep(Duration::from_millis(10)); let meas: OwnedMeasurement = rx.try_recv().unwrap(); assert_eq!(meas.key, "test_measurement"); assert_eq!(meas.tags.get("one"), Some(&"a")); @@ -872,7 +863,7 @@ mod tests { let now = now(); meas.set_timestamp(now); serialize(&meas, &mut buf); - socket.send_str(&buf, 0); + socket.send_str(&buf, 0).unwrap(); drop(w); } @@ -914,7 +905,7 @@ mod tests { Ok(Response { status, .. }) if status == StatusCode::NoContent => {} Ok(mut resp) => { - resp.read_to_string(&mut server_resp); //.unwrap_or(0); + resp.read_to_string(&mut server_resp).unwrap(); panic!("{}", server_resp); } @@ -968,7 +959,7 @@ mod tests { let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#; let mut buf = String::new(); let mut server_resp = String::new(); - let mut m = OwnedMeasurement::new("rust_test") + let m = OwnedMeasurement::new("rust_test") .add_field("s", OwnedValue::String(raw.to_string())) .set_timestamp(now()); serialize_owned(&m, &mut buf); @@ -987,7 +978,7 @@ mod tests { Ok(Response { status, .. }) if status == StatusCode::NoContent => {} Ok(mut resp) => { - resp.read_to_string(&mut server_resp); //.unwrap_or(0); + resp.read_to_string(&mut server_resp).unwrap(); panic!("{}", server_resp); } diff --git a/src/latency.rs b/src/latency.rs index 7891644..7ce77d3 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -1,19 +1,14 @@ use std::thread::{self, JoinHandle}; -use std::sync::{Arc, Mutex, RwLock}; -use std::sync::mpsc::{self, Sender, Receiver, channel}; -use std::collections::VecDeque; -use std::fmt::{self, Display, Write}; +use std::sync::mpsc::{Sender, channel}; +use std::fmt; use std::time::{Instant, Duration}; -use chrono::{self, DateTime, Utc, TimeZone}; +use chrono::{self, DateTime, Utc}; use pub_sub::PubSub; -use zmq; -use influent::measurement::{Measurement, Value}; use sloggers::types::Severity; -//use chashmap::CHashMap; use windows::{DurationWindow, Incremental, Window}; -use money::{Ticker, Side, ByExchange, Exchange}; +use money::{Ticker, Side, Exchange}; use super::file_logger; use influx::{self, OwnedMeasurement, OwnedValue}; @@ -37,24 +32,22 @@ pub fn dt_nanos(t: DateTime) -> i64 { pub fn now() -> i64 { dt_nanos(Utc::now()) } pub fn tfmt(ns: Nanos) -> String { - let mut f = String::new(); match ns { t if t <= MICROSECOND => { - write!(f, "{}ns", t); + format!("{}ns", t) } t if t > MICROSECOND && t < MILLISECOND => { - write!(f, "{}u", t / MICROSECOND); + format!("{}u", t / MICROSECOND) } t if t > MILLISECOND && t < SECOND => { - write!(f, "{}ms", t / MILLISECOND); + format!("{}ms", t / MILLISECOND) } t => { - write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND); + format!("{}.{}sec", t / SECOND, t / MILLISECOND) } } - f } pub fn tfmt_dur(d: Duration) -> String { @@ -70,21 +63,22 @@ pub fn tfmt_dt(dt: DateTime) -> String { } -pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) { +pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) -> fmt::Result { match ns { t if t <= MICROSECOND => { - write!(f, "{}ns", t); + write!(f, "{}ns", t) } t if t > MICROSECOND && t < MILLISECOND => { - write!(f, "{}u", t / MICROSECOND); + write!(f, "{}u", t / MICROSECOND) } + t if t > MILLISECOND && t < SECOND => { - write!(f, "{}ms", t / MILLISECOND); + write!(f, "{}ms", t / MILLISECOND) } t => { - write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND); + write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND) } } } @@ -101,69 +95,19 @@ pub enum Latency { pub enum ExperiencedLatency { GdaxWebsocket(Duration), - - //GdaxWebsocketNoLock(Duration), - GdaxHttpPublic(Duration), - GdaxHttpPrivate(Duration), - PlnxHttpPublic(Duration), - PlnxHttpPrivate(Duration), - PlnxOrderBook(Duration), - - ExmoHttpPublic(Duration), - KrknHttpPublic(Duration), - KrknHttpPrivate(Duration), - KrknTrade(Duration, &'static str, Option, Option), - - EventLoop(Duration), - PlnxWs(Ticker), Terminate } -// impl Message for ExperiencedLatency { -// fn kill_switch() -> Self { -// ExperiencedLatency::Terminate -// } -// } - -/// represents over what period of time -/// the latency measurements were taken -pub trait MeasurementWindow { - fn duration(&self) -> Duration; -} - -#[derive(Debug, Clone, Copy)] -pub struct WThirty; - -impl Default for WThirty { - fn default() -> Self { WThirty {} } -} - -impl MeasurementWindow for WThirty { - fn duration(&self) -> Duration { Duration::from_secs(30) } -} - -#[derive(Debug, Clone, Copy)] -pub struct WTen; - -impl Default for WTen { - fn default() -> Self { WTen {} } -} - -impl MeasurementWindow for WTen { - fn duration(&self) -> Duration { Duration::from_secs(10) } -} - - #[derive(Debug, Clone)] pub struct Update { pub gdax_ws: Nanos, @@ -182,9 +126,7 @@ impl Default for Update { } #[derive(Debug, Clone)] -pub struct LatencyUpdate - where W: MeasurementWindow -{ +pub struct LatencyUpdate { pub gdax_ws: Nanos, pub krkn_pub: Nanos, pub krkn_priv: Nanos, @@ -201,94 +143,38 @@ pub struct LatencyUpdate pub krkn_last: DateTime, pub plnx_ws_count: u64, - - //pub event_loop: Nanos, - - pub size: W, } -impl Default for LatencyUpdate - where W: MeasurementWindow + Default -{ +impl Default for LatencyUpdate { fn default() -> Self { LatencyUpdate { - gdax_ws: Nanos::default(), - krkn_pub: Nanos::default(), - krkn_priv: Nanos::default(), - plnx_pub: Nanos::default(), - plnx_priv: Nanos::default(), - plnx_order: Nanos::default(), - krkn_trade_30_mean: Nanos::default(), - krkn_trade_30_max: Nanos::default(), - - krkn_trade_300_mean: Nanos::default(), - krkn_trade_300_max: Nanos::default(), - - plnx_ws_count: 0, - - plnx_last: Utc::now(), - krkn_last: Utc::now(), - - size: W::default() + gdax_ws : 0, + krkn_pub : 0, + krkn_priv : 0, + plnx_pub : 0, + plnx_priv : 0, + plnx_order : 0, + krkn_trade_30_mean : 0, + krkn_trade_30_max : 0, + krkn_trade_300_mean : 0, + krkn_trade_300_max : 0, + plnx_ws_count : 0, + + plnx_last : Utc::now(), + krkn_last : Utc::now(), } } } -impl Display for LatencyUpdate - where W: MeasurementWindow -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, " gdax ws: "); - tfmt_write(self.gdax_ws, f); - write!(f, "\n krkn pub: "); - tfmt_write(self.krkn_pub, f); - write!(f, "\n krkn priv: "); - tfmt_write(self.krkn_priv, f); - - write!(f, "\n krkn trade 30 mean: "); - tfmt_write(self.krkn_trade_30_mean, f); - - write!(f, "\n krkn trade 30 max: "); - tfmt_write(self.krkn_trade_30_max, f); - - write!(f, "\n krkn trade 300 mean: "); - tfmt_write(self.krkn_trade_300_mean, f); - - write!(f, "\n krkn trade 300 max: "); - tfmt_write(self.krkn_trade_300_max, f); - - write!(f, "\n plnx pub: "); - tfmt_write(self.plnx_pub, f); - write!(f, "\n plnx priv: "); - tfmt_write(self.plnx_priv, f); - write!(f, "\n plnx orderbook loop: "); - tfmt_write(self.plnx_order, f); - - //write!(f, "\n gdax ws nolock: "); - //tfmt_write(self.gdax_ws_nolock, f); - //write!(f, "\n event loop: "); - //tfmt(self.event_loop, f); - write!(f,"") - } -} - -impl LatencyUpdate { - pub fn measurement_window(&self) -> Duration { - self.size.duration() - } -} - pub struct Manager { pub tx: Sender, pub channel: PubSub, thread: Option>, } -pub struct LatencyManager - where W: MeasurementWindow + Clone + Send + Sync -{ +pub struct LatencyManager { pub tx: Sender, - pub channel: PubSub>, + pub channel: PubSub, thread: Option>, } @@ -323,7 +209,6 @@ impl Manager { measurements: Sender) -> Self { let (tx, rx) = channel(); - let tx_copy = tx.clone(); let channel = PubSub::new(); let channel_copy = channel.clone(); let logger = file_logger(log_path, Severity::Info); @@ -336,25 +221,22 @@ impl Manager { let mut last = Last::default(); info!(logger, "entering loop"); - let mut terminate = false; let thread = Some(thread::spawn(move || { loop { let loop_time = Instant::now(); - rx.try_recv().map(|msg| { + if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1)) { debug!(logger, "rcvd {:?}", msg); match msg { - Latency::Ws(exch, ticker, dur) => { - // shortcut + Latency::Ws(_, _, dur) => { gdax_ws.update(loop_time, dur); last.gdax = loop_time; } - Latency::Trade(exch, ticker, dur) => { - //shorcut + Latency::Trade(_, ticker, dur) => { gdax_trade.update(loop_time, dur); last.gdax = loop_time; let nanos = DurationWindow::nanos(dur); @@ -362,17 +244,14 @@ impl Manager { OwnedMeasurement::new("gdax_trade_api") .add_tag("ticker", ticker.to_str()) .add_field("nanos", OwnedValue::Integer(nanos as i64)) - .set_timestamp(influx::now())); + .set_timestamp(influx::now())).unwrap(); } - Latency::Terminate => { - crit!(logger, "rcvd Terminate order"); - terminate = true; - } + Latency::Terminate => break, _ => {} } - }); + } if loop_time - last.broadcast > Duration::from_millis(100) { debug!(logger, "initalizing broadcast"); @@ -382,17 +261,13 @@ impl Manager { gdax_trade: gdax_trade.refresh(&loop_time).mean_nanos(), gdax_last: dt_from_dur(loop_time - last.gdax) }; - channel.send(update); + channel.send(update).unwrap(); last.broadcast = loop_time; debug!(logger, "sent broadcast"); - } else { - #[cfg(feature = "no-thrash")] - thread::sleep(Duration::new(0, 1000)); - } + } - if terminate { break } } - crit!(logger, "goodbye"); + debug!(logger, "latency manager terminating"); })); Manager { @@ -403,54 +278,55 @@ impl Manager { } } -impl Drop for Manager { +impl Drop for LatencyManager { fn drop(&mut self) { - self.tx.send(Latency::Terminate); + for _ in 0..100 { self.tx.send(ExperiencedLatency::Terminate).unwrap(); } if let Some(thread) = self.thread.take() { let _ = thread.join(); } } } +impl Drop for Manager { + fn drop(&mut self) { + for _ in 0..100 { self.tx.send(Latency::Terminate).unwrap(); } + if let Some(thread) = self.thread.take() { + let _ = thread.join(); + } + } +} - -//impl LatencyManager { -impl LatencyManager { - pub fn new(w: WTen) -> Self { +impl LatencyManager { + pub fn new(d: Duration) -> Self { let (tx, rx) = channel(); let tx_copy = tx.clone(); let channel = PubSub::new(); let channel_copy = channel.clone(); - let w = w.clone(); + //let w = w.clone(); let thread = Some(thread::spawn(move || { let logger = file_logger("var/log/latency-manager.log", Severity::Info); info!(logger, "initializing zmq"); - let ctx = zmq::Context::new(); - let socket = influx::push(&ctx).unwrap(); - let mut buf = String::with_capacity(4096); info!(logger, "initializing DurationWindows"); - let mut gdax_ws = DurationWindow::new(w.duration()); - let mut gdax_priv = DurationWindow::new(w.duration()); - let mut krkn_pub = DurationWindow::new(w.duration()); - let mut krkn_priv = DurationWindow::new(w.duration()); - let mut plnx_pub = DurationWindow::new(w.duration()); - let mut plnx_priv = DurationWindow::new(w.duration()); - let mut plnx_order = DurationWindow::new(w.duration()); - let mut plnx_ws_count: Window = Window::new(w.duration()); + let mut gdax_ws = DurationWindow::new(d); + let mut gdax_priv = DurationWindow::new(d); + let mut krkn_pub = DurationWindow::new(d); + let mut krkn_priv = DurationWindow::new(d); + let mut plnx_pub = DurationWindow::new(d); + let mut plnx_priv = DurationWindow::new(d); + let mut plnx_order = DurationWindow::new(d); + let mut plnx_ws_count: Window = Window::new(d); // yes I am intentionally breaking from the hard-typed duration // window ... that was a stupid idea // let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30)); let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300)); - //let mut gdax_ws_nolock = DurationWindow::new(w.duration()); - //let mut event_loop = DurationWindow::new(w.duration()); let mut last = Last::default(); - thread::sleep_ms(1); + thread::sleep(Duration::from_millis(1)); info!(logger, "entering loop"); loop { @@ -466,7 +342,7 @@ impl LatencyManager { } ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d), - //ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(loop_time, d), + ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d), ExperiencedLatency::KrknHttpPublic(d) => { @@ -499,30 +375,14 @@ impl LatencyManager { plnx_ws_count.update(loop_time, 1_u32); } - ExperiencedLatency::KrknTrade(d, cmd, ticker, side) => { + ExperiencedLatency::KrknTrade(d, cmd, _, _) => { debug!(logger, "new KrknTrade"; "cmd" => cmd); last.krkn = loop_time; - let n = DurationWindow::nanos(d); krkn_trade_30.update(loop_time, d); krkn_trade_300.update(loop_time, d); - // let ticker_s = ticker.map(|t| t.to_string()).unwrap_or("".into()); - // let side_s = side.map(|s| s.to_string()).unwrap_or("".into()); - // let mut m = Measurement::new("krkn_trade_api"); - // m.add_field("nanos", Value::Integer(n as i64)); - // m.add_tag("cmd", cmd); - // if ticker.is_some() { - // m.add_tag("ticker", &ticker_s); - // } - // if side.is_some() { - // m.add_tag("side", &side_s); - // } - // m.set_timestamp(now()); - // influx::serialize(&m, &mut buf); - // socket.send_str(&buf, 0); - // buf.clear(); } - //ExperiencedLatency::EventLoop(d) => event_loop.update(Instant::now(), d), + other => { warn!(logger, "unexpected msg: {:?}", other); } @@ -539,7 +399,6 @@ impl LatencyManager { krkn_trade_300.refresh(&loop_time); let update = LatencyUpdate { gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(), - //gdax_ws_nolock: gdax_ws_nolock.refresh(&loop_time).mean_nanos(), krkn_pub: krkn_pub.refresh(&loop_time).mean_nanos(), krkn_priv: krkn_priv.refresh(&loop_time).mean_nanos(), plnx_pub: plnx_pub.refresh(&loop_time).mean_nanos(), @@ -557,10 +416,8 @@ impl LatencyManager { plnx_ws_count: plnx_ws_count.refresh(&loop_time).count() as u64, - //event_loop: event_loop.refresh(&now).mean_nanos(), - size: w.clone(), }; - channel.send(update); + channel.send(update).unwrap(); last.broadcast = loop_time; debug!(logger, "sent broadcast"); } @@ -575,9 +432,3 @@ impl LatencyManager { } } } - - - - - - diff --git a/src/lib.rs b/src/lib.rs index 4149355..f2bdb7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,8 @@ #![feature(test)] #[macro_use] extern crate slog; + +#[allow(unused_imports)] #[macro_use] extern crate money; extern crate test; @@ -12,32 +14,28 @@ extern crate influent; extern crate chrono; extern crate hyper; extern crate termion; -//extern crate pub_sub; extern crate sloggers; extern crate slog_term; extern crate fnv; extern crate ordermap; extern crate decimal; extern crate uuid; -//extern crate shuteye; -//extern crate chashmap; extern crate windows; extern crate pubsub as pub_sub; -use std::sync::Arc; - use chrono::{DateTime, Utc}; +#[allow(unused_imports)] use sloggers::Build; +#[allow(unused_imports)] use sloggers::types::{Severity, TimeZone}; +#[allow(unused_imports)] use sloggers::file::FileLoggerBuilder; pub mod influx; pub mod warnings; pub mod latency; -//pub type FileLogger = slog::Logger>>; - /// converts a chrono::DateTime to an integer timestamp (ns) /// pub fn nanos(t: DateTime) -> u64 { @@ -52,9 +50,8 @@ pub fn file_logger(path: &str, level: Severity) -> slog::Logger { builder.build().unwrap() } - #[cfg(any(test, feature = "test"))] -pub fn file_logger(path: &str, level: Severity) -> slog::Logger { +pub fn file_logger(_: &str, _: Severity) -> slog::Logger { use slog::*; Logger::root(Discard, o!()) } diff --git a/src/warnings.rs b/src/warnings.rs index da3231d..2772b6e 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -3,14 +3,14 @@ use std::thread::{self, JoinHandle}; use std::sync::{Arc, Mutex, RwLock}; -use std::sync::mpsc::{self, Sender, Receiver, channel}; +use std::sync::mpsc::{Sender, channel}; use std::collections::{BTreeMap, VecDeque}; use std::fmt::{self, Display, Error as FmtError, Formatter}; -use std::io::{self, Read, Write}; +use std::io::{self, Write}; use std::fs; use zmq; -use chrono::{DateTime, Utc, TimeZone}; +use chrono::{DateTime, Utc}; use termion::color::{self, Fg, Bg}; use influent::measurement::{Measurement, Value as InfluentValue}; use slog::{self, OwnedKVList, Drain, Key, KV, Level, Logger}; @@ -188,17 +188,11 @@ impl Warning { impl Display for Warning { fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { - self.category(f); + self.category(f)?; write!(f, " {}", self.msg()) } } -// impl Message for Warning { -// fn kill_switch() -> Self { -// Warning::Terminate -// } -// } - #[derive(Debug, Clone)] pub struct Record { pub time: DateTime, @@ -252,7 +246,6 @@ impl Value { #[derive(Debug, Clone, PartialEq)] pub struct MeasurementRecord { fields: Vec<(Key, Value)>, - //measurement: &'a mut Measurement<'a>, tags: Vec<(Key, String)>, } @@ -276,7 +269,7 @@ impl MeasurementRecord { } other => { - self.add_field(key, Value::String(val)); + self.add_field(other, Value::String(val)).unwrap(); } } @@ -285,7 +278,7 @@ impl MeasurementRecord { pub fn serialize_values(&mut self, record: &slog::Record, values: &OwnedKVList) { let mut builder = TagBuilder { mrec: self }; - values.serialize(record, &mut builder); + let _ = values.serialize(record, &mut builder); } pub fn to_measurement<'a>(&'a self, name: &'a str) -> Measurement<'a> { @@ -313,7 +306,7 @@ impl MeasurementRecord { impl slog::Serializer for MeasurementRecord { fn emit_usize(&mut self, key: Key, val: usize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } fn emit_isize(&mut self, key: Key, val: isize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } - fn emit_bool(&mut self, key: Key, val: bool) -> SlogResult { self.add_field(key, Value::Boolean(val)); Ok(()) } + fn emit_bool(&mut self, key: Key, val: bool) -> SlogResult { self.add_field(key, Value::Boolean(val)) } fn emit_u8(&mut self, key: Key, val: u8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } fn emit_i8(&mut self, key: Key, val: i8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } fn emit_u16(&mut self, key: Key, val: u16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } @@ -326,7 +319,7 @@ impl slog::Serializer for MeasurementRecord { fn emit_f64(&mut self, key: Key, val: f64) -> SlogResult { self.add_field(key, Value::Float(val)) } fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { self.add_field(key, Value::String(val.to_string())) } fn emit_unit(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::Boolean(true)) } - fn emit_none(&mut self, key: Key) -> SlogResult { Ok(()) } //self.add_field(key, Value::String("none".into())) } + fn emit_none(&mut self, _: Key) -> SlogResult { Ok(()) } //self.add_field(key, Value::String("none".into())) } fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { self.add_field(key, Value::String(val.to_string())) } } @@ -342,7 +335,7 @@ impl<'a> slog::Serializer for TagBuilder<'a> { } other => { - self.mrec.add_field(key, Value::String(val.to_string())) + self.mrec.add_field(other, Value::String(val.to_string())) } } } @@ -354,7 +347,7 @@ impl<'a> slog::Serializer for TagBuilder<'a> { } other => { - self.mrec.add_field(key, Value::String(val.to_string())) + self.mrec.add_field(other, Value::String(val.to_string())) } } @@ -392,7 +385,7 @@ impl Drain for WarningsDrain { if record.level() <= self.level { let mut ser = MeasurementRecord::new(); ser.serialize_values(record, values); - record.kv().serialize(record, &mut ser); + let _ = record.kv().serialize(record, &mut ser); let msg = record.msg().to_string(); if let Ok(lock) = self.tx.lock() { let _ = lock.send(Warning::Log { @@ -413,7 +406,6 @@ impl Drain for WarningsDrain { } } - #[derive(Debug)] pub struct WarningsManager { pub tx: Sender, @@ -440,11 +432,11 @@ impl WarningsManager { if let Ok(msg) = rx.recv() { match msg { Warning::Terminate => { - crit!(logger, "terminating"); + debug!(logger, "terminating"); break; } - Warning::Log { level, module, function, line, msg, kv } => { + Warning::Log { level, msg, kv, .. } => { debug!(logger, "new Warning::Debug arrived"; "msg" => &msg); let mut meas = kv.to_measurement(measurement_name); @@ -489,11 +481,12 @@ impl Drop for WarningsManager { fn drop(&mut self) { let _ = self.tx.send(Warning::Terminate); if let Some(thread) = self.thread.take() { - thread.join(); + thread.join().unwrap(); } } } +#[allow(dead_code)] pub struct ZmqDrain where D: Drain, { @@ -533,24 +526,25 @@ impl Drain for ZmqDrain fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result { { let mut buf = self.buf.lock().unwrap(); - write!(buf, "{time} {level}", + let _ = write!(buf, "{time} {level}", time = Utc::now().format(TIMESTAMP_FORMAT), level = record.level().as_short_str()); { let mut thread_ser = ThreadSer(&mut buf); - record.kv().serialize(record, &mut thread_ser); - values.serialize(record, &mut thread_ser); + let _ = record.kv().serialize(record, &mut thread_ser); + let _ = values.serialize(record, &mut thread_ser); } - write!(buf, " {file:<20} {line:<5} {msg}", - file = record.file(), - line = record.line(), - msg = record.msg()); + let _ = write!(buf, " {file:<20} {line:<5} {msg}", + file = record.file(), + line = record.line(), + msg = record.msg()); { let mut kv_ser = KvSer(&mut buf); - record.kv().serialize(record, &mut kv_ser); - values.serialize(record, &mut kv_ser); + // discarding any errors here... + let _ = record.kv().serialize(record, &mut kv_ser); + let _ = values.serialize(record, &mut kv_ser); } let _ = self.socket.send(&buf, 0); @@ -563,6 +557,7 @@ impl Drain for ZmqDrain /// Can be used as a `Write` with `slog_term` and /// other libraries. /// +#[allow(dead_code)] pub struct ZmqIo { ctx: zmq::Context, socket: zmq::Socket, @@ -611,13 +606,13 @@ impl Write for ZmqIo { struct ThreadSer<'a>(&'a mut Vec); impl<'a> slog::ser::Serializer for ThreadSer<'a> { - fn emit_arguments(&mut self, key: &str, val: &fmt::Arguments) -> slog::Result { + fn emit_arguments(&mut self, _: &str, _: &fmt::Arguments) -> slog::Result { Ok(()) } fn emit_str(&mut self, key: &str, val: &str) -> slog::Result { if key == "thread" { - write!(self.0, " {:<20}", val); + write!(self.0, " {:<20}", val)?; } Ok(()) } @@ -717,6 +712,7 @@ impl<'a> slog::ser::Serializer for KvSer<'a> { } } +#[allow(unused_variables, unused_imports)] #[cfg(test)] mod tests { use super::*; @@ -735,9 +731,6 @@ mod tests { level: Level::Trace, }; let logger = slog::Logger::root(drain, o!()); - //for _ in 0..60 { - // debug!(logger, "test 123"; "exchange" => "plnx"); - //} } #[bench] @@ -765,6 +758,5 @@ mod tests { let _ = tx.lock().unwrap().send(Msg::Terminate); let len = worker.join().unwrap(); //println!("{}", len); - } }