Browse Source

adapts Warning to slog crate

master
Jonathan Strong 7 years ago
parent
commit
7688ab20b4
4 changed files with 203 additions and 9 deletions
  1. +3
    -1
      Cargo.toml
  2. +1
    -2
      src/influx.rs
  3. +5
    -0
      src/lib.rs
  4. +194
    -6
      src/warnings.rs

+ 3
- 1
Cargo.toml View File

@@ -9,5 +9,7 @@ influent = "0.4"
chrono = { version = "0.4", features = ["serde"] }
hyper = "0.10"
termion = "1.4.0"
windows = { path = "../windows" }
pub-sub = "2.0"
slog = "2.0.6"

windows = { path = "../windows" }

+ 1
- 2
src/influx.rs View File

@@ -131,7 +131,6 @@ pub fn serialize(measurement: &Measurement, line: &mut String) {
}



pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {

let ctx = zmq::Context::new();
@@ -157,7 +156,7 @@ pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
buf.push_str(&msg);
1
}
n @ 1...20 => {
n @ 1...40 => {
buf.push_str("\n");
buf.push_str(&msg);
n + 1


+ 5
- 0
src/lib.rs View File

@@ -1,6 +1,11 @@
//! Tools to record and display what's happening in your program
//!

#![feature(test)]

#[macro_use] extern crate slog;

extern crate test;
extern crate zmq;
extern crate influent;
extern crate chrono;


+ 194
- 6
src/warnings.rs View File

@@ -4,17 +4,19 @@
use std::thread::{self, JoinHandle};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::{self, Sender, Receiver, channel};
use std::collections::VecDeque;
use std::fmt::{Display, Error as FmtError, Formatter};
use std::collections::{BTreeMap, VecDeque};
use std::fmt::{self, Display, Error as FmtError, Formatter};

use zmq;
use chrono::{DateTime, Utc, TimeZone};
use termion::color::{self, Fg, Bg};
use influent::measurement::{Measurement, Value};
use influent::measurement::{Measurement, Value as InfluentValue};
use slog::{self, OwnedKVList, Drain, Key, KV};

use super::nanos;
use influx;


const N_WARNINGS: usize = 150;

#[macro_export]
@@ -81,6 +83,11 @@ pub enum Warning {

Awesome(String),

Debug {
msg: String,
kv: MeasurementRecord,
},

Terminate
}

@@ -89,7 +96,8 @@ impl Warning {
match *self {
Warning::Notice(ref s) | Warning::Error(ref s) |
Warning::DegradedService(ref s) | Warning::Critical(ref s) |
Warning::Awesome(ref s) | Warning::Confirmed(ref s) =>
Warning::Awesome(ref s) | Warning::Confirmed(ref s) |
Warning::Debug { msg: ref s, .. } =>
s.clone(),

Warning::Terminate => "".to_owned()
@@ -99,7 +107,9 @@ impl Warning {
match *self {
Warning::Notice(ref s) | Warning::Error(ref s) |
Warning::DegradedService(ref s) | Warning::Critical(ref s) |
Warning::Awesome(ref s) | Warning::Confirmed(ref s) =>
Warning::Awesome(ref s) | Warning::Confirmed(ref s) |
Warning::Debug { msg: ref s, .. } =>

s.as_ref(),

Warning::Terminate => "Terminate"
@@ -114,6 +124,7 @@ impl Warning {
&Warning::DegradedService(_) => "degraded_service",
&Warning::Confirmed(_) => "confirmed",
&Warning::Awesome(_) => "awesome",
&Warning::Debug { .. } => "debug",
&Warning::Terminate => "terminate",
}
}
@@ -197,7 +208,7 @@ impl Record {
let body = self.msg.msg_str();
let mut m = Measurement::new("warnings");
m.add_tag("category", cat);
m.add_field("msg", Value::String(body));
m.add_field("msg", InfluentValue::String(body));
m.set_timestamp(nanos(self.time) as i64);
m
}
@@ -209,6 +220,130 @@ impl Display for Record {
}
}

pub type SlogResult = Result<(), slog::Error>;

#[derive(Debug, Clone, PartialEq)]
pub enum Value {
String(String),
Float(f64),
Integer(i64),
Boolean(bool)
}

impl Value {
pub fn to_influent<'a>(&'a self) -> InfluentValue<'a> {
match self {
&Value::String(ref s) => InfluentValue::String(s),
&Value::Float(n) => InfluentValue::Float(n),
&Value::Integer(i) => InfluentValue::Integer(i),
&Value::Boolean(b) => InfluentValue::Boolean(b),
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct MeasurementRecord {
fields: Vec<(Key, Value)>,
//measurement: &'a mut Measurement<'a>,
tags: Vec<(Key, String)>,
}

impl MeasurementRecord {
pub fn new() -> Self {
MeasurementRecord {
fields: Vec::new(),
tags: Vec::new(),
}
}

pub fn add_field(&mut self, key: Key, val: Value) -> SlogResult {
self.fields.push((key, val));
Ok(())
}

pub fn add_tag(&mut self, key: Key, val: String) -> SlogResult {
self.tags.push((key, val));
Ok(())
}

pub fn meas<'a>(&'a self) -> Measurement<'a> {
let fields: BTreeMap<&'a str, InfluentValue<'a>> =
self.fields.iter()
.map(|&(k, ref v)| {
(k, v.to_influent())
}).collect();

let tags: BTreeMap<&'a str, &'a str> =
self.tags.iter()
.map(|&(k, ref v)| {
(k, v.as_ref())
}).collect();

Measurement {
key: "warnings",
timestamp: Some(nanos(Utc::now()) as i64),
fields,
tags,
}

}
}

impl slog::Serializer for MeasurementRecord {
fn emit_usize(&mut self, key: Key, val: usize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
fn emit_isize(&mut self, key: Key, val: isize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
fn emit_bool(&mut self, key: Key, val: bool) -> SlogResult { self.add_field(key, Value::Boolean(val)); Ok(()) }
//fn emit_char(&mut self, key: Key, val: char) -> SlogResult { seld(key, Value::String(val.into())); Ok(()) }
fn emit_u8(&mut self, key: Key, val: u8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
fn emit_i8(&mut self, key: Key, val: i8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
fn emit_u16(&mut self, key: Key, val: u16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
fn emit_i16(&mut self, key: Key, val: i16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
fn emit_u32(&mut self, key: Key, val: u32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
fn emit_i32(&mut self, key: Key, val: i32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
fn emit_f32(&mut self, key: Key, val: f32) -> SlogResult { self.add_field(key, Value::Float(val as f64)) }
fn emit_u64(&mut self, key: Key, val: u64) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
fn emit_i64(&mut self, key: Key, val: i64) -> SlogResult { self.add_field(key, Value::Integer(val)) }
fn emit_f64(&mut self, key: Key, val: f64) -> SlogResult { self.add_field(key, Value::Float(val)) }
fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { self.add_tag(key, val.to_string()) }
fn emit_unit(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::Boolean(true)) }
fn emit_none(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::String("none".into())) }
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { self.add_tag(key, val.to_string()) }
}

pub struct WarningsDrain<D: Drain> {
tx: Arc<Mutex<Sender<Warning>>>,
drain: D
}

impl WarningsDrain<slog::Discard> {
pub fn new(tx: Sender<Warning>) -> Self {
let tx = Arc::new(Mutex::new(tx));
let drain = slog::Discard;
WarningsDrain { tx, drain }
}
}


impl<D: Drain> Drain for WarningsDrain<D> {
type Ok = ();
type Err = D::Err;

fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
//let mut meas = Measurement::new("warnings");
//println!("{:?}", values);
let mut ser = MeasurementRecord::new();
//values.serialize(record, &mut ser);
record.kv().serialize(record, &mut ser);
//println!("{:?}", ser);
let msg = record.msg().to_string();
if let Ok(lock) = self.tx.lock() {
lock.send(Warning::Debug { msg, kv: ser });
}
Ok(())
}
}


#[derive(Debug)]
pub struct WarningsManager {
pub tx: Sender<Warning>,
@@ -231,6 +366,17 @@ impl WarningsManager {
println!("warnings manager terminating");
break;
}

Warning::Debug { msg, kv } => {
let mut meas = kv.meas();
meas.add_field("msg", InfluentValue::String(msg.as_ref()));
meas.add_tag("category", "debug");
influx::serialize(&meas, &mut buf);
socket.send_str(&buf, 0);
buf.clear();
// and don't push to warnings
// bc it's debug
}
other => {
let rec = Record::new(other);
{
@@ -266,4 +412,46 @@ impl Drop for WarningsManager {
}


mod tests {
use super::*;
use test::{black_box, Bencher};

#[test]
fn it_creates_a_logger() {
let wm = WarningsManager::new();
let im = influx::writer(wm.tx.clone());
let drain = WarningsDrain { tx: Arc::new(Mutex::new(wm.tx.clone())), drain: slog::Discard };
let logger = slog::Logger::root(drain, o!("build" => "0.1.3"));
//for _ in 0..60 {
// debug!(logger, "test 123"; "exchange" => "plnx");
//}
}

#[bench]
fn it_sends_integers_with_a_sender_behind_a_mutex(b: &mut Bencher) {
let (tx, rx) = channel();
enum Msg {
Val(usize),
Terminate
}
let worker = thread::spawn(move || {
let mut xs = Vec::new();
loop {
match rx.recv().unwrap() {
Msg::Val(x) => { xs.push(x); }
Msg::Terminate => break,
}
}
xs.len()
});
let tx = Arc::new(Mutex::new(tx));
b.iter(|| {
let lock = tx.lock().unwrap();
lock.send(Msg::Val(1));
});
tx.lock().unwrap().send(Msg::Terminate);
let len = worker.join().unwrap();
println!("{}", len);

}
}

Loading…
Cancel
Save