From dae7d8067d2bbb710fb4ef4424f61750e1a970d2 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Tue, 8 Jun 2021 20:10:29 -0400 Subject: [PATCH] log total pts rcvd (sum of number of fields in each measurement) --- src/lib.rs | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 56914c1..12e0c64 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -461,6 +461,7 @@ impl InfluxWriter { let mut count = 0; let mut extras = 0; // any new Strings we intro to the system let mut n_rcvd = 0; + let mut n_pts: u64 = 0; let mut in_flight_buffer_bytes = 0; let mut last = Instant::now(); let mut active: bool; @@ -615,6 +616,7 @@ impl InfluxWriter { match rx.recv() { Ok(Some(mut meas)) => { n_rcvd += 1; + n_pts += meas.fields.len() as u64; active = true; if n_rcvd % INFO_HB_EVERY == 0 { @@ -622,6 +624,7 @@ impl InfluxWriter { let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; info!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep(); + "total pts written" => n_pts.thousands_sep(), "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "n_rcvd" => n_rcvd, @@ -662,10 +665,11 @@ impl InfluxWriter { let n_outstanding = n_out(&spares, &backlog, extras); if n_outstanding > MAX_BACKLOG { warn!(logger, "InfluxWriter: no available buffers in `spares`, pulling from backlog"; - "n_outstanding" => n_outstanding, - "spares.len()" => spares.len(), - "n_rcvd" => n_rcvd, - "backlog.len()" => backlog.len()); + "total pts written" => n_pts.thousands_sep(), + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "n_rcvd" => n_rcvd, + "backlog.len()" => backlog.len()); match backlog.pop_front() { // Note: this does not clear the backlog buffer, // instead we will just write more and more until @@ -681,6 +685,7 @@ impl InfluxWriter { None => { extras += 1; crit!(logger, "InfluxWriter: failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; + "total pts written" => n_pts.thousands_sep(), "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "backlog.len()" => backlog.len(), @@ -694,6 +699,7 @@ impl InfluxWriter { let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY; let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; info!(logger, "InfluxWriter: allocating new buffer: zero spares avail"; + "total pts written" => n_pts.thousands_sep(), "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb), "n_outstanding" => n_outstanding, "extras" => extras, @@ -733,6 +739,7 @@ impl InfluxWriter { let n_outstanding = n_out(&spares, &backlog, extras); if backlog.is_empty() && n_outstanding < 1 { info!(logger, "InfluxWriter: cleared any remaining backlog"; + "total pts written" => n_pts.thousands_sep(), "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "backlog.len()" => backlog.len(), @@ -746,6 +753,7 @@ impl InfluxWriter { if loop_time.saturating_duration_since(start) > DROP_DEADLINE { crit!(logger, "drop deadline exceeded! commencing dirty exit :( "; + "total pts written" => n_pts.thousands_sep(), "elapsed" => ?(loop_time.saturating_duration_since(start)), "n outstanding" => n_outstanding, "backlog.len()" => backlog.len(), @@ -755,14 +763,15 @@ impl InfluxWriter { if loop_time - hb > Duration::from_secs(5) { info!(logger, "InfluxWriter still clearing backlog .."; - "n_outstanding" => n_outstanding, - "spares.len()" => spares.len(), - "backlog.len()" => backlog.len(), - "n_cleared_ok" => n_ok, - "n_cleared_err" => n_err, - "extras" => extras, - "n_rcvd" => n_rcvd, - "elapsed" => %format_args!("{:?}", loop_time - start)); + "total pts written" => n_pts.thousands_sep(), + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "backlog.len()" => backlog.len(), + "n_cleared_ok" => n_ok, + "n_cleared_err" => n_err, + "extras" => extras, + "n_rcvd" => n_rcvd, + "elapsed" => %format_args!("{:?}", loop_time - start)); hb = loop_time; } if let Some(buf) = backlog.pop_front() { @@ -794,6 +803,7 @@ impl InfluxWriter { } Err(chan::TryRecvError::Disconnected) => { crit!(logger, "InfluxWriter: trying to clear backlog, but http_rx disconnected! aborting"; + "total pts written" => n_pts.thousands_sep(), "n_outstanding" => n_outstanding, "backlog.len()" => backlog.len(), "n_cleared_ok" => n_ok, @@ -859,6 +869,7 @@ impl InfluxWriter { Err(chan::TryRecvError::Disconnected) => { crit!(logger, "InfluxWriter: trying to recover buffers, but http_rx disconnected! aborting"; + "total pts written" => n_pts.thousands_sep(), "n_outstanding" => n_outstanding, "backlog.len()" => backlog.len(), "n_rcvd" => n_rcvd,