From 4ce59763df2cc4f6ac02a940a3fd5cdbe9038aca Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Wed, 26 Jul 2017 21:06:19 -0400 Subject: [PATCH] initial commit --- .gitignore | 4 + Cargo.toml | 13 ++ src/influx.rs | 274 +++++++++++++++++++++++++++++++++++++++++ src/latency.rs | 321 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 30 +++++ src/warnings.rs | 223 +++++++++++++++++++++++++++++++++ 6 files changed, 865 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/influx.rs create mode 100644 src/latency.rs create mode 100644 src/lib.rs create mode 100644 src/warnings.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a72bc8b --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/target/ +**/*.rs.bk +Cargo.lock +.*.swp diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..157e2d8 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "logging" +version = "0.1.0" +authors = ["Jonathan Strong "] + +[dependencies] +zmq = "0.8" +influent = "0.4" +chrono = { version = "0.4", features = ["serde"] } +hyper = "0.10" +termion = "1.4.0" +windows = { path = "../windows" } +pub-sub = "2.0" diff --git a/src/influx.rs b/src/influx.rs new file mode 100644 index 0000000..7784577 --- /dev/null +++ b/src/influx.rs @@ -0,0 +1,274 @@ +//! Utilities to efficiently send data to influx +//! + +use std::iter::FromIterator; +use std::io::Read; +use std::sync::mpsc::{Sender, Receiver, channel}; +use std::thread; + +use hyper::status::StatusCode; +use hyper::client::response::Response; +use hyper::Url; +use hyper::client::Client; +use influent::measurement::{Measurement, Value}; +use zmq; +use chrono::{DateTime, Utc, TimeZone}; + +use super::nanos; +use warnings::Warning; + +const WRITER_ADDR: &'static str = "ipc://mm-influx"; +//const WRITER_ADDR: &'static str = "tcp://127.0.0.1:17853"; +const DB_NAME: &'static str = "mm"; +const DB_HOST: &'static str = "http://localhost:8086/write"; +const ZMQ_RCV_HWM: i32 = 0; +const ZMQ_SND_HWM: i32 = 0; + +pub fn pull(ctx: &zmq::Context) -> Result { + let socket = ctx.socket(zmq::PULL)?; + socket.bind(WRITER_ADDR)?; + socket.set_rcvhwm(ZMQ_RCV_HWM)?; + Ok(socket) +} + +pub fn push(ctx: &zmq::Context) -> Result { + let socket = ctx.socket(zmq::PUSH)?; + socket.connect(WRITER_ADDR)?; + socket.set_sndhwm(ZMQ_SND_HWM)?; + Ok(socket) +} + +fn escape(s: &str) -> String { + s + .replace(" ", "\\ ") + .replace(",", "\\,") +} + +fn as_string(s: &str) -> String { + // the second replace removes double escapes + // + format!("\"{}\"", s.replace("\"", "\\\"") + .replace(r#"\\""#, r#"\""#)) +} + +#[test] +fn it_checks_as_string_does_not_double_escape() { + let raw = "this is \\\"an escaped string\\\" so it's problematic"; + let escaped = as_string(&raw); + assert_eq!(escaped, format!("\"{}\"", raw).as_ref()); +} + +fn as_integer(i: &i64) -> String { + format!("{}i", i) +} + +fn as_float(f: &f64) -> String { + f.to_string() +} + +fn as_boolean(b: &bool) -> &str { + if *b { "t" } else { "f" } +} + +pub fn now() -> i64 { + nanos(Utc::now()) as i64 +} + +/// Serialize the measurement into influx line protocol +/// and append to the buffer. +/// +/// # Examples +/// +/// ``` +/// extern crate influent; +/// extern crate logging; +/// +/// use influent::measurement::{Measurement, Value}; +/// use std::string::String; +/// use logging::influx::serialize; +/// +/// fn main() { +/// let mut buf = String::new(); +/// let mut m = Measurement::new("test"); +/// m.add_field("x", Value::Integer(1)); +/// serialize(&m, &mut buf); +/// } +/// +/// ``` +/// +pub fn serialize(measurement: &Measurement, line: &mut String) { + line.push_str(&escape(measurement.key)); + + for (tag, value) in measurement.tags.iter() { + line.push_str(","); + line.push_str(&escape(tag)); + line.push_str("="); + line.push_str(&escape(value)); + } + + let mut was_spaced = false; + + for (field, value) in measurement.fields.iter() { + line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }}); + line.push_str(&escape(field)); + line.push_str("="); + + match value { + &Value::String(ref s) => line.push_str(&as_string(s)), + &Value::Integer(ref i) => line.push_str(&as_integer(i)), + &Value::Float(ref f) => line.push_str(&as_float(f)), + &Value::Boolean(ref b) => line.push_str(as_boolean(b)) + }; + } + + match measurement.timestamp { + Some(t) => { + line.push_str(" "); + line.push_str(&t.to_string()); + } + _ => {} + } +} + + + +pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { + + let ctx = zmq::Context::new(); + + let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); + + let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + + let client = Client::new(); + + let mut buf = String::with_capacity(4096); + + let mut server_resp = String::with_capacity(4096); + + let mut count = 0; + + thread::spawn(move || { + loop { + if let Ok(bytes) = socket.recv_bytes(0) { + if let Ok(msg) = String::from_utf8(bytes) { + count = match count { + 0 => { + buf.push_str(&msg); + 1 + } + n @ 1...20 => { + buf.push_str("\n"); + buf.push_str(&msg); + n + 1 + } + _ => { + buf.push_str("\n"); + buf.push_str(&msg); + match client.post(url.clone()) + .body(&buf) + .send() { + + Ok(Response { status, .. }) if status == StatusCode::NoContent => {} + + Ok(mut resp) => { + //let mut body = String::with_capacity(4096); + //let _ = + resp.read_to_string(&mut server_resp); //.unwrap_or(0); + //println!("Influx write error: Server responded {} (sent '{}' to {}):\n{}", + //warnings.send(Warning::Error(buf.clone())); + //print!("\n\n\n\n\n{}", buf); + warnings.send( + Warning::Error( + format!("Influx server: {}", server_resp))); + server_resp.clear(); + + //resp.status, String::from_utf8_lossy(&bytes), url, body); + } + + Err(why) => { + warnings.send( + Warning::Error( + format!("Influx write error: {}", why))); + } + } + buf.clear(); + 0 + } + } + } + } + } + }) +} + +mod tests { + use super::*; + + #[test] + fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() { + let ctx = zmq::Context::new(); + let socket = push(&ctx).unwrap(); + let (tx, rx) = channel(); + let w = writer(tx.clone()); + let mut buf = String::with_capacity(4096); + let mut meas = Measurement::new("rust_test"); + meas.add_tag("a", "t"); + meas.add_field("c", Value::Float(1.23456)); + let now = now(); + meas.set_timestamp(now); + serialize(&meas, &mut buf); + socket.send_str(&buf, 0); + drop(w); + } + + #[test] + fn it_serializes_a_measurement_in_place() { + let mut buf = String::with_capacity(4096); + let mut meas = Measurement::new("rust_test"); + meas.add_tag("a", "b"); + meas.add_field("c", Value::Float(1.0)); + let now = now(); + meas.set_timestamp(now); + serialize(&meas, &mut buf); + let ans = format!("rust_test,a=b c=1 {}", now); + assert_eq!(buf, ans); + } + + #[test] + fn it_serializes_a_hard_to_serialize_message() { + let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#; + let mut buf = String::new(); + let mut server_resp = String::new(); + let mut m = Measurement::new("rust_test"); + m.add_field("s", Value::String(&raw)); + let now = now(); + m.set_timestamp(now); + serialize(&m, &mut buf); + println!("{}", buf); + buf.push_str("\n"); + let buf_copy = buf.clone(); + buf.push_str(&buf_copy); + println!("{}", buf); + + let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + let client = Client::new(); + match client.post(url.clone()) + .body(&buf) + .send() { + + Ok(Response { status, .. }) if status == StatusCode::NoContent => {} + + Ok(mut resp) => { + resp.read_to_string(&mut server_resp); //.unwrap_or(0); + panic!("{}", server_resp); + } + + Err(why) => { + panic!(why) + } + } + + } + +} diff --git a/src/latency.rs b/src/latency.rs new file mode 100644 index 0000000..6f31dc7 --- /dev/null +++ b/src/latency.rs @@ -0,0 +1,321 @@ +use std::thread::{self, JoinHandle}; +use std::sync::{Arc, Mutex, RwLock}; +use std::sync::mpsc::{self, Sender, Receiver, channel}; +use std::collections::VecDeque; +use std::fmt::{self, Display, Error as FmtError, Formatter, Write}; +use std::time::{Instant, Duration}; + +use chrono::{DateTime, Utc, TimeZone}; +use pub_sub::PubSub; +use zmq; +use influent::measurement::{Measurement, Value}; + +use windows::{DurationWindow, Incremental}; +use influx; + + + +pub type Nanos = u64; + +pub const SECOND: u64 = 1e9 as u64; +pub const MINUTE: u64 = SECOND * 60; +pub const HOUR: u64 = MINUTE * 60; +pub const MILLISECOND: u64 = SECOND / 1000; +pub const MICROSECOND: u64 = MILLISECOND / 1000; + +pub fn nanos(d: Duration) -> Nanos { + d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) +} + +pub fn dt_nanos(t: DateTime) -> i64 { + (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64) +} + +pub fn now() -> i64 { dt_nanos(Utc::now()) } + +pub fn tfmt(ns: Nanos) -> String { + let mut f = String::new(); + match ns { + t if t <= MICROSECOND => { + write!(f, "{}ns", t); + } + + t if t > MICROSECOND && t < MILLISECOND => { + write!(f, "{}u", t / MICROSECOND); + } + t if t > MILLISECOND && t < SECOND => { + write!(f, "{}ms", t / MILLISECOND); + } + + t => { + write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND); + } + } + f +} + +pub fn tfmt_dur(d: Duration) -> String { + tfmt(nanos(d)) +} + + +pub fn tfmt_write(ns: Nanos, f: &mut Formatter) { + match ns { + t if t <= MICROSECOND => { + write!(f, "{}ns", t); + } + + t if t > MICROSECOND && t < MILLISECOND => { + write!(f, "{}u", t / MICROSECOND); + } + t if t > MILLISECOND && t < SECOND => { + write!(f, "{}ms", t / MILLISECOND); + } + + t => { + write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND); + } + } +} + +#[derive(Debug)] +pub enum ExperiencedLatency { + + GdaxWebsocket(Duration), + + //GdaxWebsocketNoLock(Duration), + + GdaxHttpPublic(Duration), + + GdaxHttpPrivate(Duration), + + PlnxHttpPublic(Duration), + + PlnxHttpPrivate(Duration), + + PlnxOrderBook(Duration), + + ExmoHttpPublic(Duration), + + KrknHttpPublic(Duration), + + KrknHttpPrivate(Duration), + + KrknTrade(Duration), + + EventLoop(Duration), + + Terminate +} + +// impl Message for ExperiencedLatency { +// fn kill_switch() -> Self { +// ExperiencedLatency::Terminate +// } +// } + +/// represents over what period of time +/// the latency measurements were taken +pub trait MeasurementWindow { + fn duration(&self) -> Duration; +} + +#[derive(Debug, Clone, Copy)] +pub struct WThirty; + +impl Default for WThirty { + fn default() -> Self { WThirty {} } +} + +impl MeasurementWindow for WThirty { + fn duration(&self) -> Duration { Duration::from_secs(30) } +} + +#[derive(Debug, Clone, Copy)] +pub struct WTen; + +impl Default for WTen { + fn default() -> Self { WTen {} } +} + +impl MeasurementWindow for WTen { + fn duration(&self) -> Duration { Duration::from_secs(10) } +} + +#[derive(Debug, Clone, Default)] +pub struct LatencyUpdate + where W: MeasurementWindow +{ + pub gdax_ws: Nanos, + //pub gdax_ws_nolock: Nanos, + pub krkn_pub: Nanos, + pub krkn_priv: Nanos, + pub plnx_pub: Nanos, + pub plnx_priv: Nanos, + pub plnx_order: Nanos, + pub krkn_trade_30_mean: Nanos, + pub krkn_trade_30_max: Nanos, + + pub krkn_trade_300_mean: Nanos, + pub krkn_trade_300_max: Nanos, + + //pub event_loop: Nanos, + + pub size: W, +} + +impl Display for LatencyUpdate + where W: MeasurementWindow +{ + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, " Latency\n gdax ws: "); + tfmt_write(self.gdax_ws, f); + write!(f, "\n krkn pub: "); + tfmt_write(self.krkn_pub, f); + write!(f, "\n krkn priv: "); + tfmt_write(self.krkn_priv, f); + + write!(f, "\n krkn trade 30 mean: "); + tfmt_write(self.krkn_trade_30_mean, f); + + write!(f, "\n krkn trade 30 max: "); + tfmt_write(self.krkn_trade_30_max, f); + + write!(f, "\n krkn trade 300 mean: "); + tfmt_write(self.krkn_trade_300_mean, f); + + write!(f, "\n krkn trade 300 max: "); + tfmt_write(self.krkn_trade_300_max, f); + + write!(f, "\n plnx pub: "); + tfmt_write(self.plnx_pub, f); + write!(f, "\n plnx priv: "); + tfmt_write(self.plnx_priv, f); + write!(f, "\n plnx orderbook loop: "); + tfmt_write(self.plnx_order, f); + + //write!(f, "\n gdax ws nolock: "); + //tfmt_write(self.gdax_ws_nolock, f); + //write!(f, "\n event loop: "); + //tfmt(self.event_loop, f); + write!(f,"") + } +} + +impl LatencyUpdate { + pub fn measurement_window(&self) -> Duration { + self.size.duration() + } + +} + +pub struct LatencyManager + where W: MeasurementWindow + Clone + Send + Sync +{ + pub tx: Sender, + pub channel: PubSub>, + thread: Option>, +} + +//impl LatencyManager { +impl LatencyManager { + pub fn new(w: WTen) -> Self { + let (tx, rx) = channel(); + let tx_copy = tx.clone(); + let channel = PubSub::new(); + let channel_copy = channel.clone(); + let w = w.clone(); + + let thread = Some(thread::spawn(move || { + let ctx = zmq::Context::new(); + let socket = influx::push(&ctx).unwrap(); + let mut buf = String::with_capacity(4096); + let w = w.clone(); + let mut gdax_ws = DurationWindow::new(w.duration()); + let mut gdax_priv = DurationWindow::new(w.duration()); + let mut krkn_pub = DurationWindow::new(w.duration()); + let mut krkn_priv = DurationWindow::new(w.duration()); + let mut plnx_pub = DurationWindow::new(w.duration()); + let mut plnx_priv = DurationWindow::new(w.duration()); + let mut plnx_order = DurationWindow::new(w.duration()); + + /// yes I am intentionally breaking from the hard-typed duration + /// window ... that was a stupid idea + /// + let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30)); + let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300)); + //let mut gdax_ws_nolock = DurationWindow::new(w.duration()); + //let mut event_loop = DurationWindow::new(w.duration()); + let mut last_broadcast = Instant::now(); + loop { + if let Ok(msg) = rx.recv() { + match msg { + ExperiencedLatency::Terminate => { + println!("latency manager terminating"); + break; + } + ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(Instant::now(), d), + //ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(Instant::now(), d), + ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(Instant::now(), d), + ExperiencedLatency::KrknHttpPublic(d) => krkn_pub.update(Instant::now(), d), + ExperiencedLatency::KrknHttpPrivate(d) => krkn_priv.update(Instant::now(), d), + ExperiencedLatency::PlnxHttpPublic(d) => plnx_pub.update(Instant::now(), d), + ExperiencedLatency::PlnxHttpPrivate(d) => plnx_priv.update(Instant::now(), d), + ExperiencedLatency::PlnxOrderBook(d) => plnx_order.update(Instant::now(), d), + ExperiencedLatency::KrknTrade(d) => { + let n = DurationWindow::nanos(d); + krkn_trade_30.update(Instant::now(), d); + krkn_trade_300.update(Instant::now(), d); + let mut m = Measurement::new("krkn_trade_api"); + m.add_field("nanos", Value::Integer(n as i64)); + m.set_timestamp(now()); + influx::serialize(&m, &mut buf); + socket.send_str(&buf, 0); + buf.clear(); + } + //ExperiencedLatency::EventLoop(d) => event_loop.update(Instant::now(), d), + other => {} + } + } + + if Instant::now() - last_broadcast > Duration::from_millis(100) { + let now = Instant::now(); + krkn_trade_30.refresh(&now); + krkn_trade_300.refresh(&now); + let update = LatencyUpdate { + gdax_ws: gdax_ws.refresh(&now).mean_nanos(), + //gdax_ws_nolock: gdax_ws_nolock.refresh(&now).mean_nanos(), + krkn_pub: krkn_pub.refresh(&now).mean_nanos(), + krkn_priv: krkn_priv.refresh(&now).mean_nanos(), + plnx_pub: plnx_pub.refresh(&now).mean_nanos(), + plnx_priv: plnx_priv.refresh(&now).mean_nanos(), + plnx_order: plnx_order.refresh(&now).mean_nanos(), + + krkn_trade_30_mean: krkn_trade_30.mean_nanos(), + krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0), + + krkn_trade_300_mean: krkn_trade_300.mean_nanos(), + krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0), + + //event_loop: event_loop.refresh(&now).mean_nanos(), + size: w.clone(), + }; + channel.send(update); + last_broadcast = now; + } + } + })); + + LatencyManager { + tx: tx_copy, + channel: channel_copy, + thread + } + } +} + + + + + + diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..8dcb0b6 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,30 @@ +//! Tools to record and display what's happening in your program +//! + +extern crate zmq; +extern crate influent; +extern crate chrono; +extern crate hyper; +extern crate termion; +extern crate pub_sub; + +extern crate windows; + +use chrono::{DateTime, Utc}; + +pub mod influx; +pub mod warnings; +pub mod latency; + +/// converts a chrono::DateTime to an integer timestamp (ns) +/// +pub fn nanos(t: DateTime) -> u64 { + (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64) +} + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + } +} diff --git a/src/warnings.rs b/src/warnings.rs new file mode 100644 index 0000000..bee0289 --- /dev/null +++ b/src/warnings.rs @@ -0,0 +1,223 @@ +//! An object to handle everyone's errors +//! + +use std::thread::{self, JoinHandle}; +use std::sync::{Arc, Mutex, RwLock}; +use std::sync::mpsc::{self, Sender, Receiver, channel}; +use std::collections::VecDeque; +use std::fmt::{Display, Error as FmtError, Formatter}; + +use zmq; +use chrono::{DateTime, Utc, TimeZone}; +use termion::color::{self, Fg, Bg}; +use influent::measurement::{Measurement, Value}; + +use super::nanos; +use influx; + +const N_WARNINGS: usize = 150; + +/// represents a non-fatal error somewhere in +/// the system to report either to the program interface +/// or in logs. +/// +#[derive(Debug, Clone, PartialEq)] +pub enum Warning { + Notice(String), + + Error(String), + + DegradedService(String), + + Critical(String), + + Confirmed(String), + + Awesome(String), + + Terminate +} + +impl Warning { + pub fn msg(&self) -> String { + match *self { + Warning::Notice(ref s) | Warning::Error(ref s) | + Warning::DegradedService(ref s) | Warning::Critical(ref s) | + Warning::Awesome(ref s) | Warning::Confirmed(ref s) => + s.clone(), + + Warning::Terminate => "".to_owned() + } + } + pub fn msg_str(&self) -> &str { + match *self { + Warning::Notice(ref s) | Warning::Error(ref s) | + Warning::DegradedService(ref s) | Warning::Critical(ref s) | + Warning::Awesome(ref s) | Warning::Confirmed(ref s) => + s.as_ref(), + + Warning::Terminate => "Terminate" + } + } + + pub fn category_str(&self) -> &str { + match self { + &Warning::Notice(_) => "notice", + &Warning::Error(_) => "error", + &Warning::Critical(_) => "critical", + &Warning::DegradedService(_) => "degraded_service", + &Warning::Confirmed(_) => "confirmed", + &Warning::Awesome(_) => "awesome", + &Warning::Terminate => "terminate", + } + } + + pub fn category(&self, f: &mut Formatter) { + match self { + &Warning::Notice(_) => { + write!(f, "[ Notice ]"); + } + + &Warning::Error(_) => { + write!(f, "{yellow}[{title}]{reset}", + yellow = Fg(color::LightYellow), + title = " Error--", + reset = Fg(color::Reset)); + } + &Warning::Critical(_) => { + write!(f, "{bg}{fg}{title}{resetbg}{resetfg}", + bg = Bg(color::Red), + fg = Fg(color::White), + title = " CRITICAL ", + resetbg = Bg(color::Reset), + resetfg = Fg(color::Reset)); + } + + &Warning::Awesome(_) => { + write!(f, "{color}[{title}]{reset}", + color = Fg(color::Green), + title = "Awesome!", + reset = Fg(color::Reset)); + } + + &Warning::DegradedService(_) => { + write!(f, "{color}[{title}] {reset}", + color = Fg(color::Blue), + title = "Degraded Service ", + reset = Fg(color::Reset)); + } + &Warning::Confirmed(_) => { + write!(f, "{bg}{fg}{title}{resetbg}{resetfg}", + bg = Bg(color::Blue), + fg = Fg(color::White), + title = "Confirmed ", + resetbg = Bg(color::Reset), + resetfg = Fg(color::Reset)); + } + + + _ => {} + } + } +} + +impl Display for Warning { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + self.category(f); + write!(f, " {}", self.msg()) + } +} + +// impl Message for Warning { +// fn kill_switch() -> Self { +// Warning::Terminate +// } +// } + +#[derive(Debug, Clone)] +pub struct Record { + pub time: DateTime, + pub msg: Warning +} + +impl Record { + pub fn new(msg: Warning) -> Self { + let time = Utc::now(); + Record { time, msg } + } + + pub fn to_measurement(&self) -> Measurement { + let cat = self.msg.category_str(); + let body = self.msg.msg_str(); + let mut m = Measurement::new("warnings"); + m.add_tag("category", cat); + m.add_field("msg", Value::String(body)); + m.set_timestamp(nanos(self.time) as i64); + m + } +} + +impl Display for Record { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + write!(f, "{} | {}", self.time.format("%H:%M:%S"), self.msg) + } +} + +#[derive(Debug)] +pub struct WarningsManager { + pub tx: Sender, + pub warnings: Arc>>, + thread: Option> +} + +impl WarningsManager { + pub fn new() -> Self { + let warnings = Arc::new(RwLock::new(VecDeque::new())); + let warnings_copy = warnings.clone(); + let (tx, rx) = channel(); + let mut buf = String::with_capacity(4096); + let ctx = zmq::Context::new(); + let socket = influx::push(&ctx).unwrap(); + let thread = thread::spawn(move || { loop { + if let Ok(msg) = rx.recv() { + match msg { + Warning::Terminate => { + println!("warnings manager terminating"); + break; + } + other => { + let rec = Record::new(other); + { + let m = rec.to_measurement(); + influx::serialize(&m, &mut buf); + socket.send_str(&buf, 0); + buf.clear(); + } + if let Ok(mut lock) = warnings.write() { + lock.push_front(rec); + lock.truncate(N_WARNINGS); + } + } + } + } + } }); + + WarningsManager { + warnings: warnings_copy, + thread: Some(thread), + tx + } + } +} + +impl Drop for WarningsManager { + fn drop(&mut self) { + self.tx.send(Warning::Terminate); + if let Some(thread) = self.thread.take() { + thread.join(); + } + } +} + + +