From ca9d21f9192d343cf085e19ac6050feeb712e07a Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Wed, 20 Sep 2017 18:28:26 -0400 Subject: [PATCH] ZmqDrain is really a drain, and now there's ZmqIo too! --- src/warnings.rs | 167 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 162 insertions(+), 5 deletions(-) diff --git a/src/warnings.rs b/src/warnings.rs index 902a0f7..6c6cab0 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -494,7 +494,7 @@ pub struct ZmqDrain drain: D, ctx: zmq::Context, socket: zmq::Socket, - buf: Vec + buf: Arc>> } impl ZmqDrain @@ -505,7 +505,7 @@ impl ZmqDrain let ctx = zmq::Context::new(); let socket = ctx.socket(zmq::PUB).unwrap(); socket.bind("ipc:///tmp/mm/log").expect("zmq publisher bind failed"); - let buf = Vec::with_capacity(4096); + let buf = Arc::new(Mutex::new(Vec::with_capacity(4096))); ZmqDrain { drain, @@ -516,6 +516,8 @@ impl ZmqDrain } } +const TIMESTAMP_FORMAT: &'static str = "%b %d %H:%M:%S%.3f"; + impl Drain for ZmqDrain where D: Drain { @@ -523,13 +525,57 @@ impl Drain for ZmqDrain type Err = D::Err; fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result { + { + let mut buf = self.buf.lock().unwrap(); + write!(buf, "{time} {level}", + time = Utc::now().format(TIMESTAMP_FORMAT), + level = record.level().as_short_str()); + { + let mut thread_ser = ThreadSer(&mut buf); + record.kv().serialize(record, &mut thread_ser); + values.serialize(record, &mut thread_ser); + } + + write!(buf, " {file:<20} {line:<5} {msg}", + file = record.file(), + line = record.line(), + msg = record.msg()); + + { + let mut kv_ser = KvSer(&mut buf); + record.kv().serialize(record, &mut kv_ser); + values.serialize(record, &mut kv_ser); + } + + self.socket.send(&buf, 0); + buf.clear(); + } self.drain.log(record, values) } } -impl Write for ZmqDrain - where D: Drain -{ +/// Can be used as a `Write` with `slog_term` and +/// other libraries. +/// +pub struct ZmqIo { + ctx: zmq::Context, + socket: zmq::Socket, + buf: Vec +} + +impl ZmqIo { + pub fn new(addr: &str) -> Self { + let _ = fs::create_dir("/tmp/mm"); + let ctx = zmq::Context::new(); + let socket = ctx.socket(zmq::PUB).unwrap(); + let addr = format!("ipc:///tmp/mm/{}", addr); + socket.bind(&addr).expect("zmq publisher bind failed"); + let buf = Vec::with_capacity(4096); + ZmqIo { ctx, socket, buf } + } +} + +impl Write for ZmqIo { fn write(&mut self, buf: &[u8]) -> io::Result { self.buf.write(buf) } @@ -554,6 +600,117 @@ impl Write for ZmqDrain } } +/// Serializes *only* KV pair with `key == "thread"` +/// +struct ThreadSer<'a>(&'a mut Vec); + +impl<'a> slog::ser::Serializer for ThreadSer<'a> { + fn emit_arguments(&mut self, key: &str, val: &fmt::Arguments) -> slog::Result { + Ok(()) + } + + fn emit_str(&mut self, key: &str, val: &str) -> slog::Result { + if key == "thread" { + write!(self.0, " {:<20}", val); + } + Ok(()) + } +} + + +/// Serializes KV pairs as ", k: v" +/// +struct KvSer<'a>(&'a mut Vec); + +macro_rules! s( + ($s:expr, $k:expr, $v:expr) => { + try!(write!($s.0, ", {}: {}", $k, $v)); + }; +); + +impl<'a> slog::ser::Serializer for KvSer<'a> { + fn emit_none(&mut self, key: &str) -> slog::Result { + s!(self, key, "None"); + Ok(()) + } + fn emit_unit(&mut self, key: &str) -> slog::Result { + s!(self, key, "()"); + Ok(()) + } + + fn emit_bool(&mut self, key: &str, val: bool) -> slog::Result { + s!(self, key, val); + Ok(()) + } + + fn emit_char(&mut self, key: &str, val: char) -> slog::Result { + s!(self, key, val); + Ok(()) + } + + fn emit_usize(&mut self, key: &str, val: usize) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_isize(&mut self, key: &str, val: isize) -> slog::Result { + s!(self, key, val); + Ok(()) + } + + fn emit_u8(&mut self, key: &str, val: u8) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_i8(&mut self, key: &str, val: i8) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_u16(&mut self, key: &str, val: u16) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_i16(&mut self, key: &str, val: i16) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_u32(&mut self, key: &str, val: u32) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_i32(&mut self, key: &str, val: i32) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_f32(&mut self, key: &str, val: f32) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_u64(&mut self, key: &str, val: u64) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_i64(&mut self, key: &str, val: i64) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_f64(&mut self, key: &str, val: f64) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_str(&mut self, key: &str, val: &str) -> slog::Result { + s!(self, key, val); + Ok(()) + } + fn emit_arguments( + &mut self, + key: &str, + val: &fmt::Arguments, + ) -> slog::Result { + s!(self, key, val); + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*;