From 7b8f8c09184f88878c3cdea185d95c0a944066e1 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Thu, 15 Feb 2018 04:03:20 -0500 Subject: [PATCH] InfluxWriter loop now blocks on recv, among other incremental improvements --- .gitignore | 1 + Cargo.toml | 2 +- src/hist.rs | 13 ++++++++++--- src/influx.rs | 12 ++++++++---- src/lib.rs | 2 +- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 7afe480..f594fe0 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ Cargo.lock .*.swp /var/*.log +/var/ diff --git a/Cargo.toml b/Cargo.toml index 708e5b6..ebe0d14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ slog-term = "2" ordermap = "0.3" fnv = "1" uuid = { version = "0.5", features = ["serde", "v4"] } -hdrsample = "6" +hdrhistogram = "6" slog-async = "2" decimal = { path = "../decimal", version = "2" } diff --git a/src/hist.rs b/src/hist.rs index 0751dd4..7430f2f 100644 --- a/src/hist.rs +++ b/src/hist.rs @@ -7,9 +7,9 @@ use std::io::{self, Write}; use std::{mem, fs, env}; use chrono::{DateTime, Utc, TimeZone}; -use hdrsample::{Histogram, Counter}; -use hdrsample::serialization::{Serializer, V2DeflateSerializer, V2Serializer}; -use hdrsample::serialization::interval_log::{IntervalLogWriterBuilder, Tag}; +use hdrhistogram::{Histogram, Counter}; +use hdrhistogram::serialization::{Serializer, V2DeflateSerializer, V2Serializer}; +use hdrhistogram::serialization::interval_log::{IntervalLogWriterBuilder, Tag}; type C = u64; @@ -61,6 +61,12 @@ impl HistLog { } } + pub fn clone_with_tag_and_freq(&self, tag: &'static str, freq: Duration) -> HistLog { + let mut clone = self.clone_with_tag(tag); + clone.freq = freq; + clone + } + pub fn record(&mut self, value: u64) { let _ = self.hist.record(value); } @@ -88,6 +94,7 @@ impl HistLog { pub fn check_send(&mut self, loop_time: Instant) { //let since = loop_time - self.last_sent; if loop_time > self.last_sent && loop_time - self.last_sent >= self.freq { + // send sets self.last_sent to loop_time fyi self.send(loop_time); } } diff --git a/src/influx.rs b/src/influx.rs index 7fe0d4c..a5a1db3 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -297,7 +297,8 @@ impl InfluxWriter { }; loop { - match rx.try_recv() { + //match rx.try_recv() { + match rx.recv() { Ok(Some(mut meas)) => { if meas.timestamp.is_none() { meas.timestamp = Some(now()); @@ -308,13 +309,16 @@ impl InfluxWriter { } Ok(None) => { - if buf.len() > 0 { send(&buf) } + if buf.len() > 0 { + debug!(logger, "sending buffer to influx"; "len" => count); + send(&buf) + } break } _ => { - #[cfg(feature = "no-thrash")] - thread::sleep(Duration::new(0, 1)) + //#[cfg(feature = "no-thrash")] + thread::sleep(Duration::new(0, 1)) } } } diff --git a/src/lib.rs b/src/lib.rs index b9b53f1..c5ba48e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ extern crate fnv; extern crate ordermap; extern crate decimal; extern crate uuid; -extern crate hdrsample; +extern crate hdrhistogram; extern crate windows; extern crate pubsub as pub_sub;