Browse Source

feat(InfluxWriter): huge refactor so InfluxWriter queues a backlog on failed db requests

will need extensive testing before prod use
master
Jonathan Strong 6 years ago
parent
commit
11b214da7a
3 changed files with 355 additions and 46 deletions
  1. +1
    -0
      Cargo.toml
  2. +353
    -46
      src/influx.rs
  3. +1
    -0
      src/lib.rs

+ 1
- 0
Cargo.toml View File

@@ -28,6 +28,7 @@ slog-async = "2"
smallvec = "0.6"
num = "0.1"
dirs = "1"
crossbeam-channel = "0.3"

sloggers = { path = "../sloggers" }



+ 353
- 46
src/influx.rs View File

@@ -4,11 +4,10 @@
use std::io::Read;
use std::sync::Arc;
use std::sync::mpsc::{Sender, Receiver, channel, SendError};
use std::thread;
#[cfg(feature = "warnings")]
use std::fs;
use std::time::{Instant, Duration};
use std::{thread, fs, mem};
use std::time::*;
use std::hash::BuildHasherDefault;
use std::collections::VecDeque;

use hyper::status::StatusCode;
use hyper::client::response::Response;
@@ -205,6 +204,58 @@ macro_rules! measure {
}};
}

#[derive(Clone, Debug)]
pub struct Point<T, V> {
pub time: T,
pub value: V
}
pub struct DurationWindow {
pub size: Duration,
pub mean: Duration,
pub sum: Duration,
pub count: u32,
pub items: VecDeque<Point<Instant, Duration>>
}

impl DurationWindow {
#[inline]
pub fn update(&mut self, time: Instant, value: Duration) {
self.add(time, value);
self.refresh(time);
}

#[inline]
pub fn refresh(&mut self, t: Instant) -> &Self {
if !self.items.is_empty() {
let (n_remove, sum, count) =
self.items.iter()
.take_while(|x| t - x.time > self.size)
.fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
(n_remove + 1, sum - x.value, count - 1)
});
self.sum = sum;
self.count = count;
for _ in 0..n_remove {
self.items.pop_front();
}
}

if self.count > 0 {
self.mean = self.sum / self.count.into();
}

self
}

#[inline]
pub fn add(&mut self, time: Instant, value: Duration) {
let p = Point { time, value };
self.sum += p.value;
self.count += 1;
self.items.push_back(p);
}
}

/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
/// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
/// measurements have accumulated.
@@ -310,69 +361,165 @@ impl InfluxWriter {
.expect("influx writer url should parse");

let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
const MAX_PENDING: Duration = Duration::from_secs(1);
use std::collections::VecDeque;
use std::time::*;
use crossbeam_channel as chan;

let client = Client::new();
const MAX_PENDING: Duration = Duration::from_secs(2);
const INITIAL_BUFFER_CAPACITY: usize = 32 * 32 * 32;
const MAX_BACKLOG: usize = 512;
const MAX_OUTSTANDING_HTTP: usize = 16;

debug!(logger, "initializing buffers");
let mut buf = String::with_capacity(32 * 32 * 32);
let client = Arc::new(Client::new());

info!(logger, "initializing buffers";
"INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY,
"MAX_BACKLOG" => MAX_BACKLOG);

// pre-allocated buffers ready for use if the active one is stasheed
// during an outage
let mut spares: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG);

// queue failed sends here until problem resolved, then send again. in worst
// case scenario, loop back around on buffers queued in `backlog`, writing
// over the oldest first.
//
let mut backlog: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG);

for _ in 0..MAX_BACKLOG {
spares.push_back(String::with_capacity(32 * 32 * 32));
}

struct Resp {
pub buf: String,
pub took: Duration,
}

let mut db_health = DurationWindow {
size: Duration::from_secs(120),
mean: Duration::new(10, 0),
sum: Duration::new(0, 0),
count: 0,
items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP),
};

let (http_tx, http_rx) = chan::bounded(32);

let mut buf = spares.pop_front().unwrap();
let mut count = 0;
let mut extras = 0; // any new Strings we intro to the system
let mut last = Instant::now();
let mut last_clear = Instant::now();
let mut loop_time = Instant::now();

let send = |buf: &str| {
let resp = client.post(url.clone())
.body(buf)
.send();
match resp {
let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
MAX_BACKLOG + extras - s.len() - b.len() - 1
};

Ok(Response { status, .. }) if status == StatusCode::NoContent => {
debug!(logger, "server responded ok: 204 NoContent");
}
assert_eq!(n_out(&spares, &backlog, extras), 0);

Ok(mut resp) => {
let mut server_resp = String::with_capacity(32 * 1024); // need to allocate here bc will be
// sent to logging thread
let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize| {
if n_outstanding >= MAX_OUTSTANDING_HTTP {
backlog.push_back(buf);
return
}
let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
let tx = http_tx.clone();
let thread_logger = logger.new(o!("n_outstanding" => n_outstanding));
let client = Arc::clone(&client);
let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
let logger = thread_logger;
let start = Instant::now();
'a: for n_req in 0..5u32 {
let sent = Instant::now();
let resp = client.post(url.clone())
.body(buf.as_str())
.send();
let rcvd = Instant::now();
let took = rcvd - sent;
let mut n_tx = 0u32;
match resp {
Ok(Response { status, .. }) if status == StatusCode::NoContent => {
debug!(logger, "server responded ok: 204 NoContent");
buf.clear();
let mut resp = Some(Ok(Resp { buf, took }));
'b: loop {
n_tx += 1;
match tx.try_send(resp.take().unwrap()) {
Ok(_) => return,

Err(chan::TrySendError::Full(r)) => {
let throttle = Duration::from_millis(1000) * n_tx;
warn!(logger, "channel full: InfluxWriter http thread failed to return buf";
"n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle));
resp = Some(r);
thread::sleep(throttle);
}

Err(chan::TrySendError::Disconnected(_)) => {
warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return";
"n_tx" => n_tx, "n_req" => n_req);
return
}
}
}
}

let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
Ok(mut resp) => {
let mut server_resp = String::new();
let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
error!(logger, "influx server error (request took {:?})", took;
"status" => %resp.status,
"body" => server_resp);
}

error!(logger, "influx server error";
"status" => resp.status.to_string(),
"body" => server_resp);
}
Err(e) => {
error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e);
}
}

Err(why) => {
error!(logger, "http request failed: {:?}", why);
let throttle = Duration::from_secs(2) * n_req * n_req;
if n_req > 0 {
warn!(logger, "InfluxWriter http thread: pausing before next request";
"n_req" => n_req,
"throttle" => %format_args!("{:?}", throttle),
"took" => %format_args!("{:?}", took));
}
thread::sleep(throttle); // 0, 2, 8, 16, 32
}
let took = Instant::now() - start;
warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer";
"took" => %format_args!("{:?}", took));
tx.send(Err(Resp { buf, took })).unwrap(); // failure here is unrecoverable
});

if let Err(e) = thread_res {
crit!(logger, "failed to spawn thread: {}", e);
}
};

let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: &Instant, last: &mut Instant| -> usize {
let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> {
match prev {
0 if buffer_size > 0 => {
serialize_owned(m, buf);
1
Ok(1)
}

n if n < buffer_size && *loop_time - *last < MAX_PENDING => {
n if n < buffer_size && loop_time - last < MAX_PENDING => {
buf.push_str("\n");
serialize_owned(m, buf);
n + 1
Ok(n + 1)
}

n => {
buf.push_str("\n");
serialize_owned(m, buf);
debug!(logger, "sending buffer to influx"; "len" => n);
send(buf);
*last = *loop_time;
buf.clear();
0
Err(n + 1)
}
}
};

loop {
'event: loop {
loop_time = Instant::now();
match rx.recv() {
Ok(Some(mut meas)) => {
@@ -385,7 +532,51 @@ impl InfluxWriter {

//#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }

count = next(count, &meas, &mut buf, &loop_time, &mut last);
count = match next(count, &meas, &mut buf, loop_time, last) {
Ok(n) => n,
Err(n) => {
let mut count = 0;
let mut next: String = match spares.pop_front() {
Some(x) => x,

None => {
let n_outstanding = n_out(&spares, &backlog, extras);
crit!(logger, "no available buffers in `spares`, pulling from backlog";
"n_outstanding" => n_outstanding,
"spares.len()" => spares.len(),
"backlog.len()" => backlog.len());
match backlog.pop_front() {
// Note: this does not clear the backlog buffer,
// instead we will just write more and more until
// we are out of memory. I expect that will never
// happen.
//
Some(x) => {
count = 1; // otherwise, no '\n' added in `next(..)` - we are
// sending a "full" buffer to be extended
x
}

None => {
extras += 1;
crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String";
"n_outstanding" => n_outstanding,
"spares.len()" => spares.len(),
"backlog.len()" => backlog.len(),
"extras" => extras);
String::new()
}
}
}
};
// after swap, buf in next, so want to send next
//
mem::swap(&mut buf, &mut next);
let n_outstanding = n_out(&spares, &backlog, extras);
send(next, &mut backlog, n_outstanding);
count
}
};
}

Ok(None) => {
@@ -393,20 +584,111 @@ impl InfluxWriter {
if buf.len() > 0 {
info!(logger, "sending remaining buffer to influx on terminate"; "count" => count);
let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1));
count = next(buffer_size, &meas, &mut buf, &loop_time, &mut last);
info!(logger, "triggered send of remaining buffer"; "count" => count);
if !buf.is_empty() {
warn!(logger, "buffer sill isn't empty after 'wtrterm' meas";
"count" => count, "buf.len()" => buf.len());
send(&buf);
let _ = next(buffer_size, &meas, &mut buf, loop_time, last);
let n_outstanding = n_out(&spares, &backlog, extras);
send(buf, &mut backlog, n_outstanding);
}
let start = Instant::now();
let mut hb = start;
let mut n_ok = 0;
let mut n_err = 0;
loop {
let loop_time = Instant::now();
let n_outstanding = n_out(&spares, &backlog, extras);
if backlog.is_empty() && n_outstanding < 1 {
info!(logger, "cleared any remaining backlog";
"n_outstanding" => n_outstanding,
"backlog.len()" => backlog.len(),
"n_cleared_ok" => n_ok,
"n_cleared_err" => n_err,
"extras" => extras,
"elapsed" => %format_args!("{:?}", start - loop_time));
break 'event
}
if loop_time - hb > Duration::from_secs(5) {
info!(logger, "InfluxWriter still clearing backlog ..";
"n_outstanding" => n_outstanding,
"backlog.len()" => backlog.len(),
"n_cleared_ok" => n_ok,
"n_cleared_err" => n_err,
"extras" => extras,
"elapsed" => %format_args!("{:?}", start - loop_time));
hb = loop_time;
}
if let Some(buf) = backlog.pop_front() {
let n_outstanding = n_out(&spares, &backlog, extras);
send(buf, &mut backlog, n_outstanding);
}

'rx: loop {
match http_rx.try_recv() {
Ok(Ok(Resp { buf, .. })) => {
n_ok += 1;
spares.push_back(buf); // needed so `n_outstanding` count remains accurate
}
Ok(Err(Resp { buf, .. })) => {
n_err += 1;
spares.push_back(buf); // needed so `n_outstanding` count remains accurate
}
Err(chan::TryRecvError::Disconnected) => {
crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting";
"n_outstanding" => n_outstanding,
"backlog.len()" => backlog.len(),
"n_cleared_ok" => n_ok,
"n_cleared_err" => n_err,
"extras" => extras,
"elapsed" => %format_args!("{:?}", start - loop_time));
break 'event
}
Err(_) => break 'rx
}
}
thread::sleep(Duration::from_millis(100));
}
info!(logger, "exiting loop"; "count" => count, "buf.len()" => buf.len());
break
}

_ => {
thread::sleep(Duration::new(0, 1))
let mut active = false;
db_health.refresh(loop_time);
let n_outstanding = n_out(&spares, &backlog, extras);
let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy {
if let Some(queued) = backlog.pop_front() {
let n_outstanding = n_out(&spares, &backlog, extras);
send(queued, &mut backlog, n_outstanding);
active = true;
}
}

loop {
match http_rx.try_recv() {
Ok(Ok(Resp { buf, took })) => {
db_health.add(loop_time, took);
spares.push_back(buf);
active = true;
}

Ok(Err(Resp { buf, took })) => {
db_health.add(loop_time, took);
backlog.push_front(buf);
active = true;
}

Err(chan::TryRecvError::Disconnected) => {
crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting";
"n_outstanding" => n_outstanding,
"backlog.len()" => backlog.len(),
"extras" => extras);
break 'event
}

Err(_) => break
}
}

if !active {
thread::sleep(Duration::new(0, 1))
}
}
}
}
@@ -1140,6 +1422,31 @@ mod tests {
});
}

#[bench]
fn clone_url_for_thread(b: &mut Bencher) {
let host = "ahmes";
let db = "mlp";
let url =
Url::parse_with_params(&format!("http://{}:8086/write", host),
&[("db", db), ("precision", "ns")]).unwrap();
b.iter(|| {
url.clone()
})
}

#[bench]
fn clone_arc_url_for_thread(b: &mut Bencher) {
let host = "ahmes";
let db = "mlp";
let url =
Url::parse_with_params(&format!("http://{}:8086/write", host),
&[("db", db), ("precision", "ns")]).unwrap();
let url = Arc::new(url);
b.iter(|| {
Arc::clone(&url)
})
}

#[test]
fn it_serializes_a_hard_to_serialize_message_from_owned() {
let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;


+ 1
- 0
src/lib.rs View File

@@ -24,6 +24,7 @@ extern crate hdrhistogram;
extern crate smallvec;
extern crate num;
extern crate dirs;
extern crate crossbeam_channel;
#[cfg(feature = "zmq")]
extern crate zmq;
#[cfg(feature = "latency")]


Loading…
Cancel
Save