diff --git a/Cargo.toml b/Cargo.toml index 3e67568..fada33b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "logging" -version = "0.4.7" +version = "0.5.0" authors = ["Jonathan Strong "] +edition = "2018" [[example]] name = "zmq-logger" @@ -29,6 +30,7 @@ smallvec = "0.6" num = "0.1" dirs = "1" crossbeam-channel = "0.3" +pretty_toa = "1.0.0" sloggers = { path = "../sloggers" } @@ -41,8 +43,8 @@ pubsub = { path = "../pubsub", optional = true } [features] default = ["inlines"] no-thrash = [] -trace = [] -debug = [] +trace = ["slog/release_max_level_trace", "slog/max_level_trace"] +debug = ["slog/release_max_level_debug", "slog/max_level_debug"] test = [] localhost = [] harrison = [] diff --git a/examples/hist-interval.rs b/examples/hist-interval.rs index 10428cd..dbb3880 100644 --- a/examples/hist-interval.rs +++ b/examples/hist-interval.rs @@ -1,4 +1,3 @@ -#![feature(duration_from_micros)] #![allow(unused)] extern crate logging; diff --git a/examples/precipice.rs b/examples/precipice.rs new file mode 100644 index 0000000..1c18b76 --- /dev/null +++ b/examples/precipice.rs @@ -0,0 +1,62 @@ +#![allow(unused_imports)] + +#[macro_use] +extern crate slog; +#[macro_use] +extern crate logging; + +use std::io::{self, prelude::*}; +use std::thread; +use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; +use std::time::*; +use chrono::Utc; +use slog::Drain; +use pretty_toa::ThousandsSep; +use logging::influx::InfluxWriter; + +const INTERVAL: Duration = Duration::from_micros(1); //from_millis(1); +const HB_EVERY: usize = 1_000_000; + +fn main() { + let to_file = logging::truncating_file_logger("var/log/precipice.log", sloggers::types::Severity::Debug); + let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); + let drain = slog_term::CompactFormat::new(decorator).use_utc_timestamp().build().fuse(); + let drain = slog_async::Async::new(drain).chan_size(8192).thread_name("recv".into()).build().fuse(); + let drain = slog::Duplicate::new(drain, to_file).fuse(); + let root = slog::Logger::root(drain, o!()); + let logger = root.new(o!("thread" => "main")); + info!(logger, "initializing..."); + let influx = InfluxWriter::with_logger("localhost", "precipice", 1024, root.new(o!("thread" => "InfluxWriter"))); + let stop = Arc::new(AtomicBool::new(false)); + let thread = { + let stop = Arc::clone(&stop); + let logger = root.new(o!("thread" => "blaster")); + let influx = influx.clone(); + thread::spawn(move || { + let mut i = 0; + let mut sum = 0; + while !stop.load(Ordering::Relaxed) { + measure!(influx, xs, i(i), tm(logging::inanos(Utc::now()))); + sum += i; + i += 1; + if i % HB_EVERY == 0 { + info!(logger, "sent {} measurements", i.thousands_sep()); + } + thread::sleep(INTERVAL); + } + info!(logger, "exiting"; "n_sent" => i, "sum" => sum); + }) + }; + + let mut keys = String::new(); + loop { + if let Ok(_) = io::stdin().read_line(&mut keys) { + break + } + thread::sleep(Duration::from_millis(1)); + } + stop.store(true, Ordering::Relaxed); + let _ = thread.join(); +} + + diff --git a/src/influx.rs b/src/influx.rs index a6860a7..4c1f300 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -4,7 +4,7 @@ use std::io::Read; use std::sync::Arc; use std::sync::mpsc::{Sender, Receiver, channel, SendError}; -use std::{thread, fs, mem}; +use std::{thread, mem}; use std::time::*; use std::hash::BuildHasherDefault; use std::collections::VecDeque; @@ -24,6 +24,7 @@ use decimal::d128; use uuid::Uuid; use smallvec::SmallVec; use slog::Logger; +use pretty_toa::ThousandsSep; use super::{nanos, file_logger, LOG_LEVEL}; #[cfg(feature = "warnings")] @@ -346,33 +347,36 @@ impl InfluxWriter { #[allow(unused_assignments)] pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self { + let logger = logger.new(o!( + "host" => host.to_string(), + "db" => db.to_string())); let (tx, rx): (Sender>, Receiver>) = channel(); - - let buffer_size = INFLUX_WRITER_MAX_BUFFER; - - #[cfg(feature = "no-influx-buffer")] - let buffer_size = 0usize; - - debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size); - let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]) .expect("influx writer url should parse"); - let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { use std::collections::VecDeque; use std::time::*; use crossbeam_channel as chan; - const MAX_PENDING: Duration = Duration::from_secs(2); + #[cfg(feature = "no-influx-buffer")] + const N_BUFFER_LINES: usize = 0; + + const N_BUFFER_LINES: usize = 8192; + const MAX_PENDING: Duration = Duration::from_secs(3); const INITIAL_BUFFER_CAPACITY: usize = 32 * 32 * 32; const MAX_BACKLOG: usize = 512; - const MAX_OUTSTANDING_HTTP: usize = 16; + const MAX_OUTSTANDING_HTTP: usize = 32; + const HB_EVERY: usize = 100_000; + const N_HTTP_ATTEMPTS: u32 = 5; let client = Arc::new(Client::new()); - info!(logger, "initializing buffers"; + info!(logger, "initializing InfluxWriter ..."; + "N_BUFFER_LINES" => N_BUFFER_LINES, + "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING), + "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP, "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY, "MAX_BACKLOG" => MAX_BACKLOG); @@ -408,8 +412,10 @@ impl InfluxWriter { let mut buf = spares.pop_front().unwrap(); let mut count = 0; let mut extras = 0; // any new Strings we intro to the system - let last = Instant::now(); - let last_clear = Instant::now(); + let mut n_rcvd = 0; + let mut last = Instant::now(); + let mut active: bool; + let mut last_clear = Instant::now(); let mut loop_time = Instant::now(); let n_out = |s: &VecDeque, b: &VecDeque, extras: usize| -> usize { @@ -425,12 +431,22 @@ impl InfluxWriter { } let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url let tx = http_tx.clone(); - let thread_logger = logger.new(o!("n_outstanding" => n_outstanding)); + let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "n_outstanding" => n_outstanding)); let client = Arc::clone(&client); + debug!(logger, "launching http thread"); let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || { let logger = thread_logger; + debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len()); let start = Instant::now(); - 'a: for n_req in 0..5u32 { + 'a: for n_req in 0..N_HTTP_ATTEMPTS { + let throttle = Duration::from_secs(2) * n_req * n_req; + if n_req > 0 { + warn!(logger, "InfluxWriter http thread: pausing before next request"; + "n_req" => n_req, + "throttle" => %format_args!("{:?}", throttle), + "elapsed" => %format_args!("{:?}", Instant::now() - start)); + thread::sleep(throttle); // 0, 2, 8, 16, 32 + } let sent = Instant::now(); let resp = client.post(url.clone()) .body(buf.as_str()) @@ -478,19 +494,16 @@ impl InfluxWriter { } } - let throttle = Duration::from_secs(2) * n_req * n_req; - if n_req > 0 { - warn!(logger, "InfluxWriter http thread: pausing before next request"; - "n_req" => n_req, - "throttle" => %format_args!("{:?}", throttle), - "took" => %format_args!("{:?}", took)); - } - thread::sleep(throttle); // 0, 2, 8, 16, 32 } let took = Instant::now() - start; warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer"; "took" => %format_args!("{:?}", took)); - tx.send(Err(Resp { buf, took })).unwrap(); // failure here is unrecoverable + let buflen = buf.len(); + let n_lines = buf.lines().count(); + if let Err(e) = tx.send(Err(Resp { buf, took })) { + crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e; + "err" => %e, "buf.len()" => buflen, "n_lines" => n_lines); + } }); if let Err(e) = thread_res { @@ -500,12 +513,12 @@ impl InfluxWriter { let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result { match prev { - 0 if buffer_size > 0 => { + 0 if N_BUFFER_LINES > 0 => { serialize_owned(m, buf); Ok(1) } - n if n < buffer_size && loop_time - last < MAX_PENDING => { + n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => { buf.push_str("\n"); serialize_owned(m, buf); Ok(n + 1) @@ -521,8 +534,22 @@ impl InfluxWriter { 'event: loop { loop_time = Instant::now(); + active = false; match rx.recv() { Ok(Some(mut meas)) => { + n_rcvd += 1; + active = true; + + if n_rcvd % HB_EVERY == 0 { + let n_outstanding = n_out(&spares, &backlog, extras); + info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "n_rcvd" => n_rcvd, + "n_active_buf" => count, + "db_health" => %format_args!("{:?}", db_health.mean), + "backlog.len()" => backlog.len()); + } if meas.timestamp.is_none() { meas.timestamp = Some(now()) } @@ -544,6 +571,7 @@ impl InfluxWriter { crit!(logger, "no available buffers in `spares`, pulling from backlog"; "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), + "n_rcvd" => n_rcvd, "backlog.len()" => backlog.len()); match backlog.pop_front() { // Note: this does not clear the backlog buffer, @@ -563,6 +591,7 @@ impl InfluxWriter { "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "backlog.len()" => backlog.len(), + "n_rcvd" => n_rcvd, "extras" => extras); String::new() } @@ -574,50 +603,63 @@ impl InfluxWriter { mem::swap(&mut buf, &mut next); let n_outstanding = n_out(&spares, &backlog, extras); send(next, &mut backlog, n_outstanding); + last = loop_time; count } }; } Ok(None) => { + let start = Instant::now(); + let mut hb = Instant::now(); warn!(logger, "terminate signal rcvd"; "count" => count); if buf.len() > 0 { info!(logger, "sending remaining buffer to influx on terminate"; "count" => count); let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1)); - let _ = next(buffer_size, &meas, &mut buf, loop_time, last); + let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last); let n_outstanding = n_out(&spares, &backlog, extras); - send(buf, &mut backlog, n_outstanding); + let mut placeholder = spares.pop_front().unwrap_or_else(String::new); + mem::swap(&mut buf, &mut placeholder); + send(placeholder, &mut backlog, n_outstanding); } - let start = Instant::now(); - let mut hb = start; let mut n_ok = 0; let mut n_err = 0; loop { - let loop_time = Instant::now(); + loop_time = Instant::now(); let n_outstanding = n_out(&spares, &backlog, extras); if backlog.is_empty() && n_outstanding < 1 { info!(logger, "cleared any remaining backlog"; "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), "backlog.len()" => backlog.len(), "n_cleared_ok" => n_ok, "n_cleared_err" => n_err, + "n_rcvd" => n_rcvd, "extras" => extras, - "elapsed" => %format_args!("{:?}", start - loop_time)); + "elapsed" => %format_args!("{:?}", loop_time - start)); break 'event } if loop_time - hb > Duration::from_secs(5) { info!(logger, "InfluxWriter still clearing backlog .."; "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), "backlog.len()" => backlog.len(), "n_cleared_ok" => n_ok, "n_cleared_err" => n_err, "extras" => extras, - "elapsed" => %format_args!("{:?}", start - loop_time)); + "n_rcvd" => n_rcvd, + "elapsed" => %format_args!("{:?}", loop_time - start)); hb = loop_time; } if let Some(buf) = backlog.pop_front() { let n_outstanding = n_out(&spares, &backlog, extras); + debug!(logger, "resending queued buffer from backlog"; + "backlog.len()" => backlog.len(), + "spares.len()" => spares.len(), + "n_rcvd" => n_rcvd, + "n_outstanding" => n_outstanding); send(buf, &mut backlog, n_outstanding); + last_clear = loop_time; } 'rx: loop { @@ -627,8 +669,9 @@ impl InfluxWriter { spares.push_back(buf); // needed so `n_outstanding` count remains accurate } Ok(Err(Resp { buf, .. })) => { + warn!(logger, "requeueing failed request"; "buf.len()" => buf.len()); n_err += 1; - spares.push_back(buf); // needed so `n_outstanding` count remains accurate + backlog.push_front(buf); } Err(chan::TryRecvError::Disconnected) => { crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting"; @@ -637,7 +680,8 @@ impl InfluxWriter { "n_cleared_ok" => n_ok, "n_cleared_err" => n_err, "extras" => extras, - "elapsed" => %format_args!("{:?}", start - loop_time)); + "n_rcvd" => n_rcvd, + "elapsed" => %format_args!("{:?}", loop_time - start)); break 'event } Err(_) => break 'rx @@ -647,51 +691,53 @@ impl InfluxWriter { } } - _ => { - let mut active = false; - db_health.refresh(loop_time); - let n_outstanding = n_out(&spares, &backlog, extras); - let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200); - if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy { - if let Some(queued) = backlog.pop_front() { - let n_outstanding = n_out(&spares, &backlog, extras); - send(queued, &mut backlog, n_outstanding); - active = true; - } - } - - loop { - match http_rx.try_recv() { - Ok(Ok(Resp { buf, took })) => { - db_health.add(loop_time, took); - spares.push_back(buf); - active = true; - } + _ => {} + } - Ok(Err(Resp { buf, took })) => { - db_health.add(loop_time, took); - backlog.push_front(buf); - active = true; - } + db_health.refresh(loop_time); + let n_outstanding = n_out(&spares, &backlog, extras); + let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200); + if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy { + if let Some(queued) = backlog.pop_front() { + let n_outstanding = n_out(&spares, &backlog, extras); + send(queued, &mut backlog, n_outstanding); + active = true; + } + } - Err(chan::TryRecvError::Disconnected) => { - crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting"; - "n_outstanding" => n_outstanding, - "backlog.len()" => backlog.len(), - "extras" => extras); - break 'event - } + loop { + match http_rx.try_recv() { + Ok(Ok(Resp { buf, took })) => { + db_health.add(loop_time, took); + spares.push_back(buf); + active = true; + } - Err(_) => break - } + Ok(Err(Resp { buf, took })) => { + db_health.add(loop_time, took); + backlog.push_front(buf); + active = true; } - if !active { - thread::sleep(Duration::new(0, 1)) + Err(chan::TryRecvError::Disconnected) => { + crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting"; + "n_outstanding" => n_outstanding, + "backlog.len()" => backlog.len(), + "n_rcvd" => n_rcvd, + "extras" => extras); + break 'event } + + Err(_) => break } } + + if !active { + thread::sleep(Duration::new(0, 1)) + } } + info!(logger, "waiting 1s before exiting thread"); + thread::sleep(Duration::from_secs(1)); }).unwrap(); InfluxWriter { diff --git a/src/lib.rs b/src/lib.rs index da7bfda..d91afe8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,28 +3,13 @@ #![feature(test)] -#[macro_use] extern crate slog; - +#[macro_use] +extern crate slog; #[allow(unused_imports)] -#[macro_use] extern crate money; - +#[macro_use] +extern crate money; +#[cfg(test)] extern crate test; -extern crate influent; -extern crate chrono; -extern crate hyper; -extern crate termion; -extern crate sloggers; -extern crate slog_term; -extern crate slog_async; -extern crate fnv; -extern crate ordermap; -extern crate decimal; -extern crate uuid; -extern crate hdrhistogram; -extern crate smallvec; -extern crate num; -extern crate dirs; -extern crate crossbeam_channel; #[cfg(feature = "zmq")] extern crate zmq; #[cfg(feature = "latency")]