|
@@ -6,6 +6,8 @@ use std::sync::{Arc, Mutex, RwLock}; |
|
|
use std::sync::mpsc::{self, Sender, Receiver, channel}; |
|
|
use std::sync::mpsc::{self, Sender, Receiver, channel}; |
|
|
use std::collections::{BTreeMap, VecDeque}; |
|
|
use std::collections::{BTreeMap, VecDeque}; |
|
|
use std::fmt::{self, Display, Error as FmtError, Formatter}; |
|
|
use std::fmt::{self, Display, Error as FmtError, Formatter}; |
|
|
|
|
|
use std::io::{self, Read, Write}; |
|
|
|
|
|
use std::fs; |
|
|
|
|
|
|
|
|
use zmq; |
|
|
use zmq; |
|
|
use chrono::{DateTime, Utc, TimeZone}; |
|
|
use chrono::{DateTime, Utc, TimeZone}; |
|
@@ -18,7 +20,7 @@ use super::{nanos, file_logger}; |
|
|
use influx; |
|
|
use influx; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const N_WARNINGS: usize = 150; |
|
|
|
|
|
|
|
|
const N_WARNINGS: usize = 500; |
|
|
|
|
|
|
|
|
#[macro_export] |
|
|
#[macro_export] |
|
|
macro_rules! confirmed { |
|
|
macro_rules! confirmed { |
|
@@ -333,31 +335,34 @@ pub struct WarningsDrain<D: Drain> { |
|
|
drain: D |
|
|
drain: D |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
impl WarningsDrain<slog::Discard> { |
|
|
|
|
|
pub fn new(tx: Sender<Warning>) -> Self { |
|
|
|
|
|
|
|
|
impl<D> WarningsDrain<D> |
|
|
|
|
|
where D: Drain |
|
|
|
|
|
{ |
|
|
|
|
|
pub fn new(tx: Sender<Warning>, drain: D) -> Self { |
|
|
let tx = Arc::new(Mutex::new(tx)); |
|
|
let tx = Arc::new(Mutex::new(tx)); |
|
|
let drain = slog::Discard; |
|
|
|
|
|
WarningsDrain { tx, drain } |
|
|
WarningsDrain { tx, drain } |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl From<Sender<Warning>> for WarningsDrain<slog::Fuse<slog::Discard>> { |
|
|
|
|
|
fn from(tx: Sender<Warning>) -> Self { |
|
|
|
|
|
WarningsDrain::new(tx, slog::Discard.fuse()) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
impl<D: Drain> Drain for WarningsDrain<D> { |
|
|
impl<D: Drain> Drain for WarningsDrain<D> { |
|
|
type Ok = (); |
|
|
type Ok = (); |
|
|
type Err = D::Err; |
|
|
type Err = D::Err; |
|
|
|
|
|
|
|
|
fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> { |
|
|
fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> { |
|
|
//let mut meas = Measurement::new("warnings"); |
|
|
|
|
|
//println!("{:?}", values); |
|
|
|
|
|
let mut ser = MeasurementRecord::new(); |
|
|
let mut ser = MeasurementRecord::new(); |
|
|
ser.serialize_values(record, values); |
|
|
ser.serialize_values(record, values); |
|
|
//values.serialize(record, &mut ser); |
|
|
|
|
|
record.kv().serialize(record, &mut ser); |
|
|
record.kv().serialize(record, &mut ser); |
|
|
//println!("{:?}", ser); |
|
|
|
|
|
let msg = record.msg().to_string(); |
|
|
let msg = record.msg().to_string(); |
|
|
if let Ok(lock) = self.tx.lock() { |
|
|
if let Ok(lock) = self.tx.lock() { |
|
|
lock.send(Warning::Debug { msg, kv: ser }); |
|
|
lock.send(Warning::Debug { msg, kv: ser }); |
|
|
} |
|
|
} |
|
|
|
|
|
let _ = self.drain.log(record, values)?; |
|
|
Ok(()) |
|
|
Ok(()) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@@ -443,6 +448,71 @@ impl Drop for WarningsManager { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub struct ZmqDrain<D> |
|
|
|
|
|
where D: Drain, |
|
|
|
|
|
{ |
|
|
|
|
|
drain: D, |
|
|
|
|
|
ctx: zmq::Context, |
|
|
|
|
|
socket: zmq::Socket, |
|
|
|
|
|
buf: Vec<u8> |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl<D> ZmqDrain<D> |
|
|
|
|
|
where D: Drain, |
|
|
|
|
|
{ |
|
|
|
|
|
pub fn new(drain: D) -> Self { |
|
|
|
|
|
let _ = fs::create_dir("/tmp/mm"); |
|
|
|
|
|
let ctx = zmq::Context::new(); |
|
|
|
|
|
let socket = ctx.socket(zmq::PUB).unwrap(); |
|
|
|
|
|
socket.bind("ipc:///tmp/mm/log").expect("zmq publisher bind failed"); |
|
|
|
|
|
let buf = Vec::with_capacity(4096); |
|
|
|
|
|
|
|
|
|
|
|
ZmqDrain { |
|
|
|
|
|
drain, |
|
|
|
|
|
ctx, |
|
|
|
|
|
socket, |
|
|
|
|
|
buf |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl<D> Drain for ZmqDrain<D> |
|
|
|
|
|
where D: Drain |
|
|
|
|
|
{ |
|
|
|
|
|
type Ok = D::Ok; |
|
|
|
|
|
type Err = D::Err; |
|
|
|
|
|
|
|
|
|
|
|
fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> { |
|
|
|
|
|
self.drain.log(record, values) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl<D> Write for ZmqDrain<D> |
|
|
|
|
|
where D: Drain |
|
|
|
|
|
{ |
|
|
|
|
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
|
|
|
|
|
self.buf.write(buf) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn flush(&mut self) -> io::Result<()> { |
|
|
|
|
|
match self.buf.pop() { |
|
|
|
|
|
Some(b'\n') => { |
|
|
|
|
|
self.socket.send(&self.buf, 0); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Some(other) => { |
|
|
|
|
|
self.buf.push(other); |
|
|
|
|
|
self.socket.send(&self.buf, 0); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
None => { |
|
|
|
|
|
return Ok(()); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
self.buf.clear(); |
|
|
|
|
|
Ok(()) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
#[cfg(test)] |
|
|
#[cfg(test)] |
|
|
mod tests { |
|
|
mod tests { |
|
@@ -451,7 +521,7 @@ mod tests { |
|
|
|
|
|
|
|
|
#[test] |
|
|
#[test] |
|
|
fn it_creates_a_logger() { |
|
|
fn it_creates_a_logger() { |
|
|
let wm = WarningsManager::new(); |
|
|
|
|
|
|
|
|
let wm = WarningsManager::new("rust-test"); |
|
|
let im = influx::writer(wm.tx.clone()); |
|
|
let im = influx::writer(wm.tx.clone()); |
|
|
let drain = WarningsDrain { tx: Arc::new(Mutex::new(wm.tx.clone())), drain: slog::Discard }; |
|
|
let drain = WarningsDrain { tx: Arc::new(Mutex::new(wm.tx.clone())), drain: slog::Discard }; |
|
|
let logger = slog::Logger::root(drain, o!()); |
|
|
let logger = slog::Logger::root(drain, o!()); |
|
|