diff --git a/Cargo.toml b/Cargo.toml index 1a1be43..a3fadd3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "influx-writer" -version = "0.6.1" +version = "0.7.0" authors = ["Jonathan Strong "] edition = "2018" diff --git a/src/lib.rs b/src/lib.rs index 27bf0c8..b6b730f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -369,31 +369,35 @@ impl InfluxWriter { const N_BUFFER_LINES: usize = 1024; const MAX_PENDING: Duration = Duration::from_secs(3); const INITIAL_BUFFER_CAPACITY: usize = 4096; - const MAX_BACKLOG: usize = 128; + const INITIAL_BACKLOG: usize = 128; + const MAX_BACKLOG: usize = 1024; const MAX_OUTSTANDING_HTTP: usize = 64; - const HB_EVERY: usize = 100_000; + const DEBUG_HB_EVERY: usize = 1024 * 96; + const INFO_HB_EVERY: usize = 1024 * 1024; const N_HTTP_ATTEMPTS: u32 = 15; let client = Arc::new(Client::new()); info!(logger, "initializing InfluxWriter ..."; - "N_BUFFER_LINES" => N_BUFFER_LINES, - "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING), - "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP, - "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY, - "MAX_BACKLOG" => MAX_BACKLOG); + "N_BUFFER_LINES" => N_BUFFER_LINES, + "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING), + "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP, + "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY, + "INITIAL_BACKLOG" => INITIAL_BACKLOG, + "MAX_BACKLOG" => MAX_BACKLOG, + ); // pre-allocated buffers ready for use if the active one is stasheed // during an outage - let mut spares: VecDeque = VecDeque::with_capacity(MAX_BACKLOG); + let mut spares: VecDeque = VecDeque::with_capacity(INITIAL_BACKLOG); // queue failed sends here until problem resolved, then send again. in worst // case scenario, loop back around on buffers queued in `backlog`, writing // over the oldest first. // - let mut backlog: VecDeque = VecDeque::with_capacity(MAX_BACKLOG); + let mut backlog: VecDeque = VecDeque::with_capacity(INITIAL_BACKLOG); - for _ in 0..MAX_BACKLOG { + for _ in 0..INITIAL_BACKLOG { spares.push_back(String::with_capacity(1024)); } @@ -422,7 +426,7 @@ impl InfluxWriter { let mut loop_time = Instant::now(); let n_out = |s: &VecDeque, b: &VecDeque, extras: usize| -> usize { - MAX_BACKLOG + extras - s.len() - b.len() - 1 + INITIAL_BACKLOG + extras - s.len() - b.len() - 1 }; assert_eq!(n_out(&spares, &backlog, extras), 0); @@ -551,7 +555,7 @@ impl InfluxWriter { n_rcvd += 1; active = true; - if n_rcvd % HB_EVERY == 0 { + if n_rcvd % INFO_HB_EVERY == 0 { let n_outstanding = n_out(&spares, &backlog, extras); info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); "n_outstanding" => n_outstanding, @@ -560,6 +564,15 @@ impl InfluxWriter { "n_active_buf" => count, "db_health" => %format_args!("{:?}", db_health.mean), "backlog.len()" => backlog.len()); + } else if n_rcvd % DEBUG_HB_EVERY == 0 { + let n_outstanding = n_out(&spares, &backlog, extras); + debug!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "n_rcvd" => n_rcvd, + "n_active_buf" => count, + "db_health" => %format_args!("{:?}", db_health.mean), + "backlog.len()" => backlog.len()); } if meas.timestamp.is_none() { meas.timestamp = Some(now()) } @@ -579,33 +592,39 @@ impl InfluxWriter { None => { let n_outstanding = n_out(&spares, &backlog, extras); - crit!(logger, "no available buffers in `spares`, pulling from backlog"; - "n_outstanding" => n_outstanding, - "spares.len()" => spares.len(), - "n_rcvd" => n_rcvd, - "backlog.len()" => backlog.len()); - match backlog.pop_front() { - // Note: this does not clear the backlog buffer, - // instead we will just write more and more until - // we are out of memory. I expect that will never - // happen. - // - Some(x) => { - count = 1; // otherwise, no '\n' added in `next(..)` - we are - // sending a "full" buffer to be extended - x - } - - None => { - extras += 1; - crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; - "n_outstanding" => n_outstanding, - "spares.len()" => spares.len(), - "backlog.len()" => backlog.len(), - "n_rcvd" => n_rcvd, - "extras" => extras); - String::new() + if n_outstanding > MAX_BACKLOG { + warn!(logger, "no available buffers in `spares`, pulling from backlog"; + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "n_rcvd" => n_rcvd, + "backlog.len()" => backlog.len()); + match backlog.pop_front() { + // Note: this does not clear the backlog buffer, + // instead we will just write more and more until + // we are out of memory. I expect that will never + // happen. + // + Some(x) => { + count = 1; // otherwise, no '\n' added in `next(..)` - we are + // sending a "full" buffer to be extended + x + } + + None => { + extras += 1; + crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "backlog.len()" => backlog.len(), + "n_rcvd" => n_rcvd, + "extras" => extras); + String::new() + } } + } else { + extras += 1; + info!(logger, "allocating new buffer: zero spares avail"; "n_outstanding" => n_outstanding, "extras" => extras); + String::with_capacity(INITIAL_BUFFER_CAPACITY) } } }; @@ -677,7 +696,11 @@ impl InfluxWriter { match http_rx.try_recv() { Ok(Ok(Resp { buf, .. })) => { n_ok += 1; - spares.push_back(buf); // needed so `n_outstanding` count remains accurate + if spares.len() <= INITIAL_BACKLOG { + spares.push_back(buf); // needed so `n_outstanding` count remains accurate + } else { + extras = extras.saturating_sub(1); + } } Ok(Err(Resp { buf, .. })) => { warn!(logger, "requeueing failed request"; "buf.len()" => buf.len()); @@ -720,7 +743,17 @@ impl InfluxWriter { match http_rx.try_recv() { Ok(Ok(Resp { buf, took })) => { db_health.add(loop_time, took); - spares.push_back(buf); + if spares.len() <= INITIAL_BACKLOG { + spares.push_back(buf); + } else { + extras = extras.saturating_sub(1); + info!(logger, "dropping buffer to reduce memory back to INITIAL_BACKLOG size"; + "spares.len()" => spares.len(), + "extras" => extras, + ); + } + + //spares.push_back(buf); active = true; }