|
@@ -461,6 +461,7 @@ impl InfluxWriter { |
|
|
let mut count = 0; |
|
|
let mut count = 0; |
|
|
let mut extras = 0; // any new Strings we intro to the system |
|
|
let mut extras = 0; // any new Strings we intro to the system |
|
|
let mut n_rcvd = 0; |
|
|
let mut n_rcvd = 0; |
|
|
|
|
|
let mut n_pts: u64 = 0; |
|
|
let mut in_flight_buffer_bytes = 0; |
|
|
let mut in_flight_buffer_bytes = 0; |
|
|
let mut last = Instant::now(); |
|
|
let mut last = Instant::now(); |
|
|
let mut active: bool; |
|
|
let mut active: bool; |
|
@@ -615,6 +616,7 @@ impl InfluxWriter { |
|
|
match rx.recv() { |
|
|
match rx.recv() { |
|
|
Ok(Some(mut meas)) => { |
|
|
Ok(Some(mut meas)) => { |
|
|
n_rcvd += 1; |
|
|
n_rcvd += 1; |
|
|
|
|
|
n_pts += meas.fields.len() as u64; |
|
|
active = true; |
|
|
active = true; |
|
|
|
|
|
|
|
|
if n_rcvd % INFO_HB_EVERY == 0 { |
|
|
if n_rcvd % INFO_HB_EVERY == 0 { |
|
@@ -622,6 +624,7 @@ impl InfluxWriter { |
|
|
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); |
|
|
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); |
|
|
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; |
|
|
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; |
|
|
info!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep(); |
|
|
info!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep(); |
|
|
|
|
|
"total pts written" => n_pts.thousands_sep(), |
|
|
"n_outstanding" => n_outstanding, |
|
|
"n_outstanding" => n_outstanding, |
|
|
"spares.len()" => spares.len(), |
|
|
"spares.len()" => spares.len(), |
|
|
"n_rcvd" => n_rcvd, |
|
|
"n_rcvd" => n_rcvd, |
|
@@ -662,10 +665,11 @@ impl InfluxWriter { |
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
if n_outstanding > MAX_BACKLOG { |
|
|
if n_outstanding > MAX_BACKLOG { |
|
|
warn!(logger, "InfluxWriter: no available buffers in `spares`, pulling from backlog"; |
|
|
warn!(logger, "InfluxWriter: no available buffers in `spares`, pulling from backlog"; |
|
|
"n_outstanding" => n_outstanding, |
|
|
|
|
|
"spares.len()" => spares.len(), |
|
|
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|
|
|
"backlog.len()" => backlog.len()); |
|
|
|
|
|
|
|
|
"total pts written" => n_pts.thousands_sep(), |
|
|
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
|
|
"spares.len()" => spares.len(), |
|
|
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|
|
|
"backlog.len()" => backlog.len()); |
|
|
match backlog.pop_front() { |
|
|
match backlog.pop_front() { |
|
|
// Note: this does not clear the backlog buffer, |
|
|
// Note: this does not clear the backlog buffer, |
|
|
// instead we will just write more and more until |
|
|
// instead we will just write more and more until |
|
@@ -681,6 +685,7 @@ impl InfluxWriter { |
|
|
None => { |
|
|
None => { |
|
|
extras += 1; |
|
|
extras += 1; |
|
|
crit!(logger, "InfluxWriter: failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; |
|
|
crit!(logger, "InfluxWriter: failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; |
|
|
|
|
|
"total pts written" => n_pts.thousands_sep(), |
|
|
"n_outstanding" => n_outstanding, |
|
|
"n_outstanding" => n_outstanding, |
|
|
"spares.len()" => spares.len(), |
|
|
"spares.len()" => spares.len(), |
|
|
"backlog.len()" => backlog.len(), |
|
|
"backlog.len()" => backlog.len(), |
|
@@ -694,6 +699,7 @@ impl InfluxWriter { |
|
|
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY; |
|
|
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY; |
|
|
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; |
|
|
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; |
|
|
info!(logger, "InfluxWriter: allocating new buffer: zero spares avail"; |
|
|
info!(logger, "InfluxWriter: allocating new buffer: zero spares avail"; |
|
|
|
|
|
"total pts written" => n_pts.thousands_sep(), |
|
|
"allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb), |
|
|
"allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb), |
|
|
"n_outstanding" => n_outstanding, |
|
|
"n_outstanding" => n_outstanding, |
|
|
"extras" => extras, |
|
|
"extras" => extras, |
|
@@ -733,6 +739,7 @@ impl InfluxWriter { |
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
if backlog.is_empty() && n_outstanding < 1 { |
|
|
if backlog.is_empty() && n_outstanding < 1 { |
|
|
info!(logger, "InfluxWriter: cleared any remaining backlog"; |
|
|
info!(logger, "InfluxWriter: cleared any remaining backlog"; |
|
|
|
|
|
"total pts written" => n_pts.thousands_sep(), |
|
|
"n_outstanding" => n_outstanding, |
|
|
"n_outstanding" => n_outstanding, |
|
|
"spares.len()" => spares.len(), |
|
|
"spares.len()" => spares.len(), |
|
|
"backlog.len()" => backlog.len(), |
|
|
"backlog.len()" => backlog.len(), |
|
@@ -746,6 +753,7 @@ impl InfluxWriter { |
|
|
|
|
|
|
|
|
if loop_time.saturating_duration_since(start) > DROP_DEADLINE { |
|
|
if loop_time.saturating_duration_since(start) > DROP_DEADLINE { |
|
|
crit!(logger, "drop deadline exceeded! commencing dirty exit :( "; |
|
|
crit!(logger, "drop deadline exceeded! commencing dirty exit :( "; |
|
|
|
|
|
"total pts written" => n_pts.thousands_sep(), |
|
|
"elapsed" => ?(loop_time.saturating_duration_since(start)), |
|
|
"elapsed" => ?(loop_time.saturating_duration_since(start)), |
|
|
"n outstanding" => n_outstanding, |
|
|
"n outstanding" => n_outstanding, |
|
|
"backlog.len()" => backlog.len(), |
|
|
"backlog.len()" => backlog.len(), |
|
@@ -755,14 +763,15 @@ impl InfluxWriter { |
|
|
|
|
|
|
|
|
if loop_time - hb > Duration::from_secs(5) { |
|
|
if loop_time - hb > Duration::from_secs(5) { |
|
|
info!(logger, "InfluxWriter still clearing backlog .."; |
|
|
info!(logger, "InfluxWriter still clearing backlog .."; |
|
|
"n_outstanding" => n_outstanding, |
|
|
|
|
|
"spares.len()" => spares.len(), |
|
|
|
|
|
"backlog.len()" => backlog.len(), |
|
|
|
|
|
"n_cleared_ok" => n_ok, |
|
|
|
|
|
"n_cleared_err" => n_err, |
|
|
|
|
|
"extras" => extras, |
|
|
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|
|
|
"elapsed" => %format_args!("{:?}", loop_time - start)); |
|
|
|
|
|
|
|
|
"total pts written" => n_pts.thousands_sep(), |
|
|
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
|
|
"spares.len()" => spares.len(), |
|
|
|
|
|
"backlog.len()" => backlog.len(), |
|
|
|
|
|
"n_cleared_ok" => n_ok, |
|
|
|
|
|
"n_cleared_err" => n_err, |
|
|
|
|
|
"extras" => extras, |
|
|
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|
|
|
"elapsed" => %format_args!("{:?}", loop_time - start)); |
|
|
hb = loop_time; |
|
|
hb = loop_time; |
|
|
} |
|
|
} |
|
|
if let Some(buf) = backlog.pop_front() { |
|
|
if let Some(buf) = backlog.pop_front() { |
|
@@ -794,6 +803,7 @@ impl InfluxWriter { |
|
|
} |
|
|
} |
|
|
Err(chan::TryRecvError::Disconnected) => { |
|
|
Err(chan::TryRecvError::Disconnected) => { |
|
|
crit!(logger, "InfluxWriter: trying to clear backlog, but http_rx disconnected! aborting"; |
|
|
crit!(logger, "InfluxWriter: trying to clear backlog, but http_rx disconnected! aborting"; |
|
|
|
|
|
"total pts written" => n_pts.thousands_sep(), |
|
|
"n_outstanding" => n_outstanding, |
|
|
"n_outstanding" => n_outstanding, |
|
|
"backlog.len()" => backlog.len(), |
|
|
"backlog.len()" => backlog.len(), |
|
|
"n_cleared_ok" => n_ok, |
|
|
"n_cleared_ok" => n_ok, |
|
@@ -859,6 +869,7 @@ impl InfluxWriter { |
|
|
|
|
|
|
|
|
Err(chan::TryRecvError::Disconnected) => { |
|
|
Err(chan::TryRecvError::Disconnected) => { |
|
|
crit!(logger, "InfluxWriter: trying to recover buffers, but http_rx disconnected! aborting"; |
|
|
crit!(logger, "InfluxWriter: trying to recover buffers, but http_rx disconnected! aborting"; |
|
|
|
|
|
"total pts written" => n_pts.thousands_sep(), |
|
|
"n_outstanding" => n_outstanding, |
|
|
"n_outstanding" => n_outstanding, |
|
|
"backlog.len()" => backlog.len(), |
|
|
"backlog.len()" => backlog.len(), |
|
|
"n_rcvd" => n_rcvd, |
|
|
"n_rcvd" => n_rcvd, |
|
|