From e9e6e01ebe54d60d1081d826cadbea8c5333a196 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Wed, 14 Feb 2018 03:49:55 -0500 Subject: [PATCH] InfluxWriter refactor (mostly) --- Cargo.toml | 1 + src/hist.rs | 2 +- src/influx.rs | 191 +++++++++++++++++++++++++------------------------- src/lib.rs | 19 +++-- 4 files changed, 109 insertions(+), 104 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9e82ac4..7055459 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,4 +29,5 @@ pubsub = { path = "../pubsub" } [features] no-thrash = [] trace = [] +debug = [] test = [] diff --git a/src/hist.rs b/src/hist.rs index 6dad6df..1519cb8 100644 --- a/src/hist.rs +++ b/src/hist.rs @@ -102,7 +102,7 @@ impl HistLog { let seconds = start_time.duration_since(UNIX_EPOCH).unwrap().as_secs(); let path = dir.join(&format!("{}-interval-log-{}.v2z", series, seconds)); let file = fs::File::create(&path).unwrap(); - thread::Builder::new().name(format!("HistLog::scribe::{}", series)).spawn(move || { + thread::Builder::new().name(format!("mm:hist:{}", series)).spawn(move || { let mut buf = io::LineWriter::new(file); let mut wtr = IntervalLogWriterBuilder::new() diff --git a/src/influx.rs b/src/influx.rs index 2d029d1..456371e 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -2,7 +2,8 @@ //! use std::io::Read; -use std::sync::mpsc::{Sender, channel, SendError}; +use std::sync::Arc; +use std::sync::mpsc::{Sender, Receiver, channel, SendError}; use std::thread; use std::fs; use std::time::Duration; @@ -22,7 +23,7 @@ use fnv::FnvHasher; use decimal::d128; use uuid::Uuid; -use super::{nanos, file_logger}; +use super::{nanos, file_logger, LOG_LEVEL}; use warnings::Warning; const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; @@ -33,7 +34,7 @@ const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write"; const ZMQ_RCV_HWM: i32 = 0; const ZMQ_SND_HWM: i32 = 0; -const BUFFER_SIZE: u8 = 80; +const BUFFER_SIZE: u16 = 80; pub use super::{dur_nanos, dt_nanos}; @@ -171,29 +172,35 @@ macro_rules! measure { /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE` /// measurements have accumulated. /// +#[derive(Debug)] pub struct InfluxWriter { host: &'static str, db: &'static str, - tx: Sender, - kill_switch: Sender<()>, - thread: Option>, + tx: Sender>, + thread: Option>>, } impl Default for InfluxWriter { fn default() -> Self { - InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log", BUFFER_SIZE) + if cfg!(any(test, feature = "test")) { + InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 1) + } else if cfg!(feature = "localhost") { + InfluxWriter::new("localhost", "mm2", "/home/jstrong/src/logging/var/log/influx-default.log", BUFFER_SIZE) + } else { + InfluxWriter::new("washington.0ptimus.internal", "mm2", "var/influx-default.log", BUFFER_SIZE) + } } } impl Clone for InfluxWriter { fn clone(&self) -> Self { - let (tx, _) = channel(); + debug_assert!(self.thread.is_some()); + let thread = self.thread.as_ref().map(|x| Arc::clone(x)); InfluxWriter { host: self.host, db: self.db, tx: self.tx.clone(), - kill_switch: tx, - thread: None, + thread, } } } @@ -201,145 +208,123 @@ impl Clone for InfluxWriter { impl InfluxWriter { /// Sends the `OwnedMeasurement` to the serialization thread. /// - pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError> { - self.tx.send(m) + pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError>> { + self.tx.send(Some(m)) } - pub fn tx(&self) -> Sender { + pub fn tx(&self) -> Sender> { self.tx.clone() } #[allow(unused_assignments)] - pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u8) -> Self { - let (kill_switch, terminate) = channel(); - let (tx, rx) = channel(); - let logger = file_logger(log_path, Severity::Info); - let thread = thread::spawn(move || { + pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u16) -> Self { + let (tx, rx): (Sender>, Receiver>) = channel(); + let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread + let buffer_size = if cfg!(feature = "trace") { 0u16 } else { buffer_size }; + let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { debug!(logger, "initializing url"; "DB_HOST" => host, "DB_NAME" => db); - - #[cfg(not(test))] let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse"); - #[cfg(not(test))] let client = Client::new(); debug!(logger, "initializing buffers"); - let mut meas_buf = String::with_capacity(32 * 32 * 32); + //let mut meas_buf = String::with_capacity(32 * 32 * 32); let mut buf = String::with_capacity(32 * 32 * 32); let mut count = 0; + let send = |buf: &str| { + let resp = client.post(url.clone()) + .body(buf) + .send(); + match resp { + + Ok(Response { status, .. }) if status == StatusCode::NoContent => { + debug!(logger, "server responded ok: 204 NoContent"); + } + + Ok(mut resp) => { + let mut server_resp = String::with_capacity(1024); + let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); + error!(logger, "influx server error"; + "status" => resp.status.to_string(), + "body" => server_resp); + } + + Err(why) => { + error!(logger, "http request failed: {:?}", why); + } + } + }; - let next = |prev: u8, s: &str, buf: &mut String| -> u8 { - trace!(logger, "appending serialized measurement to buffer"; - "prev" => prev, - "buf.len()" => buf.len()); + let next2 = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 { + // trace!(logger, "appending serialized measurement to buffer"; + // "prev" => prev, + // "buf.len()" => buf.len()); match prev { 0 if buffer_size > 0 => { - buf.push_str(s); + serialize_owned(m, buf); 1 } n if n < buffer_size => { buf.push_str("\n"); - buf.push_str(s); + serialize_owned(m, buf); n + 1 } - _ => { + n => { buf.push_str("\n"); - if s.len() > 0 { - buf.push_str(s); - } - trace!(logger, "sending buffer to influx"; - "buf.len()" => buf.len()); - - #[cfg(not(test))] - { - let resp = client.post(url.clone()) - .body(buf.as_str()) - .send(); - match resp { - - Ok(Response { status, .. }) if status == StatusCode::NoContent => { - trace!(logger, "server responded ok: 204 NoContent"); - } - - Ok(mut resp) => { - let mut server_resp = String::with_capacity(1024); - let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); - error!(logger, "influx server error"; - "status" => resp.status.to_string(), - "body" => server_resp); - } - - Err(why) => { - error!(logger, "http request failed: {:?}", why); - } - } - } + serialize_owned(m, buf); + //if s.len() > 0 { buf.push_str(s); } + debug!(logger, "sending buffer to influx"; "len" => n); + send(buf); buf.clear(); 0 } } }; - let mut rcvd_msg = false; - loop { - rcvd_msg = false; - let _ = rx.recv_timeout(Duration::from_millis(10)) - .map(|mut meas: OwnedMeasurement| { - // if we didn't set the timestamp, it would end up - // being whenever we accumulated `BUFFER_SIZE` messages, - // which might be some period of time after we received - // the message. - // + match rx.try_recv() { + Ok(Some(mut meas)) => { if meas.timestamp.is_none() { meas.timestamp = Some(now()); } trace!(logger, "rcvd new OwnedMeasurement"; "count" => count); - serialize_owned(&meas, &mut meas_buf); - count = next(count, &meas_buf, &mut buf); - meas_buf.clear(); - rcvd_msg = true; - }); - - let end = terminate.try_recv() - .map(|_| { - let _ = next(::std::u8::MAX, "", &mut buf); - true - }).unwrap_or(false); - - if end { break } - - #[cfg(feature = "no-thrash")] - { - if !rcvd_msg { - thread::sleep(Duration::new(0, 5000)); + count = next2(count, &meas, &mut buf); + } + + Ok(None) => { + if buf.len() > 0 { send(&buf) } + break + } + + _ => { + #[cfg(feature = "no-thrash")] + thread::sleep(Duration::new(0, 0)) } } } - - debug!(logger, "goodbye"); - }); + }).unwrap(); InfluxWriter { host, db, tx, - kill_switch, - thread: Some(thread) + thread: Some(Arc::new(thread)) } } } impl Drop for InfluxWriter { fn drop(&mut self) { - let _ = self.kill_switch.send(()).unwrap(); - if let Some(thread) = self.thread.take() { - let _ = thread.join(); + if let Some(arc) = self.thread.take() { + if let Ok(thread) = Arc::try_unwrap(arc) { + let _ = self.tx.send(None); + let _ = thread.join(); + } } } } @@ -664,11 +649,23 @@ mod tests { a.f(); } + #[test] + fn it_clones_an_influx_writer_to_check_both_drop() { + let influx = InfluxWriter::default(); + measure!(influx, drop_test, i(a, 1), i(b, 2)); + { + let influx = influx.clone(); + thread::spawn(move || { + measure!(influx, drop_test, i(a, 3), i(b, 4)); + }); + } + } + #[bench] fn influx_writer_send_basic(b: &mut Bencher) { - let m = InfluxWriter::default(); + let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000); b.iter(|| { - measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]); + measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]); }); } diff --git a/src/lib.rs b/src/lib.rs index 5d13c4c..b9b53f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,13 +41,20 @@ pub mod warnings; pub mod latency; pub mod hist; +#[cfg(feature = "trace")] +pub const LOG_LEVEL : Severity = Severity::Trace; +#[cfg(all(feature = "debug", not(feature = "trace")))] +pub const LOG_LEVEL : Severity = Severity::Debug; +#[cfg(not(any(feature = "debug", feature = "trace")))] +pub const LOG_LEVEL : Severity = Severity::Info; + /// converts a chrono::DateTime to an integer timestamp (ns) /// pub fn nanos(t: DateTime) -> u64 { (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64) } -#[cfg(not(any(test, feature = "test")))] +//#[cfg(not(any(test, feature = "test")))] pub fn file_logger(path: &str, level: Severity) -> slog::Logger { let mut builder = FileLoggerBuilder::new(path); builder.level(level); @@ -55,11 +62,11 @@ pub fn file_logger(path: &str, level: Severity) -> slog::Logger { builder.build().unwrap() } -#[cfg(any(test, feature = "test"))] -pub fn file_logger(_: &str, _: Severity) -> slog::Logger { - use slog::*; - Logger::root(Discard, o!()) -} +// #[cfg(any(test, feature = "test"))] +// pub fn file_logger(_: &str, _: Severity) -> slog::Logger { +// use slog::*; +// Logger::root(Discard, o!()) +// } pub fn async_file_logger(path: &str, level: Severity) -> slog::Logger { let drain = file_logger(path, level);