diff --git a/Cargo.toml b/Cargo.toml
index 6f19817..3e67568 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,7 @@
 slog-async = "2"
 smallvec = "0.6"
 num = "0.1"
 dirs = "1"
+crossbeam-channel = "0.3"
 sloggers = { path = "../sloggers" }
diff --git a/src/influx.rs b/src/influx.rs
index 754c0dc..d04a47f 100644
--- a/src/influx.rs
+++ b/src/influx.rs
@@ -4,11 +4,10 @@
 use std::io::Read;
 use std::sync::Arc;
 use std::sync::mpsc::{Sender, Receiver, channel, SendError};
-use std::thread;
-#[cfg(feature = "warnings")]
-use std::fs;
-use std::time::{Instant, Duration};
+use std::{thread, fs, mem};
+use std::time::*;
 use std::hash::BuildHasherDefault;
+use std::collections::VecDeque;
 
 use hyper::status::StatusCode;
 use hyper::client::response::Response;
@@ -205,6 +204,58 @@ macro_rules! measure {
     }};
 }
 
+#[derive(Clone, Debug)]
+pub struct Point<T, V> {
+    pub time: T,
+    pub value: V
+}
+pub struct DurationWindow {
+    pub size: Duration,
+    pub mean: Duration,
+    pub sum: Duration,
+    pub count: u32,
+    pub items: VecDeque<Point<Instant, Duration>>
+}
+
+impl DurationWindow {
+    #[inline]
+    pub fn update(&mut self, time: Instant, value: Duration) {
+        self.add(time, value);
+        self.refresh(time);
+    }
+
+    #[inline]
+    pub fn refresh(&mut self, t: Instant) -> &Self {
+        if !self.items.is_empty() {
+            let (n_remove, sum, count) =
+                self.items.iter()
+                    .take_while(|x| t - x.time > self.size)
+                    .fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
+                        (n_remove + 1, sum - x.value, count - 1)
+                    });
+            self.sum = sum;
+            self.count = count;
+            for _ in 0..n_remove {
+                self.items.pop_front();
+            }
+        }
+
+        if self.count > 0 {
+            self.mean = self.sum / self.count.into();
+        }
+
+        self
+    }
+
+    #[inline]
+    pub fn add(&mut self, time: Instant, value: Duration) {
+        let p = Point { time, value };
+        self.sum += p.value;
+        self.count += 1;
+        self.items.push_back(p);
+    }
+}
+
 /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
 /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
 /// measurements have accumulated.
@@ -310,69 +361,165 @@ impl InfluxWriter {
             .expect("influx writer url should parse");
 
         let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
-            const MAX_PENDING: Duration = Duration::from_secs(1);
+            use std::collections::VecDeque;
+            use std::time::*;
+            use crossbeam_channel as chan;
 
-            let client = Client::new();
+            const MAX_PENDING: Duration = Duration::from_secs(2);
+            const INITIAL_BUFFER_CAPACITY: usize = 32 * 32 * 32;
+            const MAX_BACKLOG: usize = 512;
+            const MAX_OUTSTANDING_HTTP: usize = 16;
 
-            debug!(logger, "initializing buffers");
-            let mut buf = String::with_capacity(32 * 32 * 32);
+            let client = Arc::new(Client::new());
+
+            info!(logger, "initializing buffers";
+                  "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY,
+                  "MAX_BACKLOG" => MAX_BACKLOG);
+
+            // pre-allocated buffers ready for use if the active one is stashed
+            // during an outage
+            let mut spares: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG);
+
+            // queue failed sends here until problem resolved, then send again. in worst
+            // case scenario, loop back around on buffers queued in `backlog`, writing
+            // over the oldest first.
+            //
+            let mut backlog: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG);
+
+            for _ in 0..MAX_BACKLOG {
+                spares.push_back(String::with_capacity(32 * 32 * 32));
+            }
+
+            struct Resp {
+                pub buf: String,
+                pub took: Duration,
+            }
+
+            let mut db_health = DurationWindow {
+                size: Duration::from_secs(120),
+                mean: Duration::new(10, 0),
+                sum: Duration::new(0, 0),
+                count: 0,
+                items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP),
+            };
+
+            let (http_tx, http_rx) = chan::bounded(32);
+
+            let mut buf = spares.pop_front().unwrap();
             let mut count = 0;
+            let mut extras = 0; // any new Strings we intro to the system
             let mut last = Instant::now();
+            let mut last_clear = Instant::now();
             let mut loop_time = Instant::now();
 
-            let send = |buf: &str| {
-                let resp = client.post(url.clone())
-                    .body(buf)
-                    .send();
-                match resp {
+            let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
+                MAX_BACKLOG + extras - s.len() - b.len() - 1
+            };
 
-                    Ok(Response { status, .. }) if status == StatusCode::NoContent => {
-                        debug!(logger, "server responded ok: 204 NoContent");
-                    }
+            assert_eq!(n_out(&spares, &backlog, extras), 0);
 
-                    Ok(mut resp) =>  {
-                        let mut server_resp = String::with_capacity(32 * 1024); // need to allocate here bc will be
-                                                                                // sent to logging thread
+            let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize| {
+                if n_outstanding >= MAX_OUTSTANDING_HTTP {
+                    backlog.push_back(buf);
+                    return
+                }
+                let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
+                let tx = http_tx.clone();
+                let thread_logger = logger.new(o!("n_outstanding" => n_outstanding));
+                let client = Arc::clone(&client);
+                let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
+                    let logger = thread_logger;
+                    let start = Instant::now();
+                    'a: for n_req in 0..5u32 {
+                        let sent = Instant::now();
+                        let resp = client.post(url.clone())
+                            .body(buf.as_str())
+                            .send();
+                        let rcvd = Instant::now();
+                        let took = rcvd - sent;
+                        let mut n_tx = 0u32;
+                        match resp {
+                            Ok(Response { status, .. }) if status == StatusCode::NoContent => {
+                                debug!(logger, "server responded ok: 204 NoContent");
+                                buf.clear();
+                                let mut resp = Some(Ok(Resp { buf, took }));
+                                'b: loop {
+                                    n_tx += 1;
+                                    match tx.try_send(resp.take().unwrap()) {
+                                        Ok(_) => return,
+
+                                        Err(chan::TrySendError::Full(r)) => {
+                                            let throttle = Duration::from_millis(1000) * n_tx;
+                                            warn!(logger, "channel full: InfluxWriter http thread failed to return buf";
+                                                  "n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle));
+                                            resp = Some(r);
+                                            thread::sleep(throttle);
+                                        }
+
+                                        Err(chan::TrySendError::Disconnected(_)) => {
+                                            warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return";
+                                                  "n_tx" => n_tx, "n_req" => n_req);
+                                            return
+                                        }
+                                    }
+                                }
+                            }
 
-                        let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
+                            Ok(mut resp) => {
+                                let mut server_resp = String::new();
+                                let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
+                                error!(logger, "influx server error (request took {:?})", took;
+                                       "status" => %resp.status,
+                                       "body" => server_resp);
+                            }
 
-                        error!(logger, "influx server error";
-                               "status" => resp.status.to_string(),
-                               "body" => server_resp);
-                    }
 
+                            Err(e) => {
+                                error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e);
+                            }
+                        }
 
-                    Err(why) => {
-                        error!(logger, "http request failed: {:?}", why);
+                        let throttle = Duration::from_secs(2) * n_req * n_req;
+                        if n_req > 0 {
+                            warn!(logger, "InfluxWriter http thread: pausing before next request";
+                                  "n_req" => n_req,
+                                  "throttle" => %format_args!("{:?}", throttle),
+                                  "took" => %format_args!("{:?}", took));
+                        }
+                        thread::sleep(throttle); // 0, 2, 8, 18, 32
                     }
+                    let took = Instant::now() - start;
+                    warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer";
+                          "took" => %format_args!("{:?}", took));
+                    tx.send(Err(Resp { buf, took })).unwrap(); // failure here is unrecoverable
+                });
+
+                if let Err(e) = thread_res {
+                    crit!(logger, "failed to spawn thread: {}", e);
                 }
             };
 
-            let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: &Instant, last: &mut Instant| -> usize {
+            let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> {
                 match prev {
                     0 if buffer_size > 0 => {
                         serialize_owned(m, buf);
-                        1
+                        Ok(1)
                     }
-                    n if n < buffer_size && *loop_time - *last < MAX_PENDING => {
+                    n if n < buffer_size && loop_time - last < MAX_PENDING => {
                         buf.push_str("\n");
                         serialize_owned(m, buf);
-                        n + 1
+                        Ok(n + 1)
                     }
                     n => {
                         buf.push_str("\n");
                         serialize_owned(m, buf);
-                        debug!(logger, "sending buffer to influx"; "len" => n);
-                        send(buf);
-                        *last = *loop_time;
-                        buf.clear();
-                        0
+                        Err(n + 1)
                     }
                 }
             };
 
-            loop {
+            'event: loop {
                 loop_time = Instant::now();
                 match rx.recv() {
                     Ok(Some(mut meas)) => {
@@ -385,7 +532,51 @@ impl InfluxWriter {
                         //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
 
-                        count = next(count, &meas, &mut buf, &loop_time, &mut last);
+                        count = match next(count, &meas, &mut buf, loop_time, last) {
+                            Ok(n) => n,
+                            Err(n) => {
+                                let mut count = 0;
+                                let mut next: String = match spares.pop_front() {
+                                    Some(x) => x,
+
+                                    None => {
+                                        let n_outstanding = n_out(&spares, &backlog, extras);
+                                        crit!(logger, "no available buffers in `spares`, pulling from backlog";
+                                              "n_outstanding" => n_outstanding,
+                                              "spares.len()" => spares.len(),
+                                              "backlog.len()" => backlog.len());
+                                        match backlog.pop_front() {
+                                            // Note: this does not clear the backlog buffer,
+                                            // instead we will just write more and more until
+                                            // we are out of memory. I expect that will never
+                                            // happen.
+                                            //
+                                            Some(x) => {
+                                                count = 1;  // otherwise, no '\n' added in `next(..)` - we are
+                                                            // sending a "full" buffer to be extended
+                                                x
+                                            }
+
+                                            None => {
+                                                extras += 1;
+                                                crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String";
+                                                      "n_outstanding" => n_outstanding,
+                                                      "spares.len()" => spares.len(),
+                                                      "backlog.len()" => backlog.len(),
+                                                      "extras" => extras);
+                                                String::new()
+                                            }
+                                        }
+                                    }
+                                };
+                                // after swap, buf in next, so want to send next
+                                //
+                                mem::swap(&mut buf, &mut next);
+                                let n_outstanding = n_out(&spares, &backlog, extras);
+                                send(next, &mut backlog, n_outstanding);
+                                count
+                            }
+                        };
                     }
 
                     Ok(None) => {
@@ -393,20 +584,111 @@ impl InfluxWriter {
                         if buf.len() > 0 {
                             info!(logger, "sending remaining buffer to influx on terminate"; "count" => count);
                             let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1));
-                            count = next(buffer_size, &meas, &mut buf, &loop_time, &mut last);
-                            info!(logger, "triggered send of remaining buffer"; "count" => count);
-                            if !buf.is_empty() {
-                                warn!(logger, "buffer sill isn't empty after 'wtrterm' meas";
-                                      "count" => count, "buf.len()" => buf.len());
-                                send(&buf);
+                            let _ = next(buffer_size, &meas, &mut buf, loop_time, last);
+                            let n_outstanding = n_out(&spares, &backlog, extras);
+                            send(buf, &mut backlog, n_outstanding);
+                        }
+                        let start = Instant::now();
+                        let mut hb = start;
+                        let mut n_ok = 0;
+                        let mut n_err = 0;
+                        loop {
+                            let loop_time = Instant::now();
+                            let n_outstanding = n_out(&spares, &backlog, extras);
+                            if backlog.is_empty() && n_outstanding < 1 {
+                                info!(logger, "cleared any remaining backlog";
+                                      "n_outstanding" => n_outstanding,
+                                      "backlog.len()" => backlog.len(),
+                                      "n_cleared_ok" => n_ok,
+                                      "n_cleared_err" => n_err,
+                                      "extras" => extras,
+                                      "elapsed" => %format_args!("{:?}", loop_time - start));
+                                break 'event
+                            }
+                            if loop_time - hb > Duration::from_secs(5) {
+                                info!(logger, "InfluxWriter still clearing backlog ..";
+                                      "n_outstanding" => n_outstanding,
+                                      "backlog.len()" => backlog.len(),
+                                      "n_cleared_ok" => n_ok,
+                                      "n_cleared_err" => n_err,
+                                      "extras" => extras,
+                                      "elapsed" => %format_args!("{:?}", loop_time - start));
+                                hb = loop_time;
                             }
+                            if let Some(buf) = backlog.pop_front() {
+                                let n_outstanding = n_out(&spares, &backlog, extras);
+                                send(buf, &mut backlog, n_outstanding);
+                            }
+
+                            'rx: loop {
+                                match http_rx.try_recv() {
+                                    Ok(Ok(Resp { buf, .. })) => {
+                                        n_ok += 1;
+                                        spares.push_back(buf); // needed so `n_outstanding` count remains accurate
+                                    }
+                                    Ok(Err(Resp { buf, .. })) => {
+                                        n_err += 1;
+                                        spares.push_back(buf); // needed so `n_outstanding` count remains accurate
+                                    }
+                                    Err(chan::TryRecvError::Disconnected) => {
+                                        crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting";
+                                              "n_outstanding" => n_outstanding,
+                                              "backlog.len()" => backlog.len(),
+                                              "n_cleared_ok" => n_ok,
+                                              "n_cleared_err" => n_err,
+                                              "extras" => extras,
+                                              "elapsed" => %format_args!("{:?}", loop_time - start));
+                                        break 'event
+                                    }
+                                    Err(_) => break 'rx
+                                }
+                            }
+                            thread::sleep(Duration::from_millis(100));
                         }
-                        info!(logger, "exiting loop"; "count" => count, "buf.len()" => buf.len());
-                        break
                     }
 
                     _ => {
-                        thread::sleep(Duration::new(0, 1))
+                        let mut active = false;
+                        db_health.refresh(loop_time);
+                        let n_outstanding = n_out(&spares, &backlog, extras);
+                        let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
+                        if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy {
+                            if let Some(queued) = backlog.pop_front() {
+                                let n_outstanding = n_out(&spares, &backlog, extras);
+                                send(queued, &mut backlog, n_outstanding);
+                                active = true;
+                            }
+                        }
+
+                        loop {
+                            match http_rx.try_recv() {
+                                Ok(Ok(Resp { buf, took })) => {
+                                    db_health.add(loop_time, took);
+                                    spares.push_back(buf);
+                                    active = true;
+                                }
+
+                                Ok(Err(Resp { buf, took })) => {
+                                    db_health.add(loop_time, took);
+                                    backlog.push_front(buf);
+                                    active = true;
+                                }
+
+                                Err(chan::TryRecvError::Disconnected) => {
+                                    crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting";
+                                          "n_outstanding" => n_outstanding,
+                                          "backlog.len()" => backlog.len(),
+                                          "extras" => extras);
+                                    break 'event
+                                }
+
+                                Err(_) => break
+                            }
+                        }
+
+                        if !active {
+                            thread::sleep(Duration::new(0, 1))
+                        }
                     }
                 }
             }
@@ -1140,6 +1422,31 @@ mod tests {
         });
     }
 
+    #[bench]
+    fn clone_url_for_thread(b: &mut Bencher) {
+        let host = "ahmes";
+        let db = "mlp";
+        let url =
+            Url::parse_with_params(&format!("http://{}:8086/write", host),
+                                   &[("db", db), ("precision", "ns")]).unwrap();
+        b.iter(|| {
+            url.clone()
+        })
+    }
+
+    #[bench]
+    fn clone_arc_url_for_thread(b: &mut Bencher) {
+        let host = "ahmes";
+        let db = "mlp";
+        let url =
+            Url::parse_with_params(&format!("http://{}:8086/write", host),
+                                   &[("db", db), ("precision", "ns")]).unwrap();
+        let url = Arc::new(url);
+        b.iter(|| {
+            Arc::clone(&url)
+        })
+    }
+
     #[test]
     fn it_serializes_a_hard_to_serialize_message_from_owned() {
         let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
diff --git a/src/lib.rs b/src/lib.rs
index f3ffd93..da7bfda 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,6 +24,7 @@ extern crate hdrhistogram;
 extern crate smallvec;
 extern crate num;
 extern crate dirs;
+extern crate crossbeam_channel;
 #[cfg(feature = "zmq")]
 extern crate zmq;
 #[cfg(feature = "latency")]
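
For reference, a minimal sketch of how the new DurationWindow type added above is meant to be driven, mirroring the way the writer thread's `db_health` instance tracks http round-trip times. The 120-second window matches the `db_health` instance; the sample durations and capacity below are illustrative only, and the type is assumed to be in scope (e.g. `use influx::DurationWindow;`).

    // Illustrative only: exercises DurationWindow the same way `db_health` does.
    use std::collections::VecDeque;
    use std::time::{Duration, Instant};

    fn main() {
        let mut win = DurationWindow {
            size: Duration::from_secs(120), // same 2-minute window as `db_health`
            mean: Duration::new(0, 0),
            sum: Duration::new(0, 0),
            count: 0,
            items: VecDeque::with_capacity(16),
        };

        let now = Instant::now();
        // record two hypothetical http round-trip times
        win.update(now, Duration::from_millis(250));
        win.update(now, Duration::from_millis(750));
        assert_eq!(win.count, 2);

        // `refresh` drops entries older than `size` and recomputes the mean over
        // whatever remains; nothing has aged out yet, so the mean covers both samples.
        win.refresh(now);
        assert_eq!(win.mean, Duration::from_millis(500));
    }

In the writer loop above, the same `add`/`refresh` calls feed `db_health`, and its `mean` gates whether queued backlog buffers are retried.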