@@ -0,0 +1,4 @@ | |||
/target/ | |||
**/*.rs.bk | |||
Cargo.lock | |||
.*.swp |
@@ -0,0 +1,13 @@ | |||
[package] | |||
name = "logging" | |||
version = "0.1.0" | |||
authors = ["Jonathan Strong <jstrong@legis.io>"] | |||
[dependencies] | |||
zmq = "0.8" | |||
influent = "0.4" | |||
chrono = { version = "0.4", features = ["serde"] } | |||
hyper = "0.10" | |||
termion = "1.4.0" | |||
windows = { path = "../windows" } | |||
pub-sub = "2.0" |
@@ -0,0 +1,274 @@ | |||
//! Utilities to efficiently send data to influx | |||
//! | |||
use std::iter::FromIterator; | |||
use std::io::Read; | |||
use std::sync::mpsc::{Sender, Receiver, channel}; | |||
use std::thread; | |||
use hyper::status::StatusCode; | |||
use hyper::client::response::Response; | |||
use hyper::Url; | |||
use hyper::client::Client; | |||
use influent::measurement::{Measurement, Value}; | |||
use zmq; | |||
use chrono::{DateTime, Utc, TimeZone}; | |||
use super::nanos; | |||
use warnings::Warning; | |||
const WRITER_ADDR: &'static str = "ipc://mm-influx"; | |||
//const WRITER_ADDR: &'static str = "tcp://127.0.0.1:17853"; | |||
const DB_NAME: &'static str = "mm"; | |||
const DB_HOST: &'static str = "http://localhost:8086/write"; | |||
const ZMQ_RCV_HWM: i32 = 0; | |||
const ZMQ_SND_HWM: i32 = 0; | |||
pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { | |||
let socket = ctx.socket(zmq::PULL)?; | |||
socket.bind(WRITER_ADDR)?; | |||
socket.set_rcvhwm(ZMQ_RCV_HWM)?; | |||
Ok(socket) | |||
} | |||
pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { | |||
let socket = ctx.socket(zmq::PUSH)?; | |||
socket.connect(WRITER_ADDR)?; | |||
socket.set_sndhwm(ZMQ_SND_HWM)?; | |||
Ok(socket) | |||
} | |||
fn escape(s: &str) -> String { | |||
s | |||
.replace(" ", "\\ ") | |||
.replace(",", "\\,") | |||
} | |||
fn as_string(s: &str) -> String { | |||
// the second replace removes double escapes | |||
// | |||
format!("\"{}\"", s.replace("\"", "\\\"") | |||
.replace(r#"\\""#, r#"\""#)) | |||
} | |||
#[test] | |||
fn it_checks_as_string_does_not_double_escape() { | |||
let raw = "this is \\\"an escaped string\\\" so it's problematic"; | |||
let escaped = as_string(&raw); | |||
assert_eq!(escaped, format!("\"{}\"", raw).as_ref()); | |||
} | |||
fn as_integer(i: &i64) -> String { | |||
format!("{}i", i) | |||
} | |||
fn as_float(f: &f64) -> String { | |||
f.to_string() | |||
} | |||
fn as_boolean(b: &bool) -> &str { | |||
if *b { "t" } else { "f" } | |||
} | |||
pub fn now() -> i64 { | |||
nanos(Utc::now()) as i64 | |||
} | |||
/// Serialize the measurement into influx line protocol | |||
/// and append to the buffer. | |||
/// | |||
/// # Examples | |||
/// | |||
/// ``` | |||
/// extern crate influent; | |||
/// extern crate logging; | |||
/// | |||
/// use influent::measurement::{Measurement, Value}; | |||
/// use std::string::String; | |||
/// use logging::influx::serialize; | |||
/// | |||
/// fn main() { | |||
/// let mut buf = String::new(); | |||
/// let mut m = Measurement::new("test"); | |||
/// m.add_field("x", Value::Integer(1)); | |||
/// serialize(&m, &mut buf); | |||
/// } | |||
/// | |||
/// ``` | |||
/// | |||
pub fn serialize(measurement: &Measurement, line: &mut String) { | |||
line.push_str(&escape(measurement.key)); | |||
for (tag, value) in measurement.tags.iter() { | |||
line.push_str(","); | |||
line.push_str(&escape(tag)); | |||
line.push_str("="); | |||
line.push_str(&escape(value)); | |||
} | |||
let mut was_spaced = false; | |||
for (field, value) in measurement.fields.iter() { | |||
line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }}); | |||
line.push_str(&escape(field)); | |||
line.push_str("="); | |||
match value { | |||
&Value::String(ref s) => line.push_str(&as_string(s)), | |||
&Value::Integer(ref i) => line.push_str(&as_integer(i)), | |||
&Value::Float(ref f) => line.push_str(&as_float(f)), | |||
&Value::Boolean(ref b) => line.push_str(as_boolean(b)) | |||
}; | |||
} | |||
match measurement.timestamp { | |||
Some(t) => { | |||
line.push_str(" "); | |||
line.push_str(&t.to_string()); | |||
} | |||
_ => {} | |||
} | |||
} | |||
pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> { | |||
let ctx = zmq::Context::new(); | |||
let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); | |||
let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); | |||
let client = Client::new(); | |||
let mut buf = String::with_capacity(4096); | |||
let mut server_resp = String::with_capacity(4096); | |||
let mut count = 0; | |||
thread::spawn(move || { | |||
loop { | |||
if let Ok(bytes) = socket.recv_bytes(0) { | |||
if let Ok(msg) = String::from_utf8(bytes) { | |||
count = match count { | |||
0 => { | |||
buf.push_str(&msg); | |||
1 | |||
} | |||
n @ 1...20 => { | |||
buf.push_str("\n"); | |||
buf.push_str(&msg); | |||
n + 1 | |||
} | |||
_ => { | |||
buf.push_str("\n"); | |||
buf.push_str(&msg); | |||
match client.post(url.clone()) | |||
.body(&buf) | |||
.send() { | |||
Ok(Response { status, .. }) if status == StatusCode::NoContent => {} | |||
Ok(mut resp) => { | |||
//let mut body = String::with_capacity(4096); | |||
//let _ = | |||
resp.read_to_string(&mut server_resp); //.unwrap_or(0); | |||
//println!("Influx write error: Server responded {} (sent '{}' to {}):\n{}", | |||
//warnings.send(Warning::Error(buf.clone())); | |||
//print!("\n\n\n\n\n{}", buf); | |||
warnings.send( | |||
Warning::Error( | |||
format!("Influx server: {}", server_resp))); | |||
server_resp.clear(); | |||
//resp.status, String::from_utf8_lossy(&bytes), url, body); | |||
} | |||
Err(why) => { | |||
warnings.send( | |||
Warning::Error( | |||
format!("Influx write error: {}", why))); | |||
} | |||
} | |||
buf.clear(); | |||
0 | |||
} | |||
} | |||
} | |||
} | |||
} | |||
}) | |||
} | |||
mod tests { | |||
use super::*; | |||
#[test] | |||
fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() { | |||
let ctx = zmq::Context::new(); | |||
let socket = push(&ctx).unwrap(); | |||
let (tx, rx) = channel(); | |||
let w = writer(tx.clone()); | |||
let mut buf = String::with_capacity(4096); | |||
let mut meas = Measurement::new("rust_test"); | |||
meas.add_tag("a", "t"); | |||
meas.add_field("c", Value::Float(1.23456)); | |||
let now = now(); | |||
meas.set_timestamp(now); | |||
serialize(&meas, &mut buf); | |||
socket.send_str(&buf, 0); | |||
drop(w); | |||
} | |||
#[test] | |||
fn it_serializes_a_measurement_in_place() { | |||
let mut buf = String::with_capacity(4096); | |||
let mut meas = Measurement::new("rust_test"); | |||
meas.add_tag("a", "b"); | |||
meas.add_field("c", Value::Float(1.0)); | |||
let now = now(); | |||
meas.set_timestamp(now); | |||
serialize(&meas, &mut buf); | |||
let ans = format!("rust_test,a=b c=1 {}", now); | |||
assert_eq!(buf, ans); | |||
} | |||
#[test] | |||
fn it_serializes_a_hard_to_serialize_message() { | |||
let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#; | |||
let mut buf = String::new(); | |||
let mut server_resp = String::new(); | |||
let mut m = Measurement::new("rust_test"); | |||
m.add_field("s", Value::String(&raw)); | |||
let now = now(); | |||
m.set_timestamp(now); | |||
serialize(&m, &mut buf); | |||
println!("{}", buf); | |||
buf.push_str("\n"); | |||
let buf_copy = buf.clone(); | |||
buf.push_str(&buf_copy); | |||
println!("{}", buf); | |||
let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); | |||
let client = Client::new(); | |||
match client.post(url.clone()) | |||
.body(&buf) | |||
.send() { | |||
Ok(Response { status, .. }) if status == StatusCode::NoContent => {} | |||
Ok(mut resp) => { | |||
resp.read_to_string(&mut server_resp); //.unwrap_or(0); | |||
panic!("{}", server_resp); | |||
} | |||
Err(why) => { | |||
panic!(why) | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,321 @@ | |||
use std::thread::{self, JoinHandle}; | |||
use std::sync::{Arc, Mutex, RwLock}; | |||
use std::sync::mpsc::{self, Sender, Receiver, channel}; | |||
use std::collections::VecDeque; | |||
use std::fmt::{self, Display, Error as FmtError, Formatter, Write}; | |||
use std::time::{Instant, Duration}; | |||
use chrono::{DateTime, Utc, TimeZone}; | |||
use pub_sub::PubSub; | |||
use zmq; | |||
use influent::measurement::{Measurement, Value}; | |||
use windows::{DurationWindow, Incremental}; | |||
use influx; | |||
pub type Nanos = u64; | |||
pub const SECOND: u64 = 1e9 as u64; | |||
pub const MINUTE: u64 = SECOND * 60; | |||
pub const HOUR: u64 = MINUTE * 60; | |||
pub const MILLISECOND: u64 = SECOND / 1000; | |||
pub const MICROSECOND: u64 = MILLISECOND / 1000; | |||
pub fn nanos(d: Duration) -> Nanos { | |||
d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) | |||
} | |||
pub fn dt_nanos(t: DateTime<Utc>) -> i64 { | |||
(t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64) | |||
} | |||
pub fn now() -> i64 { dt_nanos(Utc::now()) } | |||
pub fn tfmt(ns: Nanos) -> String { | |||
let mut f = String::new(); | |||
match ns { | |||
t if t <= MICROSECOND => { | |||
write!(f, "{}ns", t); | |||
} | |||
t if t > MICROSECOND && t < MILLISECOND => { | |||
write!(f, "{}u", t / MICROSECOND); | |||
} | |||
t if t > MILLISECOND && t < SECOND => { | |||
write!(f, "{}ms", t / MILLISECOND); | |||
} | |||
t => { | |||
write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND); | |||
} | |||
} | |||
f | |||
} | |||
pub fn tfmt_dur(d: Duration) -> String { | |||
tfmt(nanos(d)) | |||
} | |||
pub fn tfmt_write(ns: Nanos, f: &mut Formatter) { | |||
match ns { | |||
t if t <= MICROSECOND => { | |||
write!(f, "{}ns", t); | |||
} | |||
t if t > MICROSECOND && t < MILLISECOND => { | |||
write!(f, "{}u", t / MICROSECOND); | |||
} | |||
t if t > MILLISECOND && t < SECOND => { | |||
write!(f, "{}ms", t / MILLISECOND); | |||
} | |||
t => { | |||
write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND); | |||
} | |||
} | |||
} | |||
#[derive(Debug)] | |||
pub enum ExperiencedLatency { | |||
GdaxWebsocket(Duration), | |||
//GdaxWebsocketNoLock(Duration), | |||
GdaxHttpPublic(Duration), | |||
GdaxHttpPrivate(Duration), | |||
PlnxHttpPublic(Duration), | |||
PlnxHttpPrivate(Duration), | |||
PlnxOrderBook(Duration), | |||
ExmoHttpPublic(Duration), | |||
KrknHttpPublic(Duration), | |||
KrknHttpPrivate(Duration), | |||
KrknTrade(Duration), | |||
EventLoop(Duration), | |||
Terminate | |||
} | |||
// impl Message for ExperiencedLatency { | |||
// fn kill_switch() -> Self { | |||
// ExperiencedLatency::Terminate | |||
// } | |||
// } | |||
/// represents over what period of time | |||
/// the latency measurements were taken | |||
pub trait MeasurementWindow { | |||
fn duration(&self) -> Duration; | |||
} | |||
#[derive(Debug, Clone, Copy)] | |||
pub struct WThirty; | |||
impl Default for WThirty { | |||
fn default() -> Self { WThirty {} } | |||
} | |||
impl MeasurementWindow for WThirty { | |||
fn duration(&self) -> Duration { Duration::from_secs(30) } | |||
} | |||
#[derive(Debug, Clone, Copy)] | |||
pub struct WTen; | |||
impl Default for WTen { | |||
fn default() -> Self { WTen {} } | |||
} | |||
impl MeasurementWindow for WTen { | |||
fn duration(&self) -> Duration { Duration::from_secs(10) } | |||
} | |||
#[derive(Debug, Clone, Default)] | |||
pub struct LatencyUpdate<W> | |||
where W: MeasurementWindow | |||
{ | |||
pub gdax_ws: Nanos, | |||
//pub gdax_ws_nolock: Nanos, | |||
pub krkn_pub: Nanos, | |||
pub krkn_priv: Nanos, | |||
pub plnx_pub: Nanos, | |||
pub plnx_priv: Nanos, | |||
pub plnx_order: Nanos, | |||
pub krkn_trade_30_mean: Nanos, | |||
pub krkn_trade_30_max: Nanos, | |||
pub krkn_trade_300_mean: Nanos, | |||
pub krkn_trade_300_max: Nanos, | |||
//pub event_loop: Nanos, | |||
pub size: W, | |||
} | |||
impl<W> Display for LatencyUpdate<W> | |||
where W: MeasurementWindow | |||
{ | |||
fn fmt(&self, f: &mut Formatter) -> fmt::Result { | |||
write!(f, " Latency\n gdax ws: "); | |||
tfmt_write(self.gdax_ws, f); | |||
write!(f, "\n krkn pub: "); | |||
tfmt_write(self.krkn_pub, f); | |||
write!(f, "\n krkn priv: "); | |||
tfmt_write(self.krkn_priv, f); | |||
write!(f, "\n krkn trade 30 mean: "); | |||
tfmt_write(self.krkn_trade_30_mean, f); | |||
write!(f, "\n krkn trade 30 max: "); | |||
tfmt_write(self.krkn_trade_30_max, f); | |||
write!(f, "\n krkn trade 300 mean: "); | |||
tfmt_write(self.krkn_trade_300_mean, f); | |||
write!(f, "\n krkn trade 300 max: "); | |||
tfmt_write(self.krkn_trade_300_max, f); | |||
write!(f, "\n plnx pub: "); | |||
tfmt_write(self.plnx_pub, f); | |||
write!(f, "\n plnx priv: "); | |||
tfmt_write(self.plnx_priv, f); | |||
write!(f, "\n plnx orderbook loop: "); | |||
tfmt_write(self.plnx_order, f); | |||
//write!(f, "\n gdax ws nolock: "); | |||
//tfmt_write(self.gdax_ws_nolock, f); | |||
//write!(f, "\n event loop: "); | |||
//tfmt(self.event_loop, f); | |||
write!(f,"") | |||
} | |||
} | |||
impl<W: MeasurementWindow> LatencyUpdate<W> { | |||
pub fn measurement_window(&self) -> Duration { | |||
self.size.duration() | |||
} | |||
} | |||
pub struct LatencyManager<W> | |||
where W: MeasurementWindow + Clone + Send + Sync | |||
{ | |||
pub tx: Sender<ExperiencedLatency>, | |||
pub channel: PubSub<LatencyUpdate<W>>, | |||
thread: Option<JoinHandle<()>>, | |||
} | |||
//impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> { | |||
impl LatencyManager<WTen> { | |||
pub fn new(w: WTen) -> Self { | |||
let (tx, rx) = channel(); | |||
let tx_copy = tx.clone(); | |||
let channel = PubSub::new(); | |||
let channel_copy = channel.clone(); | |||
let w = w.clone(); | |||
let thread = Some(thread::spawn(move || { | |||
let ctx = zmq::Context::new(); | |||
let socket = influx::push(&ctx).unwrap(); | |||
let mut buf = String::with_capacity(4096); | |||
let w = w.clone(); | |||
let mut gdax_ws = DurationWindow::new(w.duration()); | |||
let mut gdax_priv = DurationWindow::new(w.duration()); | |||
let mut krkn_pub = DurationWindow::new(w.duration()); | |||
let mut krkn_priv = DurationWindow::new(w.duration()); | |||
let mut plnx_pub = DurationWindow::new(w.duration()); | |||
let mut plnx_priv = DurationWindow::new(w.duration()); | |||
let mut plnx_order = DurationWindow::new(w.duration()); | |||
/// yes I am intentionally breaking from the hard-typed duration | |||
/// window ... that was a stupid idea | |||
/// | |||
let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30)); | |||
let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300)); | |||
//let mut gdax_ws_nolock = DurationWindow::new(w.duration()); | |||
//let mut event_loop = DurationWindow::new(w.duration()); | |||
let mut last_broadcast = Instant::now(); | |||
loop { | |||
if let Ok(msg) = rx.recv() { | |||
match msg { | |||
ExperiencedLatency::Terminate => { | |||
println!("latency manager terminating"); | |||
break; | |||
} | |||
ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(Instant::now(), d), | |||
//ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(Instant::now(), d), | |||
ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(Instant::now(), d), | |||
ExperiencedLatency::KrknHttpPublic(d) => krkn_pub.update(Instant::now(), d), | |||
ExperiencedLatency::KrknHttpPrivate(d) => krkn_priv.update(Instant::now(), d), | |||
ExperiencedLatency::PlnxHttpPublic(d) => plnx_pub.update(Instant::now(), d), | |||
ExperiencedLatency::PlnxHttpPrivate(d) => plnx_priv.update(Instant::now(), d), | |||
ExperiencedLatency::PlnxOrderBook(d) => plnx_order.update(Instant::now(), d), | |||
ExperiencedLatency::KrknTrade(d) => { | |||
let n = DurationWindow::nanos(d); | |||
krkn_trade_30.update(Instant::now(), d); | |||
krkn_trade_300.update(Instant::now(), d); | |||
let mut m = Measurement::new("krkn_trade_api"); | |||
m.add_field("nanos", Value::Integer(n as i64)); | |||
m.set_timestamp(now()); | |||
influx::serialize(&m, &mut buf); | |||
socket.send_str(&buf, 0); | |||
buf.clear(); | |||
} | |||
//ExperiencedLatency::EventLoop(d) => event_loop.update(Instant::now(), d), | |||
other => {} | |||
} | |||
} | |||
if Instant::now() - last_broadcast > Duration::from_millis(100) { | |||
let now = Instant::now(); | |||
krkn_trade_30.refresh(&now); | |||
krkn_trade_300.refresh(&now); | |||
let update = LatencyUpdate { | |||
gdax_ws: gdax_ws.refresh(&now).mean_nanos(), | |||
//gdax_ws_nolock: gdax_ws_nolock.refresh(&now).mean_nanos(), | |||
krkn_pub: krkn_pub.refresh(&now).mean_nanos(), | |||
krkn_priv: krkn_priv.refresh(&now).mean_nanos(), | |||
plnx_pub: plnx_pub.refresh(&now).mean_nanos(), | |||
plnx_priv: plnx_priv.refresh(&now).mean_nanos(), | |||
plnx_order: plnx_order.refresh(&now).mean_nanos(), | |||
krkn_trade_30_mean: krkn_trade_30.mean_nanos(), | |||
krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0), | |||
krkn_trade_300_mean: krkn_trade_300.mean_nanos(), | |||
krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0), | |||
//event_loop: event_loop.refresh(&now).mean_nanos(), | |||
size: w.clone(), | |||
}; | |||
channel.send(update); | |||
last_broadcast = now; | |||
} | |||
} | |||
})); | |||
LatencyManager { | |||
tx: tx_copy, | |||
channel: channel_copy, | |||
thread | |||
} | |||
} | |||
} | |||
@@ -0,0 +1,30 @@ | |||
//! Tools to record and display what's happening in your program | |||
//! | |||
extern crate zmq; | |||
extern crate influent; | |||
extern crate chrono; | |||
extern crate hyper; | |||
extern crate termion; | |||
extern crate pub_sub; | |||
extern crate windows; | |||
use chrono::{DateTime, Utc}; | |||
pub mod influx; | |||
pub mod warnings; | |||
pub mod latency; | |||
/// converts a chrono::DateTime to an integer timestamp (ns) | |||
/// | |||
pub fn nanos(t: DateTime<Utc>) -> u64 { | |||
(t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64) | |||
} | |||
#[cfg(test)] | |||
mod tests { | |||
#[test] | |||
fn it_works() { | |||
} | |||
} |
@@ -0,0 +1,223 @@ | |||
//! An object to handle everyone's errors | |||
//! | |||
use std::thread::{self, JoinHandle}; | |||
use std::sync::{Arc, Mutex, RwLock}; | |||
use std::sync::mpsc::{self, Sender, Receiver, channel}; | |||
use std::collections::VecDeque; | |||
use std::fmt::{Display, Error as FmtError, Formatter}; | |||
use zmq; | |||
use chrono::{DateTime, Utc, TimeZone}; | |||
use termion::color::{self, Fg, Bg}; | |||
use influent::measurement::{Measurement, Value}; | |||
use super::nanos; | |||
use influx; | |||
const N_WARNINGS: usize = 150; | |||
/// represents a non-fatal error somewhere in | |||
/// the system to report either to the program interface | |||
/// or in logs. | |||
/// | |||
#[derive(Debug, Clone, PartialEq)] | |||
pub enum Warning { | |||
Notice(String), | |||
Error(String), | |||
DegradedService(String), | |||
Critical(String), | |||
Confirmed(String), | |||
Awesome(String), | |||
Terminate | |||
} | |||
impl Warning { | |||
pub fn msg(&self) -> String { | |||
match *self { | |||
Warning::Notice(ref s) | Warning::Error(ref s) | | |||
Warning::DegradedService(ref s) | Warning::Critical(ref s) | | |||
Warning::Awesome(ref s) | Warning::Confirmed(ref s) => | |||
s.clone(), | |||
Warning::Terminate => "".to_owned() | |||
} | |||
} | |||
pub fn msg_str(&self) -> &str { | |||
match *self { | |||
Warning::Notice(ref s) | Warning::Error(ref s) | | |||
Warning::DegradedService(ref s) | Warning::Critical(ref s) | | |||
Warning::Awesome(ref s) | Warning::Confirmed(ref s) => | |||
s.as_ref(), | |||
Warning::Terminate => "Terminate" | |||
} | |||
} | |||
pub fn category_str(&self) -> &str { | |||
match self { | |||
&Warning::Notice(_) => "notice", | |||
&Warning::Error(_) => "error", | |||
&Warning::Critical(_) => "critical", | |||
&Warning::DegradedService(_) => "degraded_service", | |||
&Warning::Confirmed(_) => "confirmed", | |||
&Warning::Awesome(_) => "awesome", | |||
&Warning::Terminate => "terminate", | |||
} | |||
} | |||
pub fn category(&self, f: &mut Formatter) { | |||
match self { | |||
&Warning::Notice(_) => { | |||
write!(f, "[ Notice ]"); | |||
} | |||
&Warning::Error(_) => { | |||
write!(f, "{yellow}[{title}]{reset}", | |||
yellow = Fg(color::LightYellow), | |||
title = " Error--", | |||
reset = Fg(color::Reset)); | |||
} | |||
&Warning::Critical(_) => { | |||
write!(f, "{bg}{fg}{title}{resetbg}{resetfg}", | |||
bg = Bg(color::Red), | |||
fg = Fg(color::White), | |||
title = " CRITICAL ", | |||
resetbg = Bg(color::Reset), | |||
resetfg = Fg(color::Reset)); | |||
} | |||
&Warning::Awesome(_) => { | |||
write!(f, "{color}[{title}]{reset}", | |||
color = Fg(color::Green), | |||
title = "Awesome!", | |||
reset = Fg(color::Reset)); | |||
} | |||
&Warning::DegradedService(_) => { | |||
write!(f, "{color}[{title}] {reset}", | |||
color = Fg(color::Blue), | |||
title = "Degraded Service ", | |||
reset = Fg(color::Reset)); | |||
} | |||
&Warning::Confirmed(_) => { | |||
write!(f, "{bg}{fg}{title}{resetbg}{resetfg}", | |||
bg = Bg(color::Blue), | |||
fg = Fg(color::White), | |||
title = "Confirmed ", | |||
resetbg = Bg(color::Reset), | |||
resetfg = Fg(color::Reset)); | |||
} | |||
_ => {} | |||
} | |||
} | |||
} | |||
impl Display for Warning { | |||
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { | |||
self.category(f); | |||
write!(f, " {}", self.msg()) | |||
} | |||
} | |||
// impl Message for Warning { | |||
// fn kill_switch() -> Self { | |||
// Warning::Terminate | |||
// } | |||
// } | |||
#[derive(Debug, Clone)] | |||
pub struct Record { | |||
pub time: DateTime<Utc>, | |||
pub msg: Warning | |||
} | |||
impl Record { | |||
pub fn new(msg: Warning) -> Self { | |||
let time = Utc::now(); | |||
Record { time, msg } | |||
} | |||
pub fn to_measurement(&self) -> Measurement { | |||
let cat = self.msg.category_str(); | |||
let body = self.msg.msg_str(); | |||
let mut m = Measurement::new("warnings"); | |||
m.add_tag("category", cat); | |||
m.add_field("msg", Value::String(body)); | |||
m.set_timestamp(nanos(self.time) as i64); | |||
m | |||
} | |||
} | |||
impl Display for Record { | |||
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { | |||
write!(f, "{} | {}", self.time.format("%H:%M:%S"), self.msg) | |||
} | |||
} | |||
#[derive(Debug)] | |||
pub struct WarningsManager { | |||
pub tx: Sender<Warning>, | |||
pub warnings: Arc<RwLock<VecDeque<Record>>>, | |||
thread: Option<JoinHandle<()>> | |||
} | |||
impl WarningsManager { | |||
pub fn new() -> Self { | |||
let warnings = Arc::new(RwLock::new(VecDeque::new())); | |||
let warnings_copy = warnings.clone(); | |||
let (tx, rx) = channel(); | |||
let mut buf = String::with_capacity(4096); | |||
let ctx = zmq::Context::new(); | |||
let socket = influx::push(&ctx).unwrap(); | |||
let thread = thread::spawn(move || { loop { | |||
if let Ok(msg) = rx.recv() { | |||
match msg { | |||
Warning::Terminate => { | |||
println!("warnings manager terminating"); | |||
break; | |||
} | |||
other => { | |||
let rec = Record::new(other); | |||
{ | |||
let m = rec.to_measurement(); | |||
influx::serialize(&m, &mut buf); | |||
socket.send_str(&buf, 0); | |||
buf.clear(); | |||
} | |||
if let Ok(mut lock) = warnings.write() { | |||
lock.push_front(rec); | |||
lock.truncate(N_WARNINGS); | |||
} | |||
} | |||
} | |||
} | |||
} }); | |||
WarningsManager { | |||
warnings: warnings_copy, | |||
thread: Some(thread), | |||
tx | |||
} | |||
} | |||
} | |||
impl Drop for WarningsManager { | |||
fn drop(&mut self) { | |||
self.tx.send(Warning::Terminate); | |||
if let Some(thread) = self.thread.take() { | |||
thread.join(); | |||
} | |||
} | |||
} | |||