diff --git a/Cargo.toml b/Cargo.toml index 66fa938..da80c34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "logging" -version = "0.4.4" +version = "0.4.5" authors = ["Jonathan Strong "] [[example]] diff --git a/src/influx.rs b/src/influx.rs index 4a26994..d9612fc 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -7,7 +7,7 @@ use std::sync::mpsc::{Sender, Receiver, channel, SendError}; use std::thread; #[cfg(feature = "warnings")] use std::fs; -use std::time::Duration; +use std::time::{Instant, Duration}; use std::hash::BuildHasherDefault; use hyper::status::StatusCode; @@ -33,6 +33,8 @@ pub use super::{dur_nanos, dt_nanos}; pub type Map = OrderMap>; +pub const INFLUX_WRITER_MAX_BUFFER: usize = 4096; + pub fn new_map(capacity: usize) -> Map { Map::with_capacity_and_hasher(capacity, Default::default()) } @@ -269,11 +271,13 @@ impl InfluxWriter { } #[allow(unused_assignments)] - pub fn with_logger(host: &str, db: &str, buffer_size: u16, logger: Logger) -> Self { + pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self { let (tx, rx): (Sender>, Receiver>) = channel(); + let buffer_size = INFLUX_WRITER_MAX_BUFFER; + #[cfg(feature = "no-influx-buffer")] - let buffer_size = 0u16; + let buffer_size = 0usize; debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size); @@ -288,6 +292,8 @@ impl InfluxWriter { debug!(logger, "initializing buffers"); let mut buf = String::with_capacity(32 * 32 * 32); let mut count = 0; + let mut last = Instant::now(); + let mut loop_time = Instant::now(); let send = |buf: &str| { let resp = client.post(url.clone()) @@ -296,12 +302,15 @@ impl InfluxWriter { match resp { Ok(Response { status, .. }) if status == StatusCode::NoContent => { - debug!(logger, "server responded ok: 204 NoContent"); + debug!(logger, "server responded ok: 204 NoContent"); } Ok(mut resp) => { - let mut server_resp = String::with_capacity(1024); + let mut server_resp = String::with_capacity(32 * 1024); // need to allocate here bc will be + // sent to logging thread + let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); + error!(logger, "influx server error"; "status" => resp.status.to_string(), "body" => server_resp); @@ -313,14 +322,14 @@ impl InfluxWriter { } }; - let next = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 { + let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: &Instant, last: &mut Instant| -> usize { match prev { 0 if buffer_size > 0 => { serialize_owned(m, buf); 1 } - n if n < buffer_size => { + n if n < buffer_size && *loop_time - *last < Duration::from_secs(2) => { buf.push_str("\n"); serialize_owned(m, buf); n + 1 @@ -331,6 +340,7 @@ impl InfluxWriter { serialize_owned(m, buf); debug!(logger, "sending buffer to influx"; "len" => n); send(buf); + *last = *loop_time; buf.clear(); 0 } @@ -338,6 +348,7 @@ impl InfluxWriter { }; loop { + loop_time = Instant::now(); match rx.recv() { Ok(Some(mut meas)) => { @@ -345,7 +356,7 @@ impl InfluxWriter { //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } } - count = next(count, &meas, &mut buf); + count = next(count, &meas, &mut buf, &loop_time, &mut last); } Ok(None) => {