|
|
@@ -13,7 +13,7 @@ use zmq; |
|
|
|
use chrono::{DateTime, Utc, TimeZone}; |
|
|
|
use termion::color::{self, Fg, Bg}; |
|
|
|
use influent::measurement::{Measurement, Value as InfluentValue}; |
|
|
|
use slog::{self, OwnedKVList, Drain, Key, KV}; |
|
|
|
use slog::{self, OwnedKVList, Drain, Key, KV, Level}; |
|
|
|
use sloggers::types::Severity; |
|
|
|
|
|
|
|
use super::{nanos, file_logger}; |
|
|
@@ -86,7 +86,11 @@ pub enum Warning { |
|
|
|
|
|
|
|
Awesome(String), |
|
|
|
|
|
|
|
Debug { |
|
|
|
Log { |
|
|
|
level: Level, |
|
|
|
module: &'static str, |
|
|
|
function: &'static str, |
|
|
|
line: u32, |
|
|
|
msg: String, |
|
|
|
kv: MeasurementRecord, |
|
|
|
}, |
|
|
@@ -100,7 +104,7 @@ impl Warning { |
|
|
|
Warning::Notice(ref s) | Warning::Error(ref s) | |
|
|
|
Warning::DegradedService(ref s) | Warning::Critical(ref s) | |
|
|
|
Warning::Awesome(ref s) | Warning::Confirmed(ref s) | |
|
|
|
Warning::Debug { msg: ref s, .. } => |
|
|
|
Warning::Log { msg: ref s, .. } => |
|
|
|
s.clone(), |
|
|
|
|
|
|
|
Warning::Terminate => "".to_owned() |
|
|
@@ -111,7 +115,7 @@ impl Warning { |
|
|
|
Warning::Notice(ref s) | Warning::Error(ref s) | |
|
|
|
Warning::DegradedService(ref s) | Warning::Critical(ref s) | |
|
|
|
Warning::Awesome(ref s) | Warning::Confirmed(ref s) | |
|
|
|
Warning::Debug { msg: ref s, .. } => |
|
|
|
Warning::Log { msg: ref s, .. } => |
|
|
|
|
|
|
|
s.as_ref(), |
|
|
|
|
|
|
@@ -121,14 +125,14 @@ impl Warning { |
|
|
|
|
|
|
|
pub fn category_str(&self) -> &str { |
|
|
|
match self { |
|
|
|
&Warning::Notice(_) => "notice", |
|
|
|
&Warning::Error(_) => "error", |
|
|
|
&Warning::Critical(_) => "critical", |
|
|
|
&Warning::DegradedService(_) => "degraded_service", |
|
|
|
&Warning::Confirmed(_) => "confirmed", |
|
|
|
&Warning::Awesome(_) => "awesome", |
|
|
|
&Warning::Debug { .. } => "debug", |
|
|
|
&Warning::Terminate => "terminate", |
|
|
|
&Warning::Notice(_) => "NOTC", |
|
|
|
&Warning::Error(_) => "ERRO", |
|
|
|
&Warning::Critical(_) => "CRIT", |
|
|
|
&Warning::DegradedService(_) => "DGRD", |
|
|
|
&Warning::Confirmed(_) => "CNFD", |
|
|
|
&Warning::Awesome(_) => "AWSM", |
|
|
|
&Warning::Log { ref level, .. } => level.as_short_str(), |
|
|
|
&Warning::Terminate => "TERM", |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -265,7 +269,16 @@ impl MeasurementRecord { |
|
|
|
} |
|
|
|
|
|
|
|
pub fn add_tag(&mut self, key: Key, val: String) -> SlogResult { |
|
|
|
self.tags.push((key, val)); |
|
|
|
match key { |
|
|
|
"exchange" | "thread" | "ticker" | "category" => { |
|
|
|
self.tags.push((key, val)); |
|
|
|
} |
|
|
|
|
|
|
|
other => { |
|
|
|
self.add_field(key, Value::String(val)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
@@ -322,15 +335,33 @@ pub struct TagBuilder<'a> { |
|
|
|
|
|
|
|
impl<'a> slog::Serializer for TagBuilder<'a> { |
|
|
|
fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { |
|
|
|
self.mrec.add_tag(key, val.to_string()) |
|
|
|
match key { |
|
|
|
"exchange" | "thread" | "ticker" | "category" => { |
|
|
|
self.mrec.add_tag(key, val.to_string()) |
|
|
|
} |
|
|
|
|
|
|
|
other => { |
|
|
|
self.mrec.add_field(key, Value::String(val.to_string())) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { |
|
|
|
self.mrec.add_tag(key, val.to_string()) |
|
|
|
match key { |
|
|
|
"exchange" | "thread" | "ticker" | "category" => { |
|
|
|
self.mrec.add_tag(key, val.to_string()) |
|
|
|
} |
|
|
|
|
|
|
|
other => { |
|
|
|
self.mrec.add_field(key, Value::String(val.to_string())) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
pub struct WarningsDrain<D: Drain> { |
|
|
|
level: Level, |
|
|
|
tx: Arc<Mutex<Sender<Warning>>>, |
|
|
|
drain: D |
|
|
|
} |
|
|
@@ -338,15 +369,15 @@ pub struct WarningsDrain<D: Drain> { |
|
|
|
impl<D> WarningsDrain<D> |
|
|
|
where D: Drain |
|
|
|
{ |
|
|
|
pub fn new(tx: Sender<Warning>, drain: D) -> Self { |
|
|
|
pub fn new(tx: Sender<Warning>, level: Level, drain: D) -> Self { |
|
|
|
let tx = Arc::new(Mutex::new(tx)); |
|
|
|
WarningsDrain { tx, drain } |
|
|
|
WarningsDrain { tx, drain, level } |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
impl From<Sender<Warning>> for WarningsDrain<slog::Fuse<slog::Discard>> { |
|
|
|
fn from(tx: Sender<Warning>) -> Self { |
|
|
|
WarningsDrain::new(tx, slog::Discard.fuse()) |
|
|
|
WarningsDrain::new(tx, Level::Debug, slog::Discard.fuse()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -355,12 +386,21 @@ impl<D: Drain> Drain for WarningsDrain<D> { |
|
|
|
type Err = D::Err; |
|
|
|
|
|
|
|
fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> { |
|
|
|
let mut ser = MeasurementRecord::new(); |
|
|
|
ser.serialize_values(record, values); |
|
|
|
record.kv().serialize(record, &mut ser); |
|
|
|
let msg = record.msg().to_string(); |
|
|
|
if let Ok(lock) = self.tx.lock() { |
|
|
|
lock.send(Warning::Debug { msg, kv: ser }); |
|
|
|
if record.level() <= self.level { |
|
|
|
let mut ser = MeasurementRecord::new(); |
|
|
|
ser.serialize_values(record, values); |
|
|
|
record.kv().serialize(record, &mut ser); |
|
|
|
let msg = record.msg().to_string(); |
|
|
|
if let Ok(lock) = self.tx.lock() { |
|
|
|
lock.send(Warning::Log { |
|
|
|
level: record.level(), |
|
|
|
module: record.module(), |
|
|
|
function: record.function(), |
|
|
|
line: record.line(), |
|
|
|
msg, |
|
|
|
kv: ser |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|
let _ = self.drain.log(record, values)?; |
|
|
|
Ok(()) |
|
|
@@ -398,12 +438,12 @@ impl WarningsManager { |
|
|
|
break; |
|
|
|
} |
|
|
|
|
|
|
|
Warning::Debug { msg, kv } => { |
|
|
|
Warning::Log { level, module, function, line, msg, kv } => { |
|
|
|
debug!(logger, "new Warning::Debug arrived"; |
|
|
|
"msg" => &msg); |
|
|
|
let mut meas = kv.to_measurement(measurement_name); |
|
|
|
meas.add_field("msg", InfluentValue::String(msg.as_ref())); |
|
|
|
meas.add_tag("category", "debug"); |
|
|
|
meas.add_tag("category", level.as_short_str()); |
|
|
|
influx::serialize(&meas, &mut buf); |
|
|
|
socket.send_str(&buf, 0); |
|
|
|
buf.clear(); |
|
|
|