Browse Source

revamped InfluxWriter passed its tests with flying colors

this thing is a fucking rock now
master
Jonathan Strong 6 years ago
parent
commit
4830a27c3b
5 changed files with 191 additions and 97 deletions
  1. +5
    -3
      Cargo.toml
  2. +0
    -1
      examples/hist-interval.rs
  3. +62
    -0
      examples/precipice.rs
  4. +119
    -73
      src/influx.rs
  5. +5
    -20
      src/lib.rs

+ 5
- 3
Cargo.toml View File

@@ -1,7 +1,8 @@
[package] [package]
name = "logging" name = "logging"
version = "0.4.7"
version = "0.5.0"
authors = ["Jonathan Strong <jonathan.strong@gmail.com>"] authors = ["Jonathan Strong <jonathan.strong@gmail.com>"]
edition = "2018"


[[example]] [[example]]
name = "zmq-logger" name = "zmq-logger"
@@ -29,6 +30,7 @@ smallvec = "0.6"
num = "0.1" num = "0.1"
dirs = "1" dirs = "1"
crossbeam-channel = "0.3" crossbeam-channel = "0.3"
pretty_toa = "1.0.0"


sloggers = { path = "../sloggers" } sloggers = { path = "../sloggers" }


@@ -41,8 +43,8 @@ pubsub = { path = "../pubsub", optional = true }
[features] [features]
default = ["inlines"] default = ["inlines"]
no-thrash = [] no-thrash = []
trace = []
debug = []
trace = ["slog/release_max_level_trace", "slog/max_level_trace"]
debug = ["slog/release_max_level_debug", "slog/max_level_debug"]
test = [] test = []
localhost = [] localhost = []
harrison = [] harrison = []


+ 0
- 1
examples/hist-interval.rs View File

@@ -1,4 +1,3 @@
#![feature(duration_from_micros)]
#![allow(unused)] #![allow(unused)]


extern crate logging; extern crate logging;


+ 62
- 0
examples/precipice.rs View File

@@ -0,0 +1,62 @@
#![allow(unused_imports)]

#[macro_use]
extern crate slog;
#[macro_use]
extern crate logging;

use std::io::{self, prelude::*};
use std::thread;
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::time::*;
use chrono::Utc;
use slog::Drain;
use pretty_toa::ThousandsSep;
use logging::influx::InfluxWriter;

/// Pause between consecutive measurements in the sender thread's loop
/// (passed to `thread::sleep` after each measurement is submitted).
const INTERVAL: Duration = Duration::from_micros(1); //from_millis(1);
/// The sender thread logs a "sent N measurements" heartbeat each time its
/// counter reaches a multiple of this value.
const HB_EVERY: usize = 1_000_000;

/// Drives an `InfluxWriter` from a background thread until a line is read
/// from stdin, then signals the thread to stop and waits for it to finish.
fn main() {
    // Logging setup: a colored, compact terminal drain (UTC timestamps) behind
    // an async wrapper, duplicated into the drain returned by
    // `logging::truncating_file_logger`.
    let file_drain = logging::truncating_file_logger("var/log/precipice.log", sloggers::types::Severity::Debug);
    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
    let term_drain = slog_term::CompactFormat::new(decorator)
        .use_utc_timestamp()
        .build()
        .fuse();
    let async_drain = slog_async::Async::new(term_drain)
        .chan_size(8192)
        .thread_name("recv".into())
        .build()
        .fuse();
    let root = slog::Logger::root(slog::Duplicate::new(async_drain, file_drain).fuse(), o!());
    let main_log = root.new(o!("thread" => "main"));
    info!(main_log, "initializing...");

    let influx = InfluxWriter::with_logger("localhost", "precipice", 1024, root.new(o!("thread" => "InfluxWriter")));

    // Shared flag the main thread flips to ask the sender thread to exit.
    let stop = Arc::new(AtomicBool::new(false));

    let blaster = {
        let stop = Arc::clone(&stop);
        let log = root.new(o!("thread" => "blaster"));
        let influx = influx.clone();
        thread::spawn(move || {
            // `i` counts measurements submitted so far; `total` accumulates
            // the values sent and is reported in the exit log line.
            let mut i = 0;
            let mut total = 0;
            loop {
                if stop.load(Ordering::Relaxed) {
                    break;
                }
                measure!(influx, xs, i(i), tm(logging::inanos(Utc::now())));
                total += i;
                i += 1;
                if i % HB_EVERY == 0 {
                    info!(log, "sent {} measurements", i.thousands_sep());
                }
                thread::sleep(INTERVAL);
            }
            info!(log, "exiting"; "n_sent" => i, "sum" => total);
        })
    };

    // Wait for a successful read from stdin; on a read error, back off for a
    // millisecond and try again.
    let mut line = String::new();
    while io::stdin().read_line(&mut line).is_err() {
        thread::sleep(Duration::from_millis(1));
    }

    stop.store(true, Ordering::Relaxed);
    let _ = blaster.join();
}



+ 119
- 73
src/influx.rs View File

@@ -4,7 +4,7 @@
use std::io::Read; use std::io::Read;
use std::sync::Arc; use std::sync::Arc;
use std::sync::mpsc::{Sender, Receiver, channel, SendError}; use std::sync::mpsc::{Sender, Receiver, channel, SendError};
use std::{thread, fs, mem};
use std::{thread, mem};
use std::time::*; use std::time::*;
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;
use std::collections::VecDeque; use std::collections::VecDeque;
@@ -24,6 +24,7 @@ use decimal::d128;
use uuid::Uuid; use uuid::Uuid;
use smallvec::SmallVec; use smallvec::SmallVec;
use slog::Logger; use slog::Logger;
use pretty_toa::ThousandsSep;


use super::{nanos, file_logger, LOG_LEVEL}; use super::{nanos, file_logger, LOG_LEVEL};
#[cfg(feature = "warnings")] #[cfg(feature = "warnings")]
@@ -346,33 +347,36 @@ impl InfluxWriter {


#[allow(unused_assignments)] #[allow(unused_assignments)]
pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self { pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self {
let logger = logger.new(o!(
"host" => host.to_string(),
"db" => db.to_string()));
let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel(); let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();

let buffer_size = INFLUX_WRITER_MAX_BUFFER;

#[cfg(feature = "no-influx-buffer")]
let buffer_size = 0usize;

debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);

let url = let url =
Url::parse_with_params(&format!("http://{}:8086/write", host), Url::parse_with_params(&format!("http://{}:8086/write", host),
&[("db", db), ("precision", "ns")]) &[("db", db), ("precision", "ns")])
.expect("influx writer url should parse"); .expect("influx writer url should parse");

let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
use std::collections::VecDeque; use std::collections::VecDeque;
use std::time::*; use std::time::*;
use crossbeam_channel as chan; use crossbeam_channel as chan;


const MAX_PENDING: Duration = Duration::from_secs(2);
#[cfg(feature = "no-influx-buffer")]
const N_BUFFER_LINES: usize = 0;

const N_BUFFER_LINES: usize = 8192;
const MAX_PENDING: Duration = Duration::from_secs(3);
const INITIAL_BUFFER_CAPACITY: usize = 32 * 32 * 32; const INITIAL_BUFFER_CAPACITY: usize = 32 * 32 * 32;
const MAX_BACKLOG: usize = 512; const MAX_BACKLOG: usize = 512;
const MAX_OUTSTANDING_HTTP: usize = 16;
const MAX_OUTSTANDING_HTTP: usize = 32;
const HB_EVERY: usize = 100_000;
const N_HTTP_ATTEMPTS: u32 = 5;


let client = Arc::new(Client::new()); let client = Arc::new(Client::new());


info!(logger, "initializing buffers";
info!(logger, "initializing InfluxWriter ...";
"N_BUFFER_LINES" => N_BUFFER_LINES,
"MAX_PENDING" => %format_args!("{:?}", MAX_PENDING),
"MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP,
"INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY, "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY,
"MAX_BACKLOG" => MAX_BACKLOG); "MAX_BACKLOG" => MAX_BACKLOG);


@@ -408,8 +412,10 @@ impl InfluxWriter {
let mut buf = spares.pop_front().unwrap(); let mut buf = spares.pop_front().unwrap();
let mut count = 0; let mut count = 0;
let mut extras = 0; // any new Strings we intro to the system let mut extras = 0; // any new Strings we intro to the system
let last = Instant::now();
let last_clear = Instant::now();
let mut n_rcvd = 0;
let mut last = Instant::now();
let mut active: bool;
let mut last_clear = Instant::now();
let mut loop_time = Instant::now(); let mut loop_time = Instant::now();


let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize { let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
@@ -425,12 +431,22 @@ impl InfluxWriter {
} }
let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
let tx = http_tx.clone(); let tx = http_tx.clone();
let thread_logger = logger.new(o!("n_outstanding" => n_outstanding));
let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "n_outstanding" => n_outstanding));
let client = Arc::clone(&client); let client = Arc::clone(&client);
debug!(logger, "launching http thread");
let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || { let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
let logger = thread_logger; let logger = thread_logger;
debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len());
let start = Instant::now(); let start = Instant::now();
'a: for n_req in 0..5u32 {
'a: for n_req in 0..N_HTTP_ATTEMPTS {
let throttle = Duration::from_secs(2) * n_req * n_req;
if n_req > 0 {
warn!(logger, "InfluxWriter http thread: pausing before next request";
"n_req" => n_req,
"throttle" => %format_args!("{:?}", throttle),
"elapsed" => %format_args!("{:?}", Instant::now() - start));
thread::sleep(throttle); // 0, 2, 8, 18, 32
}
let sent = Instant::now(); let sent = Instant::now();
let resp = client.post(url.clone()) let resp = client.post(url.clone())
.body(buf.as_str()) .body(buf.as_str())
@@ -478,19 +494,16 @@ impl InfluxWriter {
} }
} }


let throttle = Duration::from_secs(2) * n_req * n_req;
if n_req > 0 {
warn!(logger, "InfluxWriter http thread: pausing before next request";
"n_req" => n_req,
"throttle" => %format_args!("{:?}", throttle),
"took" => %format_args!("{:?}", took));
}
thread::sleep(throttle); // 0, 2, 8, 16, 32
} }
let took = Instant::now() - start; let took = Instant::now() - start;
warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer"; warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer";
"took" => %format_args!("{:?}", took)); "took" => %format_args!("{:?}", took));
tx.send(Err(Resp { buf, took })).unwrap(); // failure here is unrecoverable
let buflen = buf.len();
let n_lines = buf.lines().count();
if let Err(e) = tx.send(Err(Resp { buf, took })) {
crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e;
"err" => %e, "buf.len()" => buflen, "n_lines" => n_lines);
}
}); });


if let Err(e) = thread_res { if let Err(e) = thread_res {
@@ -500,12 +513,12 @@ impl InfluxWriter {


let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> { let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> {
match prev { match prev {
0 if buffer_size > 0 => {
0 if N_BUFFER_LINES > 0 => {
serialize_owned(m, buf); serialize_owned(m, buf);
Ok(1) Ok(1)
} }


n if n < buffer_size && loop_time - last < MAX_PENDING => {
n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => {
buf.push_str("\n"); buf.push_str("\n");
serialize_owned(m, buf); serialize_owned(m, buf);
Ok(n + 1) Ok(n + 1)
@@ -521,8 +534,22 @@ impl InfluxWriter {


'event: loop { 'event: loop {
loop_time = Instant::now(); loop_time = Instant::now();
active = false;
match rx.recv() { match rx.recv() {
Ok(Some(mut meas)) => { Ok(Some(mut meas)) => {
n_rcvd += 1;
active = true;

if n_rcvd % HB_EVERY == 0 {
let n_outstanding = n_out(&spares, &backlog, extras);
info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep();
"n_outstanding" => n_outstanding,
"spares.len()" => spares.len(),
"n_rcvd" => n_rcvd,
"n_active_buf" => count,
"db_health" => %format_args!("{:?}", db_health.mean),
"backlog.len()" => backlog.len());
}


if meas.timestamp.is_none() { meas.timestamp = Some(now()) } if meas.timestamp.is_none() { meas.timestamp = Some(now()) }


@@ -544,6 +571,7 @@ impl InfluxWriter {
crit!(logger, "no available buffers in `spares`, pulling from backlog"; crit!(logger, "no available buffers in `spares`, pulling from backlog";
"n_outstanding" => n_outstanding, "n_outstanding" => n_outstanding,
"spares.len()" => spares.len(), "spares.len()" => spares.len(),
"n_rcvd" => n_rcvd,
"backlog.len()" => backlog.len()); "backlog.len()" => backlog.len());
match backlog.pop_front() { match backlog.pop_front() {
// Note: this does not clear the backlog buffer, // Note: this does not clear the backlog buffer,
@@ -563,6 +591,7 @@ impl InfluxWriter {
"n_outstanding" => n_outstanding, "n_outstanding" => n_outstanding,
"spares.len()" => spares.len(), "spares.len()" => spares.len(),
"backlog.len()" => backlog.len(), "backlog.len()" => backlog.len(),
"n_rcvd" => n_rcvd,
"extras" => extras); "extras" => extras);
String::new() String::new()
} }
@@ -574,50 +603,63 @@ impl InfluxWriter {
mem::swap(&mut buf, &mut next); mem::swap(&mut buf, &mut next);
let n_outstanding = n_out(&spares, &backlog, extras); let n_outstanding = n_out(&spares, &backlog, extras);
send(next, &mut backlog, n_outstanding); send(next, &mut backlog, n_outstanding);
last = loop_time;
count count
} }
}; };
} }


Ok(None) => { Ok(None) => {
let start = Instant::now();
let mut hb = Instant::now();
warn!(logger, "terminate signal rcvd"; "count" => count); warn!(logger, "terminate signal rcvd"; "count" => count);
if buf.len() > 0 { if buf.len() > 0 {
info!(logger, "sending remaining buffer to influx on terminate"; "count" => count); info!(logger, "sending remaining buffer to influx on terminate"; "count" => count);
let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1)); let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1));
let _ = next(buffer_size, &meas, &mut buf, loop_time, last);
let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last);
let n_outstanding = n_out(&spares, &backlog, extras); let n_outstanding = n_out(&spares, &backlog, extras);
send(buf, &mut backlog, n_outstanding);
let mut placeholder = spares.pop_front().unwrap_or_else(String::new);
mem::swap(&mut buf, &mut placeholder);
send(placeholder, &mut backlog, n_outstanding);
} }
let start = Instant::now();
let mut hb = start;
let mut n_ok = 0; let mut n_ok = 0;
let mut n_err = 0; let mut n_err = 0;
loop { loop {
let loop_time = Instant::now();
loop_time = Instant::now();
let n_outstanding = n_out(&spares, &backlog, extras); let n_outstanding = n_out(&spares, &backlog, extras);
if backlog.is_empty() && n_outstanding < 1 { if backlog.is_empty() && n_outstanding < 1 {
info!(logger, "cleared any remaining backlog"; info!(logger, "cleared any remaining backlog";
"n_outstanding" => n_outstanding, "n_outstanding" => n_outstanding,
"spares.len()" => spares.len(),
"backlog.len()" => backlog.len(), "backlog.len()" => backlog.len(),
"n_cleared_ok" => n_ok, "n_cleared_ok" => n_ok,
"n_cleared_err" => n_err, "n_cleared_err" => n_err,
"n_rcvd" => n_rcvd,
"extras" => extras, "extras" => extras,
"elapsed" => %format_args!("{:?}", start - loop_time));
"elapsed" => %format_args!("{:?}", loop_time - start));
break 'event break 'event
} }
if loop_time - hb > Duration::from_secs(5) { if loop_time - hb > Duration::from_secs(5) {
info!(logger, "InfluxWriter still clearing backlog .."; info!(logger, "InfluxWriter still clearing backlog ..";
"n_outstanding" => n_outstanding, "n_outstanding" => n_outstanding,
"spares.len()" => spares.len(),
"backlog.len()" => backlog.len(), "backlog.len()" => backlog.len(),
"n_cleared_ok" => n_ok, "n_cleared_ok" => n_ok,
"n_cleared_err" => n_err, "n_cleared_err" => n_err,
"extras" => extras, "extras" => extras,
"elapsed" => %format_args!("{:?}", start - loop_time));
"n_rcvd" => n_rcvd,
"elapsed" => %format_args!("{:?}", loop_time - start));
hb = loop_time; hb = loop_time;
} }
if let Some(buf) = backlog.pop_front() { if let Some(buf) = backlog.pop_front() {
let n_outstanding = n_out(&spares, &backlog, extras); let n_outstanding = n_out(&spares, &backlog, extras);
debug!(logger, "resending queued buffer from backlog";
"backlog.len()" => backlog.len(),
"spares.len()" => spares.len(),
"n_rcvd" => n_rcvd,
"n_outstanding" => n_outstanding);
send(buf, &mut backlog, n_outstanding); send(buf, &mut backlog, n_outstanding);
last_clear = loop_time;
} }


'rx: loop { 'rx: loop {
@@ -627,8 +669,9 @@ impl InfluxWriter {
spares.push_back(buf); // needed so `n_outstanding` count remains accurate spares.push_back(buf); // needed so `n_outstanding` count remains accurate
} }
Ok(Err(Resp { buf, .. })) => { Ok(Err(Resp { buf, .. })) => {
warn!(logger, "requeueing failed request"; "buf.len()" => buf.len());
n_err += 1; n_err += 1;
spares.push_back(buf); // needed so `n_outstanding` count remains accurate
backlog.push_front(buf);
} }
Err(chan::TryRecvError::Disconnected) => { Err(chan::TryRecvError::Disconnected) => {
crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting"; crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting";
@@ -637,7 +680,8 @@ impl InfluxWriter {
"n_cleared_ok" => n_ok, "n_cleared_ok" => n_ok,
"n_cleared_err" => n_err, "n_cleared_err" => n_err,
"extras" => extras, "extras" => extras,
"elapsed" => %format_args!("{:?}", start - loop_time));
"n_rcvd" => n_rcvd,
"elapsed" => %format_args!("{:?}", loop_time - start));
break 'event break 'event
} }
Err(_) => break 'rx Err(_) => break 'rx
@@ -647,51 +691,53 @@ impl InfluxWriter {
} }
} }


_ => {
let mut active = false;
db_health.refresh(loop_time);
let n_outstanding = n_out(&spares, &backlog, extras);
let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy {
if let Some(queued) = backlog.pop_front() {
let n_outstanding = n_out(&spares, &backlog, extras);
send(queued, &mut backlog, n_outstanding);
active = true;
}
}

loop {
match http_rx.try_recv() {
Ok(Ok(Resp { buf, took })) => {
db_health.add(loop_time, took);
spares.push_back(buf);
active = true;
}
_ => {}
}


Ok(Err(Resp { buf, took })) => {
db_health.add(loop_time, took);
backlog.push_front(buf);
active = true;
}
db_health.refresh(loop_time);
let n_outstanding = n_out(&spares, &backlog, extras);
let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy {
if let Some(queued) = backlog.pop_front() {
let n_outstanding = n_out(&spares, &backlog, extras);
send(queued, &mut backlog, n_outstanding);
active = true;
}
}


Err(chan::TryRecvError::Disconnected) => {
crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting";
"n_outstanding" => n_outstanding,
"backlog.len()" => backlog.len(),
"extras" => extras);
break 'event
}
loop {
match http_rx.try_recv() {
Ok(Ok(Resp { buf, took })) => {
db_health.add(loop_time, took);
spares.push_back(buf);
active = true;
}


Err(_) => break
}
Ok(Err(Resp { buf, took })) => {
db_health.add(loop_time, took);
backlog.push_front(buf);
active = true;
} }


if !active {
thread::sleep(Duration::new(0, 1))
Err(chan::TryRecvError::Disconnected) => {
crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting";
"n_outstanding" => n_outstanding,
"backlog.len()" => backlog.len(),
"n_rcvd" => n_rcvd,
"extras" => extras);
break 'event
} }

Err(_) => break
} }
} }

if !active {
thread::sleep(Duration::new(0, 1))
}
} }
info!(logger, "waiting 1s before exiting thread");
thread::sleep(Duration::from_secs(1));
}).unwrap(); }).unwrap();


InfluxWriter { InfluxWriter {


+ 5
- 20
src/lib.rs View File

@@ -3,28 +3,13 @@


#![feature(test)] #![feature(test)]


#[macro_use] extern crate slog;
#[macro_use]
extern crate slog;
#[allow(unused_imports)] #[allow(unused_imports)]
#[macro_use] extern crate money;

#[macro_use]
extern crate money;
#[cfg(test)]
extern crate test; extern crate test;
extern crate influent;
extern crate chrono;
extern crate hyper;
extern crate termion;
extern crate sloggers;
extern crate slog_term;
extern crate slog_async;
extern crate fnv;
extern crate ordermap;
extern crate decimal;
extern crate uuid;
extern crate hdrhistogram;
extern crate smallvec;
extern crate num;
extern crate dirs;
extern crate crossbeam_channel;
#[cfg(feature = "zmq")] #[cfg(feature = "zmq")]
extern crate zmq; extern crate zmq;
#[cfg(feature = "latency")] #[cfg(feature = "latency")]


Loading…
Cancel
Save