diff --git a/Cargo.toml b/Cargo.toml index 599b4c3..4c632d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "influx-writer" -version = "0.9.1" +version = "0.10.0" authors = ["Jonathan Strong "] edition = "2018" diff --git a/src/lib.rs b/src/lib.rs index 33cfadd..9282403 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -586,7 +586,7 @@ impl InfluxWriter { if loop_time - last_memory_check > Duration::from_secs(300) { let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; - info!(logger, "allocated memory: {:.1}MB", allocated_mb; + info!(logger, "InfluxWriter: allocated memory: {:.1}MB", allocated_mb; "allocated bytes" => allocated_bytes, "in flight buffer bytes" => in_flight_buffer_bytes, "spares.len()" => spares.len(), @@ -603,7 +603,7 @@ impl InfluxWriter { let n_outstanding = n_out(&spares, &backlog, extras); let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; - info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); + info!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep(); "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "n_rcvd" => n_rcvd, @@ -615,7 +615,7 @@ impl InfluxWriter { let n_outstanding = n_out(&spares, &backlog, extras); let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes); let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; - debug!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); + debug!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep(); "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "n_rcvd" => n_rcvd, @@ -643,7 +643,7 @@ impl InfluxWriter { None => { let n_outstanding = n_out(&spares, &backlog, extras); if n_outstanding > MAX_BACKLOG { - warn!(logger, "no available buffers in `spares`, pulling from backlog"; + warn!(logger, "InfluxWriter: no available buffers in `spares`, pulling from backlog"; "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "n_rcvd" => n_rcvd, @@ -662,7 +662,7 @@ impl InfluxWriter { None => { extras += 1; - crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; + crit!(logger, "InfluxWriter: failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "backlog.len()" => backlog.len(), @@ -675,7 +675,7 @@ impl InfluxWriter { extras += 1; let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY; let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0; - info!(logger, "allocating new buffer: zero spares avail"; + info!(logger, "InfluxWriter: allocating new buffer: zero spares avail"; "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb), "n_outstanding" => n_outstanding, "extras" => extras, @@ -700,7 +700,7 @@ impl InfluxWriter { let mut hb = Instant::now(); warn!(logger, "terminate signal rcvd"; "count" => count); if buf.len() > 0 { - info!(logger, "sending remaining buffer to influx on terminate"; "count" => count); + info!(logger, "InfluxWriter: sending remaining buffer to influx on terminate"; "count" => count); let meas = OwnedMeasurement::new("influx_writer").add_field("n", OwnedValue::Integer(1)); let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last); let n_outstanding = n_out(&spares, &backlog, extras); @@ -714,7 +714,7 @@ impl InfluxWriter { loop_time = Instant::now(); let n_outstanding = n_out(&spares, &backlog, extras); if backlog.is_empty() && n_outstanding < 1 { - info!(logger, "cleared any remaining backlog"; + info!(logger, "InfluxWriter: cleared any remaining backlog"; "n_outstanding" => n_outstanding, "spares.len()" => spares.len(), "backlog.len()" => backlog.len(), @@ -739,7 +739,7 @@ impl InfluxWriter { } if let Some(buf) = backlog.pop_front() { let n_outstanding = n_out(&spares, &backlog, extras); - debug!(logger, "resending queued buffer from backlog"; + debug!(logger, "InfluxWriter: resending queued buffer from backlog"; "backlog.len()" => backlog.len(), "spares.len()" => spares.len(), "n_rcvd" => n_rcvd, @@ -759,13 +759,13 @@ impl InfluxWriter { } } Ok(Err(Resp { buf, .. })) => { - warn!(logger, "requeueing failed request"; "buf.len()" => buf.len()); + warn!(logger, "InfluxWriter: requeueing failed request"; "buf.len()" => buf.len()); n_err += 1; in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity()); backlog.push_front(buf); } Err(chan::TryRecvError::Disconnected) => { - crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting"; + crit!(logger, "InfluxWriter: trying to clear backlog, but http_rx disconnected! aborting"; "n_outstanding" => n_outstanding, "backlog.len()" => backlog.len(), "n_cleared_ok" => n_ok, @@ -810,7 +810,7 @@ impl InfluxWriter { spares.push_back(buf); } else { extras = extras.saturating_sub(1); - debug!(logger, "dropping buffer to reduce memory back to INITIAL_BACKLOG size"; + debug!(logger, "InfluxWriter: dropping buffer to reduce memory back to INITIAL_BACKLOG size"; "spares.len()" => spares.len(), "extras" => extras, "in flight before" => in_flight_before, @@ -830,7 +830,7 @@ impl InfluxWriter { } Err(chan::TryRecvError::Disconnected) => { - crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting"; + crit!(logger, "InfluxWriter: trying to recover buffers, but http_rx disconnected! aborting"; "n_outstanding" => n_outstanding, "backlog.len()" => backlog.len(), "n_rcvd" => n_rcvd,