diff --git a/Cargo.toml b/Cargo.toml index 157e2d8..93cc15a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,5 +9,7 @@ influent = "0.4" chrono = { version = "0.4", features = ["serde"] } hyper = "0.10" termion = "1.4.0" -windows = { path = "../windows" } pub-sub = "2.0" +slog = "2.0.6" + +windows = { path = "../windows" } diff --git a/src/influx.rs b/src/influx.rs index 7784577..4f7bf6b 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -131,7 +131,6 @@ pub fn serialize(measurement: &Measurement, line: &mut String) { } - pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { let ctx = zmq::Context::new(); @@ -157,7 +156,7 @@ pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { buf.push_str(&msg); 1 } - n @ 1...20 => { + n @ 1...40 => { buf.push_str("\n"); buf.push_str(&msg); n + 1 diff --git a/src/lib.rs b/src/lib.rs index 20fa5de..1a29e84 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,11 @@ //! Tools to record and display what's happening in your program //! +#![feature(test)] + +#[macro_use] extern crate slog; + +extern crate test; extern crate zmq; extern crate influent; extern crate chrono; diff --git a/src/warnings.rs b/src/warnings.rs index 5625ffa..682b4a4 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -4,17 +4,19 @@ use std::thread::{self, JoinHandle}; use std::sync::{Arc, Mutex, RwLock}; use std::sync::mpsc::{self, Sender, Receiver, channel}; -use std::collections::VecDeque; -use std::fmt::{Display, Error as FmtError, Formatter}; +use std::collections::{BTreeMap, VecDeque}; +use std::fmt::{self, Display, Error as FmtError, Formatter}; use zmq; use chrono::{DateTime, Utc, TimeZone}; use termion::color::{self, Fg, Bg}; -use influent::measurement::{Measurement, Value}; +use influent::measurement::{Measurement, Value as InfluentValue}; +use slog::{self, OwnedKVList, Drain, Key, KV}; use super::nanos; use influx; + const N_WARNINGS: usize = 150; #[macro_export] @@ -81,6 +83,11 @@ pub enum Warning { Awesome(String), + Debug { + msg: String, + kv: MeasurementRecord, + }, + Terminate } @@ -89,7 +96,8 @@ impl Warning { match *self { Warning::Notice(ref s) | Warning::Error(ref s) | Warning::DegradedService(ref s) | Warning::Critical(ref s) | - Warning::Awesome(ref s) | Warning::Confirmed(ref s) => + Warning::Awesome(ref s) | Warning::Confirmed(ref s) | + Warning::Debug { msg: ref s, .. } => s.clone(), Warning::Terminate => "".to_owned() @@ -99,7 +107,9 @@ impl Warning { match *self { Warning::Notice(ref s) | Warning::Error(ref s) | Warning::DegradedService(ref s) | Warning::Critical(ref s) | - Warning::Awesome(ref s) | Warning::Confirmed(ref s) => + Warning::Awesome(ref s) | Warning::Confirmed(ref s) | + Warning::Debug { msg: ref s, .. } => + s.as_ref(), Warning::Terminate => "Terminate" @@ -114,6 +124,7 @@ impl Warning { &Warning::DegradedService(_) => "degraded_service", &Warning::Confirmed(_) => "confirmed", &Warning::Awesome(_) => "awesome", + &Warning::Debug { .. } => "debug", &Warning::Terminate => "terminate", } } @@ -197,7 +208,7 @@ impl Record { let body = self.msg.msg_str(); let mut m = Measurement::new("warnings"); m.add_tag("category", cat); - m.add_field("msg", Value::String(body)); + m.add_field("msg", InfluentValue::String(body)); m.set_timestamp(nanos(self.time) as i64); m } @@ -209,6 +220,130 @@ impl Display for Record { } } +pub type SlogResult = Result<(), slog::Error>; + +#[derive(Debug, Clone, PartialEq)] +pub enum Value { + String(String), + Float(f64), + Integer(i64), + Boolean(bool) +} + +impl Value { + pub fn to_influent<'a>(&'a self) -> InfluentValue<'a> { + match self { + &Value::String(ref s) => InfluentValue::String(s), + &Value::Float(n) => InfluentValue::Float(n), + &Value::Integer(i) => InfluentValue::Integer(i), + &Value::Boolean(b) => InfluentValue::Boolean(b), + } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct MeasurementRecord { + fields: Vec<(Key, Value)>, + //measurement: &'a mut Measurement<'a>, + tags: Vec<(Key, String)>, +} + +impl MeasurementRecord { + pub fn new() -> Self { + MeasurementRecord { + fields: Vec::new(), + tags: Vec::new(), + } + } + + pub fn add_field(&mut self, key: Key, val: Value) -> SlogResult { + self.fields.push((key, val)); + Ok(()) + } + + pub fn add_tag(&mut self, key: Key, val: String) -> SlogResult { + self.tags.push((key, val)); + Ok(()) + } + + pub fn meas<'a>(&'a self) -> Measurement<'a> { + let fields: BTreeMap<&'a str, InfluentValue<'a>> = + self.fields.iter() + .map(|&(k, ref v)| { + (k, v.to_influent()) + }).collect(); + + let tags: BTreeMap<&'a str, &'a str> = + self.tags.iter() + .map(|&(k, ref v)| { + (k, v.as_ref()) + }).collect(); + + Measurement { + key: "warnings", + timestamp: Some(nanos(Utc::now()) as i64), + fields, + tags, + } + + } +} + +impl slog::Serializer for MeasurementRecord { + fn emit_usize(&mut self, key: Key, val: usize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } + fn emit_isize(&mut self, key: Key, val: isize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } + fn emit_bool(&mut self, key: Key, val: bool) -> SlogResult { self.add_field(key, Value::Boolean(val)); Ok(()) } + //fn emit_char(&mut self, key: Key, val: char) -> SlogResult { seld(key, Value::String(val.into())); Ok(()) } + fn emit_u8(&mut self, key: Key, val: u8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } + fn emit_i8(&mut self, key: Key, val: i8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } + fn emit_u16(&mut self, key: Key, val: u16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } + fn emit_i16(&mut self, key: Key, val: i16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } + fn emit_u32(&mut self, key: Key, val: u32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } + fn emit_i32(&mut self, key: Key, val: i32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } + fn emit_f32(&mut self, key: Key, val: f32) -> SlogResult { self.add_field(key, Value::Float(val as f64)) } + fn emit_u64(&mut self, key: Key, val: u64) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } + fn emit_i64(&mut self, key: Key, val: i64) -> SlogResult { self.add_field(key, Value::Integer(val)) } + fn emit_f64(&mut self, key: Key, val: f64) -> SlogResult { self.add_field(key, Value::Float(val)) } + fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { self.add_tag(key, val.to_string()) } + fn emit_unit(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::Boolean(true)) } + fn emit_none(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::String("none".into())) } + fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { self.add_tag(key, val.to_string()) } +} + +pub struct WarningsDrain { + tx: Arc>>, + drain: D +} + +impl WarningsDrain { + pub fn new(tx: Sender) -> Self { + let tx = Arc::new(Mutex::new(tx)); + let drain = slog::Discard; + WarningsDrain { tx, drain } + } +} + + +impl Drain for WarningsDrain { + type Ok = (); + type Err = D::Err; + + fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result { + //let mut meas = Measurement::new("warnings"); + //println!("{:?}", values); + let mut ser = MeasurementRecord::new(); + //values.serialize(record, &mut ser); + record.kv().serialize(record, &mut ser); + //println!("{:?}", ser); + let msg = record.msg().to_string(); + if let Ok(lock) = self.tx.lock() { + lock.send(Warning::Debug { msg, kv: ser }); + } + Ok(()) + } +} + + #[derive(Debug)] pub struct WarningsManager { pub tx: Sender, @@ -231,6 +366,17 @@ impl WarningsManager { println!("warnings manager terminating"); break; } + + Warning::Debug { msg, kv } => { + let mut meas = kv.meas(); + meas.add_field("msg", InfluentValue::String(msg.as_ref())); + meas.add_tag("category", "debug"); + influx::serialize(&meas, &mut buf); + socket.send_str(&buf, 0); + buf.clear(); + // and don't push to warnings + // bc it's debug + } other => { let rec = Record::new(other); { @@ -266,4 +412,46 @@ impl Drop for WarningsManager { } +mod tests { + use super::*; + use test::{black_box, Bencher}; + #[test] + fn it_creates_a_logger() { + let wm = WarningsManager::new(); + let im = influx::writer(wm.tx.clone()); + let drain = WarningsDrain { tx: Arc::new(Mutex::new(wm.tx.clone())), drain: slog::Discard }; + let logger = slog::Logger::root(drain, o!("build" => "0.1.3")); + //for _ in 0..60 { + // debug!(logger, "test 123"; "exchange" => "plnx"); + //} + } + + #[bench] + fn it_sends_integers_with_a_sender_behind_a_mutex(b: &mut Bencher) { + let (tx, rx) = channel(); + enum Msg { + Val(usize), + Terminate + } + let worker = thread::spawn(move || { + let mut xs = Vec::new(); + loop { + match rx.recv().unwrap() { + Msg::Val(x) => { xs.push(x); } + Msg::Terminate => break, + } + } + xs.len() + }); + let tx = Arc::new(Mutex::new(tx)); + b.iter(|| { + let lock = tx.lock().unwrap(); + lock.send(Msg::Val(1)); + }); + tx.lock().unwrap().send(Msg::Terminate); + let len = worker.join().unwrap(); + println!("{}", len); + + } +}