diff --git a/Cargo.toml b/Cargo.toml index a3fadd3..f3e75df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "influx-writer" -version = "0.7.0" +version = "0.8.0" authors = ["Jonathan Strong "] edition = "2018" @@ -8,6 +8,11 @@ edition = "2018" name = "influx_writer" path = "src/lib.rs" +[[example]] +name = "write" +path = "examples/write.rs" +required-features = ["signal-hook"] + [dependencies] chrono = { version = "0.4", features = ["serde"] } hyper = "0.10" @@ -18,6 +23,7 @@ slog-async = "2" smallvec = "0.6" crossbeam-channel = "0.3" pretty_toa = "1.0.0" +signal-hook = { version = "0.1.15", optional = true } decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } decimal-macros = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } diff --git a/examples/write.rs b/examples/write.rs new file mode 100644 index 0000000..56867da --- /dev/null +++ b/examples/write.rs @@ -0,0 +1,56 @@ +#[macro_use] +extern crate slog; + +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::*; +use std::thread; +use slog::Drain; +use pretty_toa::ThousandsSep; +use chrono::prelude::*; +use influx_writer::{InfluxWriter, measure}; + +const DELAY: Duration = Duration::from_millis(1); +const N_PER: usize = 567; + +fn main() { + let start = Instant::now(); + let term = Arc::new(AtomicBool::new(false)); + signal_hook::flag::register(signal_hook::SIGINT, Arc::clone(&term)).unwrap(); + signal_hook::flag::register(signal_hook::SIGTERM, Arc::clone(&term)).unwrap(); + signal_hook::flag::register(signal_hook::SIGQUIT, Arc::clone(&term)).unwrap(); + + + let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); + let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse(); + let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse(); + let root = slog::Logger::root(drain, o!("version" => "0.1")); + + let logger = root.new(o!("thread" => "main")); + + let influx = InfluxWriter::with_logger("localhost", "test", &root); + + let mut n = 0; + + loop { + if term.load(Ordering::Relaxed) { + info!(logger, "exiting..."); + break + } + + let mut now = Utc::now().timestamp_nanos(); + for _ in 0..N_PER { + measure!(influx, example, i(n, 1), tm(now)); + now += 1; + n += 1; + } + + thread::sleep(DELAY); + } + drop(influx); + + let took = Instant::now() - start; + + info!(logger, "wrote {} measurements in {:?}", n.thousands_sep(), took); + +} diff --git a/src/lib.rs b/src/lib.rs index b6b730f..0618232 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -369,12 +369,12 @@ impl InfluxWriter { const N_BUFFER_LINES: usize = 1024; const MAX_PENDING: Duration = Duration::from_secs(3); const INITIAL_BUFFER_CAPACITY: usize = 4096; - const INITIAL_BACKLOG: usize = 128; const MAX_BACKLOG: usize = 1024; const MAX_OUTSTANDING_HTTP: usize = 64; const DEBUG_HB_EVERY: usize = 1024 * 96; const INFO_HB_EVERY: usize = 1024 * 1024; const N_HTTP_ATTEMPTS: u32 = 15; + const INITIAL_BACKLOG: usize = MAX_OUTSTANDING_HTTP * 2; let client = Arc::new(Client::new()); @@ -398,7 +398,7 @@ impl InfluxWriter { let mut backlog: VecDeque = VecDeque::with_capacity(INITIAL_BACKLOG); for _ in 0..INITIAL_BACKLOG { - spares.push_back(String::with_capacity(1024)); + spares.push_back(String::with_capacity(INITIAL_BUFFER_CAPACITY)); } struct Resp { @@ -420,9 +420,11 @@ impl InfluxWriter { let mut count = 0; let mut extras = 0; // any new Strings we intro to the system let mut n_rcvd = 0; + let mut in_flight_buffer_bytes = 0; let mut last = Instant::now(); let mut active: bool; let mut last_clear = Instant::now(); + let mut last_memory_check = Instant::now(); let mut loop_time = Instant::now(); let n_out = |s: &VecDeque, b: &VecDeque, extras: usize| -> usize { @@ -431,15 +433,22 @@ impl InfluxWriter { assert_eq!(n_out(&spares, &backlog, extras), 0); - let send = |mut buf: String, backlog: &mut VecDeque, n_outstanding: usize| { + let count_allocated_memory = |spares: &VecDeque, backlog: &VecDeque, in_flight_buffer_bytes: &usize| -> usize { + spares.iter().map(|x| x.capacity()).sum::() + + backlog.iter().map(|x| x.capacity()).sum::() + + (*in_flight_buffer_bytes) + }; + + let send = |mut buf: String, backlog: &mut VecDeque, n_outstanding: usize, in_flight_buffer_bytes: &mut usize| { if n_outstanding >= MAX_OUTSTANDING_HTTP { backlog.push_back(buf); return } let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url let tx = http_tx.clone(); - let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "n_outstanding" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure + let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "in flight req at spawn time" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure let client = Arc::clone(&client); + *in_flight_buffer_bytes = *in_flight_buffer_bytes + buf.capacity(); debug!(logger, "launching http thread"); let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || { let logger = thread_logger; @@ -550,6 +559,18 @@ impl InfluxWriter { 'event: loop { loop_time = Instant::now(); active = false; + + if loop_time - last_memory_check > Duration::from_secs(60) { + let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); + let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; + info!(logger, "allocated memory: {:.1}MB", allocated_mb; + "allocated bytes" => allocated_bytes, + "in flight buffer bytes" => in_flight_buffer_bytes, + "spares.len()" => spares.len(), + "backlog.len()" => backlog.len(), + ); + last_memory_check = loop_time; + } match rx.recv() { Ok(Some(mut meas)) => { n_rcvd += 1; @@ -557,21 +578,27 @@ impl InfluxWriter { if n_rcvd % INFO_HB_EVERY == 0 { let n_outstanding = n_out(&spares, &backlog, extras); + let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); + let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "n_rcvd" => n_rcvd, "n_active_buf" => count, "db_health" => %format_args!("{:?}", db_health.mean), + "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb), "backlog.len()" => backlog.len()); } else if n_rcvd % DEBUG_HB_EVERY == 0 { let n_outstanding = n_out(&spares, &backlog, extras); + let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); + let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; debug!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "n_rcvd" => n_rcvd, "n_active_buf" => count, "db_health" => %format_args!("{:?}", db_health.mean), + "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb), "backlog.len()" => backlog.len()); } @@ -623,7 +650,13 @@ impl InfluxWriter { } } else { extras += 1; - info!(logger, "allocating new buffer: zero spares avail"; "n_outstanding" => n_outstanding, "extras" => extras); + let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY; + let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; + info!(logger, "allocating new buffer: zero spares avail"; + "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb), + "n_outstanding" => n_outstanding, + "extras" => extras, + ); String::with_capacity(INITIAL_BUFFER_CAPACITY) } } @@ -632,7 +665,7 @@ impl InfluxWriter { // mem::swap(&mut buf, &mut next); let n_outstanding = n_out(&spares, &backlog, extras); - send(next, &mut backlog, n_outstanding); + send(next, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes); last = loop_time; count } @@ -650,7 +683,7 @@ impl InfluxWriter { let n_outstanding = n_out(&spares, &backlog, extras); let mut placeholder = spares.pop_front().unwrap_or_else(String::new); mem::swap(&mut buf, &mut placeholder); - send(placeholder, &mut backlog, n_outstanding); + send(placeholder, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes); } let mut n_ok = 0; let mut n_err = 0; @@ -688,7 +721,7 @@ impl InfluxWriter { "spares.len()" => spares.len(), "n_rcvd" => n_rcvd, "n_outstanding" => n_outstanding); - send(buf, &mut backlog, n_outstanding); + send(buf, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes); last_clear = loop_time; } @@ -696,6 +729,7 @@ impl InfluxWriter { match http_rx.try_recv() { Ok(Ok(Resp { buf, .. })) => { n_ok += 1; + in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity()); if spares.len() <= INITIAL_BACKLOG { spares.push_back(buf); // needed so `n_outstanding` count remains accurate } else { @@ -705,6 +739,7 @@ impl InfluxWriter { Ok(Err(Resp { buf, .. })) => { warn!(logger, "requeueing failed request"; "buf.len()" => buf.len()); n_err += 1; + in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity()); backlog.push_front(buf); } Err(chan::TryRecvError::Disconnected) => { @@ -734,7 +769,7 @@ impl InfluxWriter { if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy { if let Some(queued) = backlog.pop_front() { let n_outstanding = n_out(&spares, &backlog, extras); - send(queued, &mut backlog, n_outstanding); + send(queued, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes); active = true; } } @@ -743,13 +778,17 @@ impl InfluxWriter { match http_rx.try_recv() { Ok(Ok(Resp { buf, took })) => { db_health.add(loop_time, took); + let in_flight_before = in_flight_buffer_bytes.clone(); + in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity()); if spares.len() <= INITIAL_BACKLOG { spares.push_back(buf); } else { extras = extras.saturating_sub(1); - info!(logger, "dropping buffer to reduce memory back to INITIAL_BACKLOG size"; + debug!(logger, "dropping buffer to reduce memory back to INITIAL_BACKLOG size"; "spares.len()" => spares.len(), "extras" => extras, + "in flight before" => in_flight_before, + "in in_flight_buffer_bytes" => in_flight_buffer_bytes, ); } @@ -759,6 +798,7 @@ impl InfluxWriter { Ok(Err(Resp { buf, took })) => { db_health.add(loop_time, took); + in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity()); backlog.push_front(buf); active = true; }