diff --git a/.gitignore b/.gitignore index a72bc8b..7afe480 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ **/*.rs.bk Cargo.lock .*.swp +/var/*.log diff --git a/src/influx.rs b/src/influx.rs index 8d24d3a..e667917 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -3,7 +3,7 @@ use std::iter::FromIterator; use std::io::{Write, Read}; -use std::sync::mpsc::{Sender, Receiver, channel}; +use std::sync::mpsc::{Sender, Receiver, channel, SendError}; use std::thread; use std::collections::HashMap; use std::fs::{self, OpenOptions}; @@ -29,7 +29,7 @@ const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write"; const ZMQ_RCV_HWM: i32 = 0; const ZMQ_SND_HWM: i32 = 0; -const N_BUFFER: u8 = 160; +const N_BUFFER: u8 = 80; /// Provides flexible and ergonomic use of `Sender`. /// @@ -89,7 +89,6 @@ macro_rules! measure { (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) }; - (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) }; (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) }; @@ -105,41 +104,51 @@ macro_rules! measure { }}; } -/// exactly like `writer`, but also returns a `Sender` and accepts -/// incoming `Measurement`s that way *in addition* to the old socket/`String` -/// method -/// +/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s +/// it receives (over a SPSC channel) and inserts to influxdb via http when `N_BUFFER` +/// measurements have accumulated. +/// pub struct InfluxWriter { + host: &'static str, + db: &'static str, + tx: Sender, kill_switch: Sender<()>, thread: Option>, } +impl Default for InfluxWriter { + fn default() -> Self { + InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log") + } +} + impl InfluxWriter { + /// Sends the `OwnedMeasurement` to the serialization thread. + /// + pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError> { + self.tx.send(m) + } - pub fn new(log_path: &str, warnings: Sender) -> (Self, Sender) { + pub fn new(host: &'static str, db: &'static str, log_path: &str) -> Self { let (kill_switch, terminate) = channel(); let (tx, rx) = channel(); let logger = file_logger(log_path, Severity::Info); let thread = thread::spawn(move || { - info!(logger, "initializing zmq"); - let _ = fs::create_dir("/tmp/mm"); - let ctx = zmq::Context::new(); - let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); info!(logger, "initializing url"; - "DB_HOST" => DB_HOST, - "DB_NAME" => DB_NAME); - let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + "DB_HOST" => host, + "DB_NAME" => db); + let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse"); let client = Client::new(); info!(logger, "initializing buffers"); - let mut meas_buf = String::with_capacity(4096); - let mut buf = String::with_capacity(4096); - let mut server_resp = String::with_capacity(4096); + let mut meas_buf = String::with_capacity(32 * 32 * 32); + let mut buf = String::with_capacity(32 * 32 * 32); let mut count = 0; let next = |prev: u8, s: &str, buf: &mut String| -> u8 { trace!(logger, "appending serialized measurement to buffer"; "prev" => prev, "buf.len()" => buf.len()); + match prev { 0 => { buf.push_str(s); @@ -160,31 +169,28 @@ impl InfluxWriter { trace!(logger, "sending buffer to influx"; "buf.len()" => buf.len()); - let resp = client.post(url.clone()) - .body(buf.as_str()) - .send(); - match resp { + #[cfg(not(test))] + { + let resp = client.post(url.clone()) + .body(buf.as_str()) + .send(); + match resp { - Ok(Response { status, .. }) if status == StatusCode::NoContent => { - debug!(logger, "server responded ok: 204 NoContent"); - } + Ok(Response { status, .. }) if status == StatusCode::NoContent => { + trace!(logger, "server responded ok: 204 NoContent"); + } - Ok(mut resp) => { - let mut server_resp = String::with_capacity(1024); - //server_resp.push_str(&format!("sent at {}:\n", Utc::now())); - //server_resp.push_str(&buf); - //server_resp.push_str("\nreceived:\n"); - resp.read_to_string(&mut server_resp); //.unwrap_or(0); - error!(logger, "influx server error"; - "status" => resp.status.to_string(), - "body" => server_resp); - } + Ok(mut resp) => { + let mut server_resp = String::with_capacity(1024); + resp.read_to_string(&mut server_resp); //.unwrap_or(0); + error!(logger, "influx server error"; + "status" => resp.status.to_string(), + "body" => server_resp); + } - Err(why) => { - error!(logger, "http request failed: {:?}", why); - // warnings.send( - // Warning::Error( - // format!("Influx write error: {}", why))); + Err(why) => { + error!(logger, "http request failed: {:?}", why); + } } } buf.clear(); @@ -197,26 +203,15 @@ impl InfluxWriter { loop { rcvd_msg = false; - rx.try_recv() + rx.recv_timeout(Duration::from_millis(10)) .map(|meas| { - debug!(logger, "rcvd new OwnedMeasurement"; - "count" => count); + trace!(logger, "rcvd new OwnedMeasurement"; "count" => count); serialize_owned(&meas, &mut meas_buf); count = next(count, &meas_buf, &mut buf); meas_buf.clear(); rcvd_msg = true; }); - socket.recv_bytes(zmq::DONTWAIT).ok() - .and_then(|bytes| { - String::from_utf8(bytes).ok() - }).map(|s| { - debug!(logger, "rcvd new serialized"; - "count" => count); - count = next(count, &s, &mut buf); - rcvd_msg = true; - }); - let end = terminate.try_recv() .map(|_| { let _ = next(::std::u8::MAX, "", &mut buf); @@ -235,11 +230,14 @@ impl InfluxWriter { crit!(logger, "goodbye"); }); - let writer = InfluxWriter { + + InfluxWriter { + host, + db, + tx, kill_switch, thread: Some(thread) - }; - (writer, tx) + } } } @@ -536,14 +534,19 @@ pub fn dur_nanos(d: ::std::time::Duration) -> i64 { (d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64 } -//pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) } - - mod tests { use super::*; use test::{black_box, Bencher}; + #[bench] + fn influx_writer_send(b: &mut Bencher) { + let m = InfluxWriter::default(); + b.iter(|| { + measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]); + }); + } + #[test] fn it_checks_color_tag_error_in_non_doctest() { let (tx, rx) = channel();