From 25f6d4c20615c23b38c313d8c1006897ee86afd8 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Sun, 7 Oct 2018 01:48:02 -0400 Subject: [PATCH] add logging/anal retentiveness to kill routine --- src/influx.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/influx.rs b/src/influx.rs index 5896d5e..8e287e0 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -296,6 +296,8 @@ impl InfluxWriter { .expect("influx writer url should parse"); let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { + const MAX_PENDING: Duration = Duration::from_secs(1); + let client = Client::new(); debug!(logger, "initializing buffers"); @@ -338,7 +340,7 @@ impl InfluxWriter { 1 } - n if n < buffer_size && *loop_time - *last < Duration::from_secs(2) => { + n if n < buffer_size && *loop_time - *last < MAX_PENDING => { buf.push_str("\n"); serialize_owned(m, buf); n + 1 @@ -373,10 +375,19 @@ impl InfluxWriter { } Ok(None) => { + warn!(logger, "terminate signal rcvd"; "count" => count); if buf.len() > 0 { - debug!(logger, "sending buffer to influx"; "len" => count); - send(&buf) + info!(logger, "sending remaining buffer to influx on terminate"; "count" => count); + let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1)); + count = next(buffer_size, &meas, &mut buf, &loop_time, &mut last); + info!(logger, "triggered send of remaining buffer"; "count" => count); + if !buf.is_empty() { + warn!(logger, "buffer sill isn't empty after 'wtrterm' meas"; + "count" => count, "buf.len()" => buf.len()); + send(&buf); + } } + info!(logger, "exiting loop"; "count" => count, "buf.len()" => buf.len()); break }