diff --git a/src/warnings.rs b/src/warnings.rs index 549b8fd..902a0f7 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -13,7 +13,7 @@ use zmq; use chrono::{DateTime, Utc, TimeZone}; use termion::color::{self, Fg, Bg}; use influent::measurement::{Measurement, Value as InfluentValue}; -use slog::{self, OwnedKVList, Drain, Key, KV}; +use slog::{self, OwnedKVList, Drain, Key, KV, Level}; use sloggers::types::Severity; use super::{nanos, file_logger}; @@ -86,7 +86,11 @@ pub enum Warning { Awesome(String), - Debug { + Log { + level: Level, + module: &'static str, + function: &'static str, + line: u32, msg: String, kv: MeasurementRecord, }, @@ -100,7 +104,7 @@ impl Warning { Warning::Notice(ref s) | Warning::Error(ref s) | Warning::DegradedService(ref s) | Warning::Critical(ref s) | Warning::Awesome(ref s) | Warning::Confirmed(ref s) | - Warning::Debug { msg: ref s, .. } => + Warning::Log { msg: ref s, .. } => s.clone(), Warning::Terminate => "".to_owned() @@ -111,7 +115,7 @@ impl Warning { Warning::Notice(ref s) | Warning::Error(ref s) | Warning::DegradedService(ref s) | Warning::Critical(ref s) | Warning::Awesome(ref s) | Warning::Confirmed(ref s) | - Warning::Debug { msg: ref s, .. } => + Warning::Log { msg: ref s, .. } => s.as_ref(), @@ -121,14 +125,14 @@ impl Warning { pub fn category_str(&self) -> &str { match self { - &Warning::Notice(_) => "notice", - &Warning::Error(_) => "error", - &Warning::Critical(_) => "critical", - &Warning::DegradedService(_) => "degraded_service", - &Warning::Confirmed(_) => "confirmed", - &Warning::Awesome(_) => "awesome", - &Warning::Debug { .. } => "debug", - &Warning::Terminate => "terminate", + &Warning::Notice(_) => "NOTC", + &Warning::Error(_) => "ERRO", + &Warning::Critical(_) => "CRIT", + &Warning::DegradedService(_) => "DGRD", + &Warning::Confirmed(_) => "CNFD", + &Warning::Awesome(_) => "AWSM", + &Warning::Log { ref level, .. } => level.as_short_str(), + &Warning::Terminate => "TERM", } } @@ -265,7 +269,16 @@ impl MeasurementRecord { } pub fn add_tag(&mut self, key: Key, val: String) -> SlogResult { - self.tags.push((key, val)); + match key { + "exchange" | "thread" | "ticker" | "category" => { + self.tags.push((key, val)); + } + + other => { + self.add_field(key, Value::String(val)); + } + } + Ok(()) } @@ -322,15 +335,33 @@ pub struct TagBuilder<'a> { impl<'a> slog::Serializer for TagBuilder<'a> { fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { - self.mrec.add_tag(key, val.to_string()) + match key { + "exchange" | "thread" | "ticker" | "category" => { + self.mrec.add_tag(key, val.to_string()) + } + + other => { + self.mrec.add_field(key, Value::String(val.to_string())) + } + } } fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { - self.mrec.add_tag(key, val.to_string()) + match key { + "exchange" | "thread" | "ticker" | "category" => { + self.mrec.add_tag(key, val.to_string()) + } + + other => { + self.mrec.add_field(key, Value::String(val.to_string())) + } + } + } } pub struct WarningsDrain { + level: Level, tx: Arc>>, drain: D } @@ -338,15 +369,15 @@ pub struct WarningsDrain { impl WarningsDrain where D: Drain { - pub fn new(tx: Sender, drain: D) -> Self { + pub fn new(tx: Sender, level: Level, drain: D) -> Self { let tx = Arc::new(Mutex::new(tx)); - WarningsDrain { tx, drain } + WarningsDrain { tx, drain, level } } } impl From> for WarningsDrain> { fn from(tx: Sender) -> Self { - WarningsDrain::new(tx, slog::Discard.fuse()) + WarningsDrain::new(tx, Level::Debug, slog::Discard.fuse()) } } @@ -355,12 +386,21 @@ impl Drain for WarningsDrain { type Err = D::Err; fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result { - let mut ser = MeasurementRecord::new(); - ser.serialize_values(record, values); - record.kv().serialize(record, &mut ser); - let msg = record.msg().to_string(); - if let Ok(lock) = self.tx.lock() { - lock.send(Warning::Debug { msg, kv: ser }); + if record.level() <= self.level { + let mut ser = MeasurementRecord::new(); + ser.serialize_values(record, values); + record.kv().serialize(record, &mut ser); + let msg = record.msg().to_string(); + if let Ok(lock) = self.tx.lock() { + lock.send(Warning::Log { + level: record.level(), + module: record.module(), + function: record.function(), + line: record.line(), + msg, + kv: ser + }); + } } let _ = self.drain.log(record, values)?; Ok(()) @@ -398,12 +438,12 @@ impl WarningsManager { break; } - Warning::Debug { msg, kv } => { + Warning::Log { level, module, function, line, msg, kv } => { debug!(logger, "new Warning::Debug arrived"; "msg" => &msg); let mut meas = kv.to_measurement(measurement_name); meas.add_field("msg", InfluentValue::String(msg.as_ref())); - meas.add_tag("category", "debug"); + meas.add_tag("category", level.as_short_str()); influx::serialize(&meas, &mut buf); socket.send_str(&buf, 0); buf.clear();