From 73ac545e1fccf61fcc4c128a70d7a2e9e18ab03c Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Thu, 7 Sep 2017 04:28:33 -0400 Subject: [PATCH] impl Drop for InfluxWriter and latency Manager --- src/influx.rs | 219 ++++++++++++++++++++++++++++--------------------- src/latency.rs | 9 ++ 2 files changed, 135 insertions(+), 93 deletions(-) diff --git a/src/influx.rs b/src/influx.rs index 6ebdf13..fe236c6 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -293,114 +293,147 @@ pub fn dur_nanos(d: ::std::time::Duration) -> i64 { /// incoming `Measurement`s that way *in addition* to the old socket/`String` /// method /// -pub fn writer_str_or_meas(log_path: &str, warnings: Sender) -> (thread::JoinHandle<()>, Sender) { - let (tx, rx) = channel(); - let logger = file_logger(log_path, Severity::Info); - let thread = thread::spawn(move || { - info!(logger, "initializing zmq"); - let _ = fs::create_dir("/tmp/mm"); - let ctx = zmq::Context::new(); - let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); - info!(logger, "initializing url"; - "DB_HOST" => DB_HOST, - "DB_NAME" => DB_NAME); - let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); - let client = Client::new(); - info!(logger, "initializing buffers"); - let mut meas_buf = String::with_capacity(4096); - let mut buf = String::with_capacity(4096); - let mut server_resp = String::with_capacity(4096); - let mut count = 0; +pub struct InfluxWriter { + kill_switch: Sender<()>, + thread: Option>, +} - let next = |prev: u8, s: &str, buf: &mut String| -> u8 { - debug!(logger, "appending serialized measurement to buffer"; - "prev" => prev, - "buf.len()" => buf.len()); - match prev { - 0 => { - buf.push_str(s); - 1 - } +impl InfluxWriter { - n @ 1...80 => { - buf.push_str("\n"); - buf.push_str(s); - n + 1 - } + pub fn new(log_path: &str, warnings: Sender) -> (Self, Sender) { + let (kill_switch, terminate) = channel(); + let (tx, rx) = channel(); + let logger = file_logger(log_path, Severity::Info); + let thread = thread::spawn(move || { + info!(logger, "initializing zmq"); + let _ = fs::create_dir("/tmp/mm"); + let ctx = zmq::Context::new(); + let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); + info!(logger, "initializing url"; + "DB_HOST" => DB_HOST, + "DB_NAME" => DB_NAME); + let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + let client = Client::new(); + info!(logger, "initializing buffers"); + let mut meas_buf = String::with_capacity(4096); + let mut buf = String::with_capacity(4096); + let mut server_resp = String::with_capacity(4096); + let mut count = 0; + + let next = |prev: u8, s: &str, buf: &mut String| -> u8 { + debug!(logger, "appending serialized measurement to buffer"; + "prev" => prev, + "buf.len()" => buf.len()); + match prev { + 0 => { + buf.push_str(s); + 1 + } - _ => { - buf.push_str("\n"); - buf.push_str(s); - debug!(logger, "sending buffer to influx"; - "buf.len()" => buf.len()); - let resp = client.post(url.clone()) - .body(buf.as_str()) - .send(); - match resp { - - Ok(Response { status, .. }) if status == StatusCode::NoContent => { - debug!(logger, "server responded ok: 204 NoContent"); - } + n @ 1...80 => { + buf.push_str("\n"); + buf.push_str(s); + n + 1 + } - Ok(mut resp) => { - let mut server_resp = String::with_capacity(1024); - //server_resp.push_str(&format!("sent at {}:\n", Utc::now())); - //server_resp.push_str(&buf); - //server_resp.push_str("\nreceived:\n"); - resp.read_to_string(&mut server_resp); //.unwrap_or(0); - error!(logger, "influx server error"; - "status" => resp.status.to_string(), - "body" => server_resp); + _ => { + buf.push_str("\n"); + if s.len() > 0 { + buf.push_str(s); } + debug!(logger, "sending buffer to influx"; + "buf.len()" => buf.len()); - Err(why) => { - error!(logger, "http request failed: {:?}", why); - // warnings.send( - // Warning::Error( - // format!("Influx write error: {}", why))); + let resp = client.post(url.clone()) + .body(buf.as_str()) + .send(); + match resp { + + Ok(Response { status, .. }) if status == StatusCode::NoContent => { + debug!(logger, "server responded ok: 204 NoContent"); + } + + Ok(mut resp) => { + let mut server_resp = String::with_capacity(1024); + //server_resp.push_str(&format!("sent at {}:\n", Utc::now())); + //server_resp.push_str(&buf); + //server_resp.push_str("\nreceived:\n"); + resp.read_to_string(&mut server_resp); //.unwrap_or(0); + error!(logger, "influx server error"; + "status" => resp.status.to_string(), + "body" => server_resp); + } + + Err(why) => { + error!(logger, "http request failed: {:?}", why); + // warnings.send( + // Warning::Error( + // format!("Influx write error: {}", why))); + } } + buf.clear(); + 0 } - buf.clear(); - 0 + } + }; + + let mut rcvd_msg = false; + + loop { + rcvd_msg = false; + rx.try_recv() + .map(|meas| { + debug!(logger, "rcvd new OwnedMeasurement"; + "count" => count); + serialize_owned(&meas, &mut meas_buf); + count = next(count, &meas_buf, &mut buf); + meas_buf.clear(); + rcvd_msg = true; + }); + + socket.recv_bytes(zmq::DONTWAIT).ok() + .and_then(|bytes| { + String::from_utf8(bytes).ok() + }).map(|s| { + debug!(logger, "rcvd new serialized"; + "count" => count); + count = next(count, &s, &mut buf); + rcvd_msg = true; + }); + + let end = terminate.try_recv() + .map(|_| { + let _ = next(::std::u8::MAX, "", &mut buf); + true + }).unwrap_or(false); + + if end { break } + + if !rcvd_msg { + #[cfg(feature = "no-thrash")] + thread::sleep(Duration::from_millis(1) / 10); } } - }; - let mut rcvd_msg = false; + crit!(logger, "goodbye"); + }); + let writer = InfluxWriter { + kill_switch, + thread: Some(thread) + }; + (writer, tx) + } +} - loop { - rcvd_msg = false; - rx.try_recv() - .map(|meas| { - debug!(logger, "rcvd new OwnedMeasurement"; - "count" => count); - serialize_owned(&meas, &mut meas_buf); - count = next(count, &meas_buf, &mut buf); - meas_buf.clear(); - rcvd_msg = true; - }); - - socket.recv_bytes(zmq::DONTWAIT).ok() - .and_then(|bytes| { - String::from_utf8(bytes).ok() - }).map(|s| { - debug!(logger, "rcvd new serialized"; - "count" => count); - count = next(count, &s, &mut buf); - rcvd_msg = true; - }); - - if !rcvd_msg { - #[cfg(feature = "no-thrash")] - thread::sleep(Duration::from_millis(1) / 10); - } +impl Drop for InfluxWriter { + fn drop(&mut self) { + self.kill_switch.send(()); + if let Some(thread) = self.thread.take() { + let _ = thread.join(); } - crit!(logger, "goodbye"); - }); - (thread, tx) + } } - mod tests { use super::*; diff --git a/src/latency.rs b/src/latency.rs index 9b4d28a..04bbc1f 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -418,6 +418,15 @@ impl Manager { } } +impl Drop for Manager { + fn drop(&mut self) { + self.tx.send(Latency::Terminate); + if let Some(thread) = self.thread.take() { + let _ = thread.join(); + } + } +} + //impl LatencyManager {