|
@@ -296,6 +296,8 @@ impl InfluxWriter { |
|
|
.expect("influx writer url should parse"); |
|
|
.expect("influx writer url should parse"); |
|
|
|
|
|
|
|
|
let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { |
|
|
let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { |
|
|
|
|
|
const MAX_PENDING: Duration = Duration::from_secs(1); |
|
|
|
|
|
|
|
|
let client = Client::new(); |
|
|
let client = Client::new(); |
|
|
|
|
|
|
|
|
debug!(logger, "initializing buffers"); |
|
|
debug!(logger, "initializing buffers"); |
|
@@ -338,7 +340,7 @@ impl InfluxWriter { |
|
|
1 |
|
|
1 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
n if n < buffer_size && *loop_time - *last < Duration::from_secs(2) => { |
|
|
|
|
|
|
|
|
n if n < buffer_size && *loop_time - *last < MAX_PENDING => { |
|
|
buf.push_str("\n"); |
|
|
buf.push_str("\n"); |
|
|
serialize_owned(m, buf); |
|
|
serialize_owned(m, buf); |
|
|
n + 1 |
|
|
n + 1 |
|
@@ -373,10 +375,19 @@ impl InfluxWriter { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
Ok(None) => { |
|
|
Ok(None) => { |
|
|
|
|
|
warn!(logger, "terminate signal rcvd"; "count" => count); |
|
|
if buf.len() > 0 { |
|
|
if buf.len() > 0 { |
|
|
debug!(logger, "sending buffer to influx"; "len" => count); |
|
|
|
|
|
send(&buf) |
|
|
|
|
|
|
|
|
info!(logger, "sending remaining buffer to influx on terminate"; "count" => count); |
|
|
|
|
|
let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1)); |
|
|
|
|
|
count = next(buffer_size, &meas, &mut buf, &loop_time, &mut last); |
|
|
|
|
|
info!(logger, "triggered send of remaining buffer"; "count" => count); |
|
|
|
|
|
if !buf.is_empty() { |
|
|
|
|
|
warn!(logger, "buffer sill isn't empty after 'wtrterm' meas"; |
|
|
|
|
|
"count" => count, "buf.len()" => buf.len()); |
|
|
|
|
|
send(&buf); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
info!(logger, "exiting loop"; "count" => count, "buf.len()" => buf.len()); |
|
|
break |
|
|
break |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|