|
|
@@ -586,7 +586,7 @@ impl InfluxWriter { |
|
|
|
if loop_time - last_memory_check > Duration::from_secs(300) { |
|
|
|
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); |
|
|
|
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; |
|
|
|
info!(logger, "allocated memory: {:.1}MB", allocated_mb; |
|
|
|
info!(logger, "InfluxWriter: allocated memory: {:.1}MB", allocated_mb; |
|
|
|
"allocated bytes" => allocated_bytes, |
|
|
|
"in flight buffer bytes" => in_flight_buffer_bytes, |
|
|
|
"spares.len()" => spares.len(), |
|
|
@@ -603,7 +603,7 @@ impl InfluxWriter { |
|
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
|
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); |
|
|
|
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; |
|
|
|
info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); |
|
|
|
info!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep(); |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
@@ -615,7 +615,7 @@ impl InfluxWriter { |
|
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
|
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); |
|
|
|
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; |
|
|
|
debug!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); |
|
|
|
debug!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep(); |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
@@ -643,7 +643,7 @@ impl InfluxWriter { |
|
|
|
None => { |
|
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
|
if n_outstanding > MAX_BACKLOG { |
|
|
|
warn!(logger, "no available buffers in `spares`, pulling from backlog"; |
|
|
|
warn!(logger, "InfluxWriter: no available buffers in `spares`, pulling from backlog"; |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
@@ -662,7 +662,7 @@ impl InfluxWriter { |
|
|
|
|
|
|
|
None => { |
|
|
|
extras += 1; |
|
|
|
crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; |
|
|
|
crit!(logger, "InfluxWriter: failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"backlog.len()" => backlog.len(), |
|
|
@@ -675,7 +675,7 @@ impl InfluxWriter { |
|
|
|
extras += 1; |
|
|
|
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY; |
|
|
|
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; |
|
|
|
info!(logger, "allocating new buffer: zero spares avail"; |
|
|
|
info!(logger, "InfluxWriter: allocating new buffer: zero spares avail"; |
|
|
|
"allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb), |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"extras" => extras, |
|
|
@@ -700,7 +700,7 @@ impl InfluxWriter { |
|
|
|
let mut hb = Instant::now(); |
|
|
|
warn!(logger, "terminate signal rcvd"; "count" => count); |
|
|
|
if buf.len() > 0 { |
|
|
|
info!(logger, "sending remaining buffer to influx on terminate"; "count" => count); |
|
|
|
info!(logger, "InfluxWriter: sending remaining buffer to influx on terminate"; "count" => count); |
|
|
|
let meas = OwnedMeasurement::new("influx_writer").add_field("n", OwnedValue::Integer(1)); |
|
|
|
let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last); |
|
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
@@ -714,7 +714,7 @@ impl InfluxWriter { |
|
|
|
loop_time = Instant::now(); |
|
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
|
if backlog.is_empty() && n_outstanding < 1 { |
|
|
|
info!(logger, "cleared any remaining backlog"; |
|
|
|
info!(logger, "InfluxWriter: cleared any remaining backlog"; |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"backlog.len()" => backlog.len(), |
|
|
@@ -739,7 +739,7 @@ impl InfluxWriter { |
|
|
|
} |
|
|
|
if let Some(buf) = backlog.pop_front() { |
|
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
|
debug!(logger, "resending queued buffer from backlog"; |
|
|
|
debug!(logger, "InfluxWriter: resending queued buffer from backlog"; |
|
|
|
"backlog.len()" => backlog.len(), |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
@@ -759,13 +759,13 @@ impl InfluxWriter { |
|
|
|
} |
|
|
|
} |
|
|
|
Ok(Err(Resp { buf, .. })) => { |
|
|
|
warn!(logger, "requeueing failed request"; "buf.len()" => buf.len()); |
|
|
|
warn!(logger, "InfluxWriter: requeueing failed request"; "buf.len()" => buf.len()); |
|
|
|
n_err += 1; |
|
|
|
in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity()); |
|
|
|
backlog.push_front(buf); |
|
|
|
} |
|
|
|
Err(chan::TryRecvError::Disconnected) => { |
|
|
|
crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting"; |
|
|
|
crit!(logger, "InfluxWriter: trying to clear backlog, but http_rx disconnected! aborting"; |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"backlog.len()" => backlog.len(), |
|
|
|
"n_cleared_ok" => n_ok, |
|
|
@@ -810,7 +810,7 @@ impl InfluxWriter { |
|
|
|
spares.push_back(buf); |
|
|
|
} else { |
|
|
|
extras = extras.saturating_sub(1); |
|
|
|
debug!(logger, "dropping buffer to reduce memory back to INITIAL_BACKLOG size"; |
|
|
|
debug!(logger, "InfluxWriter: dropping buffer to reduce memory back to INITIAL_BACKLOG size"; |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"extras" => extras, |
|
|
|
"in flight before" => in_flight_before, |
|
|
@@ -830,7 +830,7 @@ impl InfluxWriter { |
|
|
|
} |
|
|
|
|
|
|
|
Err(chan::TryRecvError::Disconnected) => { |
|
|
|
crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting"; |
|
|
|
crit!(logger, "InfluxWriter: trying to recover buffers, but http_rx disconnected! aborting"; |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"backlog.len()" => backlog.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|