diff --git a/src/influx.rs b/src/influx.rs index 5ce43d2..8d24d3a 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -105,67 +105,151 @@ macro_rules! measure { }}; } -#[test] -fn it_checks_color_tag_error_in_non_doctest() { - let (tx, rx) = channel(); - measure!(tx, test, tag[color;"red"], int[n;1]); - let meas: OwnedMeasurement = rx.recv().unwrap(); - assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas); +/// exactly like `writer`, but also returns a `Sender` and accepts +/// incoming `Measurement`s that way *in addition* to the old socket/`String` +/// method +/// +pub struct InfluxWriter { + kill_switch: Sender<()>, + thread: Option>, } -#[test] -fn it_uses_the_measure_macro() { - let (tx, rx) = channel(); - measure!(tx, test_measurement, - tag [ one => "a" ], - tag [ two => "b" ], - int [ three => 2 ], - float [ four => 1.2345 ], - string [ five => String::from("d") ], - bool [ six => true ], - int [ seven => { 1 + 2 } ], - time [ 1 ] - ); - thread::sleep_ms(10); - let meas: OwnedMeasurement = rx.try_recv().unwrap(); - assert_eq!(meas.key, "test_measurement"); - assert_eq!(meas.tags.get("one"), Some(&"a")); - assert_eq!(meas.tags.get("two"), Some(&"b")); - assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); - assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); - assert_eq!(meas.timestamp, Some(1)); -} +impl InfluxWriter { -#[test] -fn it_uses_the_measure_macro_alt_syntax() { - - let (tx, rx) = channel(); - measure!(tx, test_measurement, - tag[one; "a"], - tag[two; "b"], - int[three; 2], - float[four; 1.2345], - string[five; String::from("d")], - bool [ six => true ], - int[seven; { 1 + 2 }], - time[1] - ); - - thread::sleep_ms(10); - let meas: OwnedMeasurement = rx.try_recv().unwrap(); - assert_eq!(meas.key, "test_measurement"); - assert_eq!(meas.tags.get("one"), Some(&"a")); - assert_eq!(meas.tags.get("two"), Some(&"b")); - assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); - assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); - assert_eq!(meas.timestamp, Some(1)); + pub fn new(log_path: &str, warnings: Sender) -> (Self, Sender) { + let (kill_switch, terminate) = channel(); + let (tx, rx) = channel(); + let logger = file_logger(log_path, Severity::Info); + let thread = thread::spawn(move || { + info!(logger, "initializing zmq"); + let _ = fs::create_dir("/tmp/mm"); + let ctx = zmq::Context::new(); + let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); + info!(logger, "initializing url"; + "DB_HOST" => DB_HOST, + "DB_NAME" => DB_NAME); + let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + let client = Client::new(); + info!(logger, "initializing buffers"); + let mut meas_buf = String::with_capacity(4096); + let mut buf = String::with_capacity(4096); + let mut server_resp = String::with_capacity(4096); + let mut count = 0; + + let next = |prev: u8, s: &str, buf: &mut String| -> u8 { + trace!(logger, "appending serialized measurement to buffer"; + "prev" => prev, + "buf.len()" => buf.len()); + match prev { + 0 => { + buf.push_str(s); + 1 + } + + n @ 1 ... N_BUFFER => { + buf.push_str("\n"); + buf.push_str(s); + n + 1 + } + + _ => { + buf.push_str("\n"); + if s.len() > 0 { + buf.push_str(s); + } + trace!(logger, "sending buffer to influx"; + "buf.len()" => buf.len()); + + let resp = client.post(url.clone()) + .body(buf.as_str()) + .send(); + match resp { + + Ok(Response { status, .. }) if status == StatusCode::NoContent => { + debug!(logger, "server responded ok: 204 NoContent"); + } + + Ok(mut resp) => { + let mut server_resp = String::with_capacity(1024); + //server_resp.push_str(&format!("sent at {}:\n", Utc::now())); + //server_resp.push_str(&buf); + //server_resp.push_str("\nreceived:\n"); + resp.read_to_string(&mut server_resp); //.unwrap_or(0); + error!(logger, "influx server error"; + "status" => resp.status.to_string(), + "body" => server_resp); + } + + Err(why) => { + error!(logger, "http request failed: {:?}", why); + // warnings.send( + // Warning::Error( + // format!("Influx write error: {}", why))); + } + } + buf.clear(); + 0 + } + } + }; + + let mut rcvd_msg = false; + + loop { + rcvd_msg = false; + rx.try_recv() + .map(|meas| { + debug!(logger, "rcvd new OwnedMeasurement"; + "count" => count); + serialize_owned(&meas, &mut meas_buf); + count = next(count, &meas_buf, &mut buf); + meas_buf.clear(); + rcvd_msg = true; + }); + + socket.recv_bytes(zmq::DONTWAIT).ok() + .and_then(|bytes| { + String::from_utf8(bytes).ok() + }).map(|s| { + debug!(logger, "rcvd new serialized"; + "count" => count); + count = next(count, &s, &mut buf); + rcvd_msg = true; + }); + + let end = terminate.try_recv() + .map(|_| { + let _ = next(::std::u8::MAX, "", &mut buf); + true + }).unwrap_or(false); + + if end { break } + + #[cfg(feature = "no-thrash")] + { + if !rcvd_msg { + thread::sleep(Duration::new(0, 5000)); + } + } + } + + crit!(logger, "goodbye"); + }); + let writer = InfluxWriter { + kill_switch, + thread: Some(thread) + }; + (writer, tx) + } } -#[test] -fn try_to_break_measure_macro() { - let (tx, _) = channel(); - measure!(tx, one, tag[x=>"y"], int[n;1]); - measure!(tx, one, tag[x;"y"], int[n;1],); +impl Drop for InfluxWriter { + fn drop(&mut self) { + let _ = self.kill_switch.send(()).unwrap(); + if let Some(thread) = self.thread.take() { + let _ = thread.join(); + } + } } pub fn pull(ctx: &zmq::Context) -> Result { @@ -281,6 +365,11 @@ pub fn serialize(measurement: &Measurement, line: &mut String) { } } +/// Serializes an `&OwnedMeasurement` as influx line protocol into `line`. +/// +/// The serialized measurement is appended to the end of the string without +/// any regard for what exited in it previously. +/// pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { line.push_str(&escape_tag(measurement.key)); @@ -449,156 +538,74 @@ pub fn dur_nanos(d: ::std::time::Duration) -> i64 { //pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) } -/// exactly like `writer`, but also returns a `Sender` and accepts -/// incoming `Measurement`s that way *in addition* to the old socket/`String` -/// method -/// -pub struct InfluxWriter { - kill_switch: Sender<()>, - thread: Option>, -} - -impl InfluxWriter { - - pub fn new(log_path: &str, warnings: Sender) -> (Self, Sender) { - let (kill_switch, terminate) = channel(); - let (tx, rx) = channel(); - let logger = file_logger(log_path, Severity::Info); - let thread = thread::spawn(move || { - info!(logger, "initializing zmq"); - let _ = fs::create_dir("/tmp/mm"); - let ctx = zmq::Context::new(); - let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); - info!(logger, "initializing url"; - "DB_HOST" => DB_HOST, - "DB_NAME" => DB_NAME); - let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); - let client = Client::new(); - info!(logger, "initializing buffers"); - let mut meas_buf = String::with_capacity(4096); - let mut buf = String::with_capacity(4096); - let mut server_resp = String::with_capacity(4096); - let mut count = 0; - - let next = |prev: u8, s: &str, buf: &mut String| -> u8 { - trace!(logger, "appending serialized measurement to buffer"; - "prev" => prev, - "buf.len()" => buf.len()); - match prev { - 0 => { - buf.push_str(s); - 1 - } - - n @ 1 ... N_BUFFER => { - buf.push_str("\n"); - buf.push_str(s); - n + 1 - } - - _ => { - buf.push_str("\n"); - if s.len() > 0 { - buf.push_str(s); - } - trace!(logger, "sending buffer to influx"; - "buf.len()" => buf.len()); - - let resp = client.post(url.clone()) - .body(buf.as_str()) - .send(); - match resp { - - Ok(Response { status, .. }) if status == StatusCode::NoContent => { - debug!(logger, "server responded ok: 204 NoContent"); - } - - Ok(mut resp) => { - let mut server_resp = String::with_capacity(1024); - //server_resp.push_str(&format!("sent at {}:\n", Utc::now())); - //server_resp.push_str(&buf); - //server_resp.push_str("\nreceived:\n"); - resp.read_to_string(&mut server_resp); //.unwrap_or(0); - error!(logger, "influx server error"; - "status" => resp.status.to_string(), - "body" => server_resp); - } - - Err(why) => { - error!(logger, "http request failed: {:?}", why); - // warnings.send( - // Warning::Error( - // format!("Influx write error: {}", why))); - } - } - buf.clear(); - 0 - } - } - }; - - let mut rcvd_msg = false; - loop { - rcvd_msg = false; - rx.try_recv() - .map(|meas| { - debug!(logger, "rcvd new OwnedMeasurement"; - "count" => count); - serialize_owned(&meas, &mut meas_buf); - count = next(count, &meas_buf, &mut buf); - meas_buf.clear(); - rcvd_msg = true; - }); - socket.recv_bytes(zmq::DONTWAIT).ok() - .and_then(|bytes| { - String::from_utf8(bytes).ok() - }).map(|s| { - debug!(logger, "rcvd new serialized"; - "count" => count); - count = next(count, &s, &mut buf); - rcvd_msg = true; - }); +mod tests { + use super::*; + use test::{black_box, Bencher}; - let end = terminate.try_recv() - .map(|_| { - let _ = next(::std::u8::MAX, "", &mut buf); - true - }).unwrap_or(false); + #[test] + fn it_checks_color_tag_error_in_non_doctest() { + let (tx, rx) = channel(); + measure!(tx, test, tag[color;"red"], int[n;1]); + let meas: OwnedMeasurement = rx.recv().unwrap(); + assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas); + } - if end { break } + #[test] + fn it_uses_the_measure_macro() { + let (tx, rx) = channel(); + measure!(tx, test_measurement, + tag [ one => "a" ], + tag [ two => "b" ], + int [ three => 2 ], + float [ four => 1.2345 ], + string [ five => String::from("d") ], + bool [ six => true ], + int [ seven => { 1 + 2 } ], + time [ 1 ] + ); + thread::sleep_ms(10); + let meas: OwnedMeasurement = rx.try_recv().unwrap(); + assert_eq!(meas.key, "test_measurement"); + assert_eq!(meas.tags.get("one"), Some(&"a")); + assert_eq!(meas.tags.get("two"), Some(&"b")); + assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); + assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); + assert_eq!(meas.timestamp, Some(1)); + } - #[cfg(feature = "no-thrash")] - { - if !rcvd_msg { - thread::sleep(Duration::new(0, 5000)); - } - } - } + #[test] + fn it_uses_the_measure_macro_alt_syntax() { - crit!(logger, "goodbye"); - }); - let writer = InfluxWriter { - kill_switch, - thread: Some(thread) - }; - (writer, tx) + let (tx, rx) = channel(); + measure!(tx, test_measurement, + tag[one; "a"], + tag[two; "b"], + int[three; 2], + float[four; 1.2345], + string[five; String::from("d")], + bool [ six => true ], + int[seven; { 1 + 2 }], + time[1] + ); + + thread::sleep_ms(10); + let meas: OwnedMeasurement = rx.try_recv().unwrap(); + assert_eq!(meas.key, "test_measurement"); + assert_eq!(meas.tags.get("one"), Some(&"a")); + assert_eq!(meas.tags.get("two"), Some(&"b")); + assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); + assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); + assert_eq!(meas.timestamp, Some(1)); } -} -impl Drop for InfluxWriter { - fn drop(&mut self) { - let _ = self.kill_switch.send(()).unwrap(); - if let Some(thread) = self.thread.take() { - let _ = thread.join(); - } + #[test] + fn try_to_break_measure_macro() { + let (tx, _) = channel(); + measure!(tx, one, tag[x=>"y"], int[n;1]); + measure!(tx, one, tag[x;"y"], int[n;1],); } -} - -mod tests { - use super::*; - use test::{black_box, Bencher}; #[bench] fn measure_macro_small(b: &mut Bencher) {