diff --git a/Cargo.toml b/Cargo.toml index 6400892..ad109a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ termion = "1.4.0" pub-sub = "2.0" slog = "2.0.6" sloggers = "0.2" +slog-term = "2" windows = { path = "../windows" } money = { path = "../money" } diff --git a/examples/zmq-logger.rs b/examples/zmq-logger.rs new file mode 100644 index 0000000..e69f0b6 --- /dev/null +++ b/examples/zmq-logger.rs @@ -0,0 +1,35 @@ +#[macro_use] extern crate slog; +extern crate logging; +extern crate slog_term; + +use slog::*; +use logging::warnings::ZmqDrain; + +use std::io::Write; +use std::thread; +use std::time::Duration; + +fn main() { + //let term_decorator = slog_term::TermDecorator::new().build(); + //let term_drain = slog_term::CompactFormat::new(term_decorator).build().fuse(); + let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); + let plain_fuse = slog_term::FullFormat::new(plain).build().fuse(); + let w = logging::warnings::WarningsManager::new("test"); + let w_drain = logging::warnings::WarningsDrain::new(w.tx.clone(), plain_fuse); + //let zmq_drain = ZmqDrain::new(plain_fuse); + //let zmq_decorator = slog_term::PlainSyncDecorator::new(zmq_drain); + //let zmq_fuse = slog_term::FullFormat::new(zmq_decorator).build().fuse(); + let logger = Logger::root(w_drain, o!()); + //let logger = + // Logger::root(Duplicate::new(plain_fuse, zmq_fuse).fuse(), o!()); + + let mut i = 0; + + loop { + info!(logger, "hello world"; + "i" => i); + i += 1; + thread::sleep(Duration::from_secs(1)); + } + +} diff --git a/src/lib.rs b/src/lib.rs index fe66d67..d589957 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ extern crate hyper; extern crate termion; extern crate pub_sub; extern crate sloggers; +extern crate slog_term; extern crate windows; diff --git a/src/warnings.rs b/src/warnings.rs index c295a88..549b8fd 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -6,6 +6,8 @@ use std::sync::{Arc, Mutex, RwLock}; use std::sync::mpsc::{self, Sender, Receiver, channel}; use std::collections::{BTreeMap, VecDeque}; use std::fmt::{self, Display, Error as FmtError, Formatter}; +use std::io::{self, Read, Write}; +use std::fs; use zmq; use chrono::{DateTime, Utc, TimeZone}; @@ -18,7 +20,7 @@ use super::{nanos, file_logger}; use influx; -const N_WARNINGS: usize = 150; +const N_WARNINGS: usize = 500; #[macro_export] macro_rules! confirmed { @@ -333,31 +335,34 @@ pub struct WarningsDrain { drain: D } -impl WarningsDrain { - pub fn new(tx: Sender) -> Self { +impl WarningsDrain + where D: Drain +{ + pub fn new(tx: Sender, drain: D) -> Self { let tx = Arc::new(Mutex::new(tx)); - let drain = slog::Discard; WarningsDrain { tx, drain } } } +impl From> for WarningsDrain> { + fn from(tx: Sender) -> Self { + WarningsDrain::new(tx, slog::Discard.fuse()) + } +} impl Drain for WarningsDrain { type Ok = (); type Err = D::Err; fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result { - //let mut meas = Measurement::new("warnings"); - //println!("{:?}", values); let mut ser = MeasurementRecord::new(); ser.serialize_values(record, values); - //values.serialize(record, &mut ser); record.kv().serialize(record, &mut ser); - //println!("{:?}", ser); let msg = record.msg().to_string(); if let Ok(lock) = self.tx.lock() { lock.send(Warning::Debug { msg, kv: ser }); } + let _ = self.drain.log(record, values)?; Ok(()) } } @@ -443,6 +448,71 @@ impl Drop for WarningsManager { } } +pub struct ZmqDrain + where D: Drain, +{ + drain: D, + ctx: zmq::Context, + socket: zmq::Socket, + buf: Vec +} + +impl ZmqDrain + where D: Drain, +{ + pub fn new(drain: D) -> Self { + let _ = fs::create_dir("/tmp/mm"); + let ctx = zmq::Context::new(); + let socket = ctx.socket(zmq::PUB).unwrap(); + socket.bind("ipc:///tmp/mm/log").expect("zmq publisher bind failed"); + let buf = Vec::with_capacity(4096); + + ZmqDrain { + drain, + ctx, + socket, + buf + } + } +} + +impl Drain for ZmqDrain + where D: Drain +{ + type Ok = D::Ok; + type Err = D::Err; + + fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result { + self.drain.log(record, values) + } +} + +impl Write for ZmqDrain + where D: Drain +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buf.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + match self.buf.pop() { + Some(b'\n') => { + self.socket.send(&self.buf, 0); + } + + Some(other) => { + self.buf.push(other); + self.socket.send(&self.buf, 0); + } + + None => { + return Ok(()); + } + } + self.buf.clear(); + Ok(()) + } +} #[cfg(test)] mod tests { @@ -451,7 +521,7 @@ mod tests { #[test] fn it_creates_a_logger() { - let wm = WarningsManager::new(); + let wm = WarningsManager::new("rust-test"); let im = influx::writer(wm.tx.clone()); let drain = WarningsDrain { tx: Arc::new(Mutex::new(wm.tx.clone())), drain: slog::Discard }; let logger = slog::Logger::root(drain, o!());