diff --git a/Cargo.toml b/Cargo.toml
index 6ee8b8c..ad01dec 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,56 +1,33 @@
 [package]
-name = "logging"
-version = "0.5.2"
+name = "influx-writer"
+version = "0.6.0"
 authors = ["Jonathan Strong "]
 edition = "2018"

-[[example]]
-name = "hist-interval"
-path = "examples/hist-interval.rs"
+[lib]
+name = "influx_writer"
+path = "src/lib.rs"

 [dependencies]
 chrono = { version = "0.4", features = ["serde"] }
 hyper = "0.10"
-termion = "1.4.0"
-slog = "2.0.6"
+slog = "2"
 slog-term = "2"
 uuid = { version = "0.8", features = ["serde", "v4", "slog"] }
-hdrhistogram = "6"
 slog-async = "2"
 smallvec = "0.6"
-num = "0.1"
-dirs = "1"
 crossbeam-channel = "0.3"
 pretty_toa = "1.0.0"
-sloggers = "0.3"
-#sloggers = { path = "../sloggers" }
-
-#decimal = { path = "../decimal", version = "2" }
-#decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" }
 decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" }
 decimal-macros = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" }
-#windows = { path = "../windows", version = "0.2" }
-money = { path = "../money", version = "0.3" }
-pubsub = { path = "../pubsub", optional = true }
-
 [features]
-default = ["inlines"]
-no-thrash = []
+default = ["string-tags"]
 trace = ["slog/release_max_level_trace", "slog/max_level_trace"]
 debug = ["slog/release_max_level_debug", "slog/max_level_debug"]
-test = []
-localhost = []
-harrison = []
-washington = []
-scholes = []
-no-influx-buffer = []
-disable-short-uuid = []
-warnings = []
-inlines = []
-latency = ["pubsub"]
 string-tags = []
+unstable = []

 [profile.bench]
 lto = true
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b831c1d
--- /dev/null
+++ b/README.md
@@ -0,0 +1,30 @@
+# influx-writer
+
+An opinionated influxdb client in rust:
+
+- http write requests are managed by a worker thread, so "writing" to influx is wait-free on the hot path
+- worker thread recovers from failed writes/requests with robust retry and failure handling
+- library includes `measure!`, a mini-DSL for creating and sending measurements to influxdb
+
+## `measure!`
+
+`measure!` is a macro that allows highly ergonomic creation and sending of `OwnedMeasurement` instances, at the cost of a bit of a learning curve.
+
+A single letter is used to denote the type of a field, and `t` is used to denote a tag:
+
+```rust
+measure!(influx, "measurement_name", t(color, "red"), i(n, 1), tm(now()));
+```
+
+In the example above, `influx` is the identifier of an `InfluxWriter` instance. `InfluxWriter` is a handle to the worker thread that sends data to the influxdb server.
+
+Now for a more advanced example: if only a single argument is passed to `t`, `f`, `i`, `b`, and the other field-type shorthands, it creates a field or tag with that name and uses the value of a variable binding with the same name as the field or tag's value:
+
+```rust
+let color = "red";
+let n = 1;
+let t = now(); // in this example, now() would return an integer timestamp
+measure!(influx, "even_faster", t(color), i(n), tm(t));
+```
+
+In the example above, `measure!` creates an `OwnedMeasurement` instance, fills it with a tag named "color" with the value "red", an integer field "n" with the value 1, and a timestamp taken from the variable `t`. It then uses `influx` to send the measurement to the worker thread.
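+
+## putting it together
+
+A rough end-to-end sketch is below. It assumes the crate still exposes the `InfluxWriter::new(host, db, log_path, buffer_size)` constructor and the nanosecond-timestamp helper `now()` that lived in the pre-rename `src/influx.rs`; the database name, log path, and measurement/field names are placeholders, so check `src/lib.rs` for the exact exports before copying it.
+
+```rust
+#[macro_use]
+extern crate influx_writer;
+
+use influx_writer::{InfluxWriter, now};
+
+fn main() {
+    // spawns the background worker thread that batches serialized lines and
+    // posts them to the influxdb /write endpoint on `localhost`
+    let influx = InfluxWriter::new("localhost", "example_db", "var/log/influx.log", 4096);
+
+    // one tag, three fields of different types, explicit nanosecond timestamp
+    measure!(influx, "order_fill",
+        t(exchange, "plnx"),
+        i(qty, 10),
+        f(price, 1.2345),
+        b(maker, true),
+        tm(now()));
+
+    // dropping the last `InfluxWriter` handle signals the worker thread to
+    // flush anything still buffered and exit
+}
+```
+
+The `measure!` call above serializes to a single line-protocol entry, roughly `order_fill,exchange=plnx qty=10i,price=1.2345,maker=t <timestamp>`.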
diff --git a/examples/hist-interval.rs b/examples/hist-interval.rs deleted file mode 100644 index dbb3880..0000000 --- a/examples/hist-interval.rs +++ /dev/null @@ -1,127 +0,0 @@ -#![allow(unused)] - -extern crate logging; - -use std::time::{Instant, Duration}; -use std::thread; - -use logging::hist::{Entry, HistLog, nanos}; - -const N_SECS: u64 = 30; - -//const N_THREADS: usize = 48; - -macro_rules! n_threads { - ($n:expr) => { - const N_THREADS: usize = $n; - const NAME: &'static str = concat!("sleeptest_", stringify!($n)); - } -} - -n_threads!(96); - -fn main() { - ::std::process::exit(test_n()); -} - -fn test_n() -> i32 { - let start = Instant::now(); - let hist = HistLog::new(NAME, "master", Duration::from_millis(100)); - for _ in 0..N_THREADS { - let mut a = hist.clone_with_tag("sleep_1ns"); - thread::spawn(move || { - - let mut prev = Instant::now(); - let mut loop_time = Instant::now(); - - loop { - prev = loop_time; - loop_time = Instant::now(); - a.record(nanos(loop_time - prev)); - a.check_send(loop_time); - - - if loop_time - start > Duration::from_secs(N_SECS) { break } - - thread::sleep(Duration::new(0, 1)); - //thread::yield_now(); - } - }); - } - - thread::sleep(Duration::from_secs(N_SECS)); - - 0 -} - -fn test_3() -> i32 { - let start = Instant::now(); - let hist = HistLog::new("sleeptest", "master", Duration::from_millis(100)); - let mut a = hist.clone_with_tag("yield"); - let mut b = hist.clone_with_tag("sleep_0"); - let mut c = hist.clone_with_tag("sleep_100ns"); - - thread::spawn(move || { - - let mut prev = Instant::now(); - let mut loop_time = Instant::now(); - let mut i = 0; - - loop { - prev = loop_time; - loop_time = Instant::now(); - a.record(nanos(loop_time - prev)); - a.check_send(loop_time); - - - if loop_time - start > Duration::from_secs(N_SECS) { break } - - i += 1; - //if i % 100 == 0 { thread::sleep(Duration::new(0, 0)); } - //thread::sleep(Duration::new(0, 0)); - thread::yield_now(); - } - }); - - thread::spawn(move || { - let mut prev = Instant::now(); - let mut loop_time = Instant::now(); - let mut i = 0; - - loop { - prev = loop_time; - loop_time = Instant::now(); - b.record(nanos(loop_time - prev)); - b.check_send(loop_time); - - if loop_time - start > Duration::from_secs(N_SECS) { break } - - i += 1; - //if i % 1_000 == 0 { thread::sleep(Duration::new(0, 0)); } - //if i % 100 == 0 { thread::sleep(Duration::new(0, 0)); } - thread::sleep(Duration::new(0, 0)); - } - }); - - let mut prev = Instant::now(); - let mut loop_time = Instant::now(); - let mut i = 0; - - loop { - prev = loop_time; - loop_time = Instant::now(); - c.record(nanos(loop_time - prev)); - c.check_send(loop_time); - - if loop_time - start > Duration::from_secs(N_SECS) { break } - - i += 1; - //if i % 100_000 == 0 { thread::sleep(Duration::from_millis(10)); } - //if i % 100 == 0 { thread::sleep(Duration::new(0, 0)); } - thread::sleep(Duration::new(0, 100)); - } - - 0 -} - - diff --git a/examples/precipice.rs b/examples/precipice.rs deleted file mode 100644 index 1c18b76..0000000 --- a/examples/precipice.rs +++ /dev/null @@ -1,62 +0,0 @@ -#![allow(unused_imports)] - -#[macro_use] -extern crate slog; -#[macro_use] -extern crate logging; - -use std::io::{self, prelude::*}; -use std::thread; -use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; -use std::time::*; -use chrono::Utc; -use slog::Drain; -use pretty_toa::ThousandsSep; -use logging::influx::InfluxWriter; - -const INTERVAL: Duration = Duration::from_micros(1); //from_millis(1); -const HB_EVERY: usize = 1_000_000; - -fn main() { - 
let to_file = logging::truncating_file_logger("var/log/precipice.log", sloggers::types::Severity::Debug); - let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); - let drain = slog_term::CompactFormat::new(decorator).use_utc_timestamp().build().fuse(); - let drain = slog_async::Async::new(drain).chan_size(8192).thread_name("recv".into()).build().fuse(); - let drain = slog::Duplicate::new(drain, to_file).fuse(); - let root = slog::Logger::root(drain, o!()); - let logger = root.new(o!("thread" => "main")); - info!(logger, "initializing..."); - let influx = InfluxWriter::with_logger("localhost", "precipice", 1024, root.new(o!("thread" => "InfluxWriter"))); - let stop = Arc::new(AtomicBool::new(false)); - let thread = { - let stop = Arc::clone(&stop); - let logger = root.new(o!("thread" => "blaster")); - let influx = influx.clone(); - thread::spawn(move || { - let mut i = 0; - let mut sum = 0; - while !stop.load(Ordering::Relaxed) { - measure!(influx, xs, i(i), tm(logging::inanos(Utc::now()))); - sum += i; - i += 1; - if i % HB_EVERY == 0 { - info!(logger, "sent {} measurements", i.thousands_sep()); - } - thread::sleep(INTERVAL); - } - info!(logger, "exiting"; "n_sent" => i, "sum" => sum); - }) - }; - - let mut keys = String::new(); - loop { - if let Ok(_) = io::stdin().read_line(&mut keys) { - break - } - thread::sleep(Duration::from_millis(1)); - } - stop.store(true, Ordering::Relaxed); - let _ = thread.join(); -} - - diff --git a/src/hist.rs b/src/hist.rs deleted file mode 100644 index f34bff7..0000000 --- a/src/hist.rs +++ /dev/null @@ -1,195 +0,0 @@ -use std::sync::mpsc::{Sender, Receiver, channel}; -use std::sync::Arc; -use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; -use std::path::PathBuf; -use std::thread::{self, JoinHandle}; -use std::io; -use std::{mem, fs}; - -use dirs::home_dir; -use hdrhistogram::{Histogram}; -use hdrhistogram::serialization::V2DeflateSerializer; -use hdrhistogram::serialization::interval_log::{IntervalLogWriterBuilder, Tag}; - -pub type C = u64; - -pub fn nanos(d: Duration) -> u64 { - d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64) -} - -pub struct HistLog { - series: &'static str, - tag: &'static str, - freq: Duration, - last_sent: Instant, - tx: Sender>, - hist: Histogram, - thread: Option>>, -} - -pub struct Entry { - pub tag: &'static str, - pub start: SystemTime, - pub end: SystemTime, - pub hist: Histogram, -} - -impl Clone for HistLog { - fn clone(&self) -> Self { - let thread = self.thread.as_ref().map(|x| Arc::clone(x)); - Self { - series: self.series.clone(), - tag: self.tag.clone(), - freq: self.freq.clone(), - last_sent: Instant::now(), - tx: self.tx.clone(), - hist: self.hist.clone(), - thread, - } - } -} - -impl HistLog { - pub fn new(series: &'static str, tag: &'static str, freq: Duration) -> Self { - let (tx, rx) = channel(); - let mut dir = home_dir().expect("home_dir"); - dir.push("src/market-maker/var/hist"); - fs::create_dir_all(&dir).unwrap(); - let thread = Some(Arc::new(Self::scribe(series, rx, dir))); - let last_sent = Instant::now(); - let hist = Histogram::new(3).unwrap(); - Self { series, tag, freq, last_sent, tx, hist, thread } - } - - /// Create a new `HistLog` that will save results in a specified - /// directory (`path`). 
- pub fn with_path( - path: &str, - series: &'static str, - tag: &'static str, - freq: Duration, - ) -> Self { - let (tx, rx) = channel(); - let dir = PathBuf::from(path); - // let mut dir = env::home_dir().unwrap(); - // dir.push("src/market-maker/var/hist"); - fs::create_dir_all(&dir).ok(); - let thread = Some(Arc::new(Self::scribe(series, rx, dir))); - let last_sent = Instant::now(); - let hist = Histogram::new(3).unwrap(); - Self { series, tag, freq, last_sent, tx, hist, thread } - } - - - pub fn new_with_tag(&self, tag: &'static str) -> Self { - Self::new(self.series, tag, self.freq) - } - - pub fn clone_with_tag(&self, tag: &'static str) -> Self { - let thread = self.thread.as_ref().map(|x| Arc::clone(x)).unwrap(); - assert!(self.thread.is_some(), "self.thread is {:?}", self.thread); - let tx = self.tx.clone(); - Self { - series: self.series, - tag, - freq: self.freq, - last_sent: Instant::now(), - tx, - hist: self.hist.clone(), - thread: Some(thread), - } - } - - pub fn clone_with_tag_and_freq(&self, tag: &'static str, freq: Duration) -> HistLog { - let mut clone = self.clone_with_tag(tag); - clone.freq = freq; - clone - } - - pub fn record(&mut self, value: u64) { - let _ = self.hist.record(value); - } - - /// If for some reason there was a pause in between using the struct, - /// this resets the internal state of both the values recorded to the - /// `Histogram` and the value of when it last sent a `Histogram` onto - /// the writing thread. - /// - pub fn reset(&mut self) { - self.hist.clear(); - self.last_sent = Instant::now(); - } - - fn send(&mut self, loop_time: Instant) { - let end = SystemTime::now(); - let start = end - (loop_time - self.last_sent); - assert!(end > start, "end <= start!"); - let mut next = Histogram::new_from(&self.hist); - mem::swap(&mut self.hist, &mut next); - self.tx.send(Some(Entry { tag: self.tag, start, end, hist: next })).expect("sending entry failed"); - self.last_sent = loop_time; - } - - pub fn check_send(&mut self, loop_time: Instant) { - //let since = loop_time - self.last_sent; - if loop_time > self.last_sent && loop_time - self.last_sent >= self.freq { - // send sets self.last_sent to loop_time fyi - self.send(loop_time); - } - } - - fn scribe( - series : &'static str, - rx : Receiver>, - dir : PathBuf, - ) -> JoinHandle<()> { - let mut ser = V2DeflateSerializer::new(); - let start_time = SystemTime::now(); - let seconds = start_time.duration_since(UNIX_EPOCH).unwrap().as_secs(); - let path = dir.join(&format!("{}-interval-log-{}.v2z", series, seconds)); - let file = fs::File::create(&path).unwrap(); - thread::Builder::new().name(format!("mm:hist:{}", series)).spawn(move || { - let mut buf = io::LineWriter::new(file); - let mut wtr = - IntervalLogWriterBuilder::new() - .with_base_time(UNIX_EPOCH) - .with_start_time(start_time) - .begin_log_with(&mut buf, &mut ser) - .unwrap(); - - loop { - match rx.recv() { //.recv_timeout(Duration::from_millis(1)) { - //match rx.recv_timeout(Duration::new(1, 0)) { - Ok(Some(Entry { tag, start, end, hist })) => { - wtr.write_histogram(&hist, start.duration_since(UNIX_EPOCH).unwrap(), - end.duration_since(start).unwrap(), Tag::new(tag)) - .ok(); - //.map_err(|e| { println!("{:?}", e); e }).ok(); - } - - // `None` used as terminate signal from `Drop` - Ok(None) => break, - - _ => { - thread::sleep(Duration::new(0, 0)); - } - } - - } - }).unwrap() - } -} - -impl Drop for HistLog { - fn drop(&mut self) { - if !self.hist.is_empty() { self.send(Instant::now()) } - - if let Some(arc) = self.thread.take() { - 
//println!("in Drop, strong count is {}", Arc::strong_count(&arc)); - if let Ok(thread) = Arc::try_unwrap(arc) { - let _ = self.tx.send(None); - let _ = thread.join(); - } - } - } -} diff --git a/src/influx.rs b/src/influx.rs deleted file mode 100644 index b643902..0000000 --- a/src/influx.rs +++ /dev/null @@ -1,1417 +0,0 @@ -//! Utilities to efficiently send data to influx -//! - -use std::io::Read; -use std::sync::Arc; -use crossbeam_channel::{Sender, Receiver, bounded, SendError}; -use std::{thread, mem}; -use std::time::*; -use std::collections::VecDeque; - -use hyper::status::StatusCode; -use hyper::client::response::Response; -use hyper::Url; -use hyper::client::Client; -use chrono::{DateTime, Utc}; -use decimal::d128; -use uuid::Uuid; -use smallvec::SmallVec; -use slog::Logger; -use pretty_toa::ThousandsSep; - -use super::{nanos, file_logger, LOG_LEVEL}; -#[cfg(feature = "warnings")] -use warnings::Warning; - -pub use super::{dur_nanos, dt_nanos, measure}; - -pub const INFLUX_WRITER_MAX_BUFFER: usize = 4096; - -/// Created this so I know what types can be passed through the -/// `measure!` macro, which used to convert with `as i64` and -/// `as f64` until I accidentally passed a function name, and it -/// still compiled, but with garbage numbers. -pub trait AsI64 { - fn as_i64(x: Self) -> i64; -} - -impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } } -impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } } -impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } } -impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } } -impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } } -impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } } -impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } } -impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } } -impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } } -impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } } -impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } } - -/// Created this so I know what types can be passed through the -/// `measure!` macro, which used to convert with `as i64` and -/// `as f64` until I accidentally passed a function name, and it -/// still compiled, but with garbage numbers. -pub trait AsF64 { - fn as_f64(x: Self) -> f64; -} - -impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } } -impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } } -impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } } -impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } } -impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } } -impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } } -impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } } - -/// Provides flexible and ergonomic use of `Sender`. -/// -/// The macro both creates an `OwnedMeasurement` from the supplied tags and -/// values, as well as sends it with the `Sender`. -/// -/// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized -/// measurement (see `tests` mod). 
-/// -/// # Examples -/// -/// ``` -/// #![feature(try_from)] -/// #[macro_use] extern crate logging; -/// extern crate decimal; -/// -/// use std::sync::mpsc::channel; -/// use decimal::d128; -/// use logging::influx::*; -/// -/// fn main() { -/// let (tx, rx) = crossbeam_channel::bounded(1024); -/// -/// // "shorthand" syntax -/// -/// measure!(tx, test, tag[color;"red"], int[n;1]); -/// -/// let meas: OwnedMeasurement = rx.recv().unwrap(); -/// -/// assert_eq!(meas.key, "test"); -/// assert_eq!(meas.get_tag("color"), Some("red")); -/// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1))); -/// -/// // alternate syntax ... -/// -/// measure!(tx, test, -/// tag [ one => "a" ], -/// tag [ two => "b" ], -/// int [ three => 2 ], -/// float [ four => 1.2345 ], -/// string [ five => String::from("d") ], -/// bool [ six => true ], -/// int [ seven => { 1 + 2 } ], -/// time [ 1 ] -/// ); -/// -/// let meas: OwnedMeasurement = rx.recv().unwrap(); -/// -/// assert_eq!(meas.key, "test"); -/// assert_eq!(meas.get_tag("one"), Some("a")); -/// assert_eq!(meas.get_tag("two"), Some("b")); -/// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2))); -/// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3))); -/// assert_eq!(meas.timestamp, Some(1)); -/// -/// // use the @make_meas flag to skip sending a measurement, instead merely -/// // creating it. -/// -/// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]); -/// -/// // each variant also has shorthand aliases -/// -/// let meas: OwnedMeasurement = -/// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]); -/// } -/// ``` -/// -#[macro_export] -macro_rules! measure { - (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; - (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; - (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; - (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) }; - (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) }; - (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) }; - (@kv v, $meas:ident, $k:expr) => { measure!(@ea tag, $meas, "version", $k) }; - (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) }; - (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; - (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; - (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) }; - (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) }; - (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) }; - (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) }; - (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) }; - (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) }; - (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) }; - (@ea d, 
$meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) }; - (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) }; - (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) }; - (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) }; - (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) }; - - (@as_expr $e:expr) => {$e}; - - (@count_tags) => {0usize}; - (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)}; - (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)}; - - (@count_fields) => {0usize}; - (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; - (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; - (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)}; - - (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => { - measure!(@make_meas $name, $( $t [ $($tail)* ] ),*) - }; - - (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ - let n_tags = measure!(@count_tags $($t)*); - let n_fields = measure!(@count_fields $($t)*); - let mut meas = - $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields); - $( - measure!(@kv $t, meas, $($tail)*); - )* - meas - }}; - - ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => { - measure!($m, $name, $($t [ $($tail)* ] ),+) - }; - - ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ - #[allow(unused_imports)] - use $crate::influx::{AsI64, AsF64}; - let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*); - let _ = $m.send(measurement); - }}; -} - -#[derive(Clone, Debug)] -pub struct Point { - pub time: T, - pub value: V -} -pub struct DurationWindow { - pub size: Duration, - pub mean: Duration, - pub sum: Duration, - pub count: u32, - pub items: VecDeque> -} - -impl DurationWindow { - #[inline] - pub fn update(&mut self, time: Instant, value: Duration) { - self.add(time, value); - self.refresh(time); - } - - #[inline] - pub fn refresh(&mut self, t: Instant) -> &Self { - if !self.items.is_empty() { - let (n_remove, sum, count) = - self.items.iter() - .take_while(|x| t - x.time > self.size) - .fold((0, self.sum, self.count), |(n_remove, sum, count), x| { - (n_remove + 1, sum - x.value, count - 1) - }); - self.sum = sum; - self.count = count; - for _ in 0..n_remove { - self.items.pop_front(); - } - } - - if self.count > 0 { - self.mean = self.sum / self.count.into(); - } - - self - } - - #[inline] - pub fn add(&mut self, time: Instant, value: Duration) { - let p = Point { time, value }; - self.sum += p.value; - self.count += 1; - self.items.push_back(p); - } -} - -/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s -/// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE` -/// measurements have accumulated. 
-/// -#[derive(Debug)] -pub struct InfluxWriter { - host: String, - db: String, - tx: Sender>, - thread: Option>>, -} - -impl Default for InfluxWriter { - fn default() -> Self { - //if cfg!(any(test, feature = "test")) { - // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0) - //} else { - InfluxWriter::new("localhost", "test", "/tmp/influx-test.log", 4096) - //} - } -} - -impl Clone for InfluxWriter { - fn clone(&self) -> Self { - debug_assert!(self.thread.is_some()); - let thread = self.thread.as_ref().map(|x| Arc::clone(x)); - InfluxWriter { - host: self.host.to_string(), - db: self.db.to_string(), - tx: self.tx.clone(), - thread, - } - } -} - -impl InfluxWriter { - pub fn host(&self) -> &str { self.host.as_str() } - - pub fn db(&self) -> &str { self.db.as_str() } - - /// Sends the `OwnedMeasurement` to the serialization thread. - /// - #[cfg_attr(feature = "inlines", inline)] - pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError>> { - self.tx.send(Some(m)) - } - - #[cfg_attr(feature = "inlines", inline)] - pub fn nanos(&self, d: DateTime) -> i64 { nanos(d) as i64 } - - #[cfg_attr(feature = "inlines", inline)] - pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 } - - #[cfg_attr(feature = "inlines", inline)] - pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 } - - #[cfg_attr(feature = "inlines", inline)] - pub fn rsecs(&self, d: Duration) -> f64 { - ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64)) - * 1000.0) - .round() - / 1000.0 - } - - #[cfg_attr(feature = "inlines", inline)] - pub fn secs(&self, d: Duration) -> f64 { - d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64 - } - - pub fn tx(&self) -> Sender> { - self.tx.clone() - } - - pub fn is_full(&self) -> bool { self.tx.is_full() } - - pub fn placeholder() -> Self { - let (tx, _) = bounded(1024); - Self { - host: String::new(), - db: String::new(), - tx, - thread: None, - } - } - - pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self { - let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread - Self::with_logger(host, db, buffer_size, logger) - } - - #[allow(unused_assignments)] - pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self { - let logger = logger.new(o!( - "host" => host.to_string(), - "db" => db.to_string())); - let (tx, rx): (Sender>, Receiver>) = bounded(4096); - let url = - Url::parse_with_params(&format!("http://{}:8086/write", host), - &[("db", db), ("precision", "ns")]) - .expect("influx writer url should parse"); - let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { - use std::time::*; - use crossbeam_channel as chan; - - #[cfg(feature = "no-influx-buffer")] - const N_BUFFER_LINES: usize = 0; - - const N_BUFFER_LINES: usize = 8192; - const MAX_PENDING: Duration = Duration::from_secs(3); - const INITIAL_BUFFER_CAPACITY: usize = 32 * 32 * 32; - const MAX_BACKLOG: usize = 512; - const MAX_OUTSTANDING_HTTP: usize = 64; - const HB_EVERY: usize = 100_000; - const N_HTTP_ATTEMPTS: u32 = 15; - - let client = Arc::new(Client::new()); - - info!(logger, "initializing InfluxWriter ..."; - "N_BUFFER_LINES" => N_BUFFER_LINES, - "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING), - "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP, - "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY, - "MAX_BACKLOG" => MAX_BACKLOG); - - // pre-allocated buffers ready for use if the active one is 
stasheed - // during an outage - let mut spares: VecDeque = VecDeque::with_capacity(MAX_BACKLOG); - - // queue failed sends here until problem resolved, then send again. in worst - // case scenario, loop back around on buffers queued in `backlog`, writing - // over the oldest first. - // - let mut backlog: VecDeque = VecDeque::with_capacity(MAX_BACKLOG); - - for _ in 0..MAX_BACKLOG { - spares.push_back(String::with_capacity(32 * 32 * 32)); - } - - struct Resp { - pub buf: String, - pub took: Duration, - } - - let mut db_health = DurationWindow { - size: Duration::from_secs(120), - mean: Duration::new(10, 0), - sum: Duration::new(0, 0), - count: 0, - items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP), - }; - - let (http_tx, http_rx) = chan::bounded(32); - - let mut buf = spares.pop_front().unwrap(); - let mut count = 0; - let mut extras = 0; // any new Strings we intro to the system - let mut n_rcvd = 0; - let mut last = Instant::now(); - let mut active: bool; - let mut last_clear = Instant::now(); - let mut loop_time = Instant::now(); - - let n_out = |s: &VecDeque, b: &VecDeque, extras: usize| -> usize { - MAX_BACKLOG + extras - s.len() - b.len() - 1 - }; - - assert_eq!(n_out(&spares, &backlog, extras), 0); - - let send = |mut buf: String, backlog: &mut VecDeque, n_outstanding: usize| { - if n_outstanding >= MAX_OUTSTANDING_HTTP { - backlog.push_back(buf); - return - } - let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url - let tx = http_tx.clone(); - let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "n_outstanding" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure - let client = Arc::clone(&client); - debug!(logger, "launching http thread"); - let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || { - let logger = thread_logger; - debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len()); - let start = Instant::now(); - for n_req in 0..N_HTTP_ATTEMPTS { - let throttle = Duration::from_secs(2) * n_req * n_req; - if n_req > 0 { - warn!(logger, "InfluxWriter http thread: pausing before next request"; - "n_req" => n_req, - "throttle" => %format_args!("{:?}", throttle), - "elapsed" => %format_args!("{:?}", Instant::now() - start)); - thread::sleep(throttle); // 0, 2, 8, 16, 32 - } - let sent = Instant::now(); - let resp = client.post(url.clone()) - .body(buf.as_str()) - .send(); - let rcvd = Instant::now(); - let took = rcvd - sent; - let mut n_tx = 0u32; - match resp { - Ok(Response { status, .. 
}) if status == StatusCode::NoContent => { - debug!(logger, "server responded ok: 204 NoContent"); - buf.clear(); - let mut resp = Some(Ok(Resp { buf, took })); - loop { - n_tx += 1; - match tx.try_send(resp.take().unwrap()) { - Ok(_) => { - if n_req > 0 { - info!(logger, "successfully recovered from failed request with retry"; - "n_req" => n_req, - "n_tx" => n_tx, - "elapsed" => %format_args!("{:?}", Instant::now() - start)); - } - return - } - - Err(chan::TrySendError::Full(r)) => { - let throttle = Duration::from_millis(1000) * n_tx; - warn!(logger, "channel full: InfluxWriter http thread failed to return buf"; - "n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle)); - resp = Some(r); - thread::sleep(throttle); - } - - Err(chan::TrySendError::Disconnected(_)) => { - warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return"; - "n_tx" => n_tx, "n_req" => n_req); - return - } - } - } - } - - Ok(mut resp) => { - let mut server_resp = String::new(); - let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); - error!(logger, "influx server error (request took {:?})", took; - "status" => %resp.status, - "body" => server_resp); - } - - Err(e) => { - error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e); - } - } - - } - let took = Instant::now() - start; - warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer"; - "took" => %format_args!("{:?}", took)); - let buflen = buf.len(); - let n_lines = buf.lines().count(); - if let Err(e) = tx.send(Err(Resp { buf, took })) { - crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e; - "err" => %e, "buf.len()" => buflen, "n_lines" => n_lines); - } - }); - - if let Err(e) = thread_res { - crit!(logger, "failed to spawn thread: {}", e); - } - }; - - let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result { - match prev { - 0 if N_BUFFER_LINES > 0 => { - serialize_owned(m, buf); - Ok(1) - } - - n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => { - buf.push_str("\n"); - serialize_owned(m, buf); - Ok(n + 1) - } - - n => { - buf.push_str("\n"); - serialize_owned(m, buf); - Err(n + 1) - } - } - }; - - 'event: loop { - loop_time = Instant::now(); - active = false; - match rx.recv() { - Ok(Some(mut meas)) => { - n_rcvd += 1; - active = true; - - if n_rcvd % HB_EVERY == 0 { - let n_outstanding = n_out(&spares, &backlog, extras); - info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); - "n_outstanding" => n_outstanding, - "spares.len()" => spares.len(), - "n_rcvd" => n_rcvd, - "n_active_buf" => count, - "db_health" => %format_args!("{:?}", db_health.mean), - "backlog.len()" => backlog.len()); - } - - if meas.timestamp.is_none() { meas.timestamp = Some(now()) } - - if meas.fields.is_empty() { - meas.fields.push(("n", OwnedValue::Integer(1))); - } - - //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } } - - count = match next(count, &meas, &mut buf, loop_time, last) { - Ok(n) => n, - Err(_n) => { - let mut count = 0; - let mut next: String = match spares.pop_front() { - Some(x) => x, - - None => { - let n_outstanding = n_out(&spares, &backlog, extras); - crit!(logger, "no available buffers in `spares`, pulling from backlog"; - "n_outstanding" => n_outstanding, - "spares.len()" => spares.len(), - "n_rcvd" => n_rcvd, - "backlog.len()" => backlog.len()); - match backlog.pop_front() { - 
// Note: this does not clear the backlog buffer, - // instead we will just write more and more until - // we are out of memory. I expect that will never - // happen. - // - Some(x) => { - count = 1; // otherwise, no '\n' added in `next(..)` - we are - // sending a "full" buffer to be extended - x - } - - None => { - extras += 1; - crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; - "n_outstanding" => n_outstanding, - "spares.len()" => spares.len(), - "backlog.len()" => backlog.len(), - "n_rcvd" => n_rcvd, - "extras" => extras); - String::new() - } - } - } - }; - // after swap, buf in next, so want to send next - // - mem::swap(&mut buf, &mut next); - let n_outstanding = n_out(&spares, &backlog, extras); - send(next, &mut backlog, n_outstanding); - last = loop_time; - count - } - }; - } - - Ok(None) => { - let start = Instant::now(); - let mut hb = Instant::now(); - warn!(logger, "terminate signal rcvd"; "count" => count); - if buf.len() > 0 { - info!(logger, "sending remaining buffer to influx on terminate"; "count" => count); - let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1)); - let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last); - let n_outstanding = n_out(&spares, &backlog, extras); - let mut placeholder = spares.pop_front().unwrap_or_else(String::new); - mem::swap(&mut buf, &mut placeholder); - send(placeholder, &mut backlog, n_outstanding); - } - let mut n_ok = 0; - let mut n_err = 0; - loop { - loop_time = Instant::now(); - let n_outstanding = n_out(&spares, &backlog, extras); - if backlog.is_empty() && n_outstanding < 1 { - info!(logger, "cleared any remaining backlog"; - "n_outstanding" => n_outstanding, - "spares.len()" => spares.len(), - "backlog.len()" => backlog.len(), - "n_cleared_ok" => n_ok, - "n_cleared_err" => n_err, - "n_rcvd" => n_rcvd, - "extras" => extras, - "elapsed" => %format_args!("{:?}", loop_time - start)); - break 'event - } - if loop_time - hb > Duration::from_secs(5) { - info!(logger, "InfluxWriter still clearing backlog .."; - "n_outstanding" => n_outstanding, - "spares.len()" => spares.len(), - "backlog.len()" => backlog.len(), - "n_cleared_ok" => n_ok, - "n_cleared_err" => n_err, - "extras" => extras, - "n_rcvd" => n_rcvd, - "elapsed" => %format_args!("{:?}", loop_time - start)); - hb = loop_time; - } - if let Some(buf) = backlog.pop_front() { - let n_outstanding = n_out(&spares, &backlog, extras); - debug!(logger, "resending queued buffer from backlog"; - "backlog.len()" => backlog.len(), - "spares.len()" => spares.len(), - "n_rcvd" => n_rcvd, - "n_outstanding" => n_outstanding); - send(buf, &mut backlog, n_outstanding); - last_clear = loop_time; - } - - 'rx: loop { - match http_rx.try_recv() { - Ok(Ok(Resp { buf, .. })) => { - n_ok += 1; - spares.push_back(buf); // needed so `n_outstanding` count remains accurate - } - Ok(Err(Resp { buf, .. })) => { - warn!(logger, "requeueing failed request"; "buf.len()" => buf.len()); - n_err += 1; - backlog.push_front(buf); - } - Err(chan::TryRecvError::Disconnected) => { - crit!(logger, "trying to clear backlog, but http_rx disconnected! 
aborting"; - "n_outstanding" => n_outstanding, - "backlog.len()" => backlog.len(), - "n_cleared_ok" => n_ok, - "n_cleared_err" => n_err, - "extras" => extras, - "n_rcvd" => n_rcvd, - "elapsed" => %format_args!("{:?}", loop_time - start)); - break 'event - } - Err(_) => break 'rx - } - } - thread::sleep(Duration::from_millis(1)); - } - } - - _ => {} - } - - db_health.refresh(loop_time); - let n_outstanding = n_out(&spares, &backlog, extras); - let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200); - if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy { - if let Some(queued) = backlog.pop_front() { - let n_outstanding = n_out(&spares, &backlog, extras); - send(queued, &mut backlog, n_outstanding); - active = true; - } - } - - loop { - match http_rx.try_recv() { - Ok(Ok(Resp { buf, took })) => { - db_health.add(loop_time, took); - spares.push_back(buf); - active = true; - } - - Ok(Err(Resp { buf, took })) => { - db_health.add(loop_time, took); - backlog.push_front(buf); - active = true; - } - - Err(chan::TryRecvError::Disconnected) => { - crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting"; - "n_outstanding" => n_outstanding, - "backlog.len()" => backlog.len(), - "n_rcvd" => n_rcvd, - "extras" => extras); - break 'event - } - - Err(_) => break - } - } - - if !active { - thread::sleep(Duration::new(0, 1)) - } - } - info!(logger, "waiting 1s before exiting thread"); - thread::sleep(Duration::from_secs(1)); - }).unwrap(); - - InfluxWriter { - host: host.to_string(), - db: db.to_string(), - tx, - thread: Some(Arc::new(thread)) - } - } -} - -impl Drop for InfluxWriter { - fn drop(&mut self) { - if let Some(arc) = self.thread.take() { - if let Ok(thread) = Arc::try_unwrap(arc) { - let _ = self.tx.send(None); - let _ = thread.join(); - } - } - } -} - -#[cfg(feature = "zmq")] -const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; - -#[cfg(feature = "zmq")] -pub fn pull(ctx: &zmq::Context) -> Result { - let socket = ctx.socket(zmq::PULL)?; - socket.bind(WRITER_ADDR)?; - socket.set_rcvhwm(0)?; - Ok(socket) -} - -#[cfg(feature = "zmq")] -pub fn push(ctx: &zmq::Context) -> Result { - let socket = ctx.socket(zmq::PUSH)?; - socket.connect(WRITER_ADDR)?; - socket.set_sndhwm(0)?; - Ok(socket) -} - -/// This removes offending things rather than escaping them. -/// -fn escape_tag(s: &str) -> String { - s.replace(" ", "") - .replace(",", "") - .replace("\"", "") -} - -fn escape(s: &str) -> String { - s.replace(" ", "\\ ") - .replace(",", "\\,") -} - -fn as_string(s: &str) -> String { - // the second replace removes double escapes - // - format!("\"{}\"", s.replace("\"", "\\\"") - .replace(r#"\\""#, r#"\""#)) -} - -#[test] -fn it_checks_as_string_does_not_double_escape() { - let raw = "this is \\\"an escaped string\\\" so it's problematic"; - let escaped = as_string(&raw); - assert_eq!(escaped, format!("\"{}\"", raw).as_ref()); -} - -fn as_boolean(b: &bool) -> &str { - if *b { "t" } else { "f" } -} - -pub fn now() -> i64 { - nanos(Utc::now()) as i64 -} - -/// Serializes an `&OwnedMeasurement` as influx line protocol into `line`. -/// -/// The serialized measurement is appended to the end of the string without -/// any regard for what exited in it previously. 
-/// -pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { - line.push_str(&escape_tag(measurement.key)); - - let add_tag = |line: &mut String, key: &str, value: &str| { - line.push_str(","); - line.push_str(&escape_tag(key)); - line.push_str("="); - line.push_str(&escape(value)); - }; - - for (key, value) in measurement.tags.iter() { - #[cfg(not(feature = "string-tags"))] - add_tag(line, key, value); - - #[cfg(feature = "string-tags")] - add_tag(line, key, value.as_str()); - } - - let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| { - if is_first { line.push_str(" "); } else { line.push_str(","); } - line.push_str(&escape_tag(key)); - line.push_str("="); - match *value { - OwnedValue::String(ref s) => line.push_str(&as_string(s)), - OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)), - OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)), - - OwnedValue::D128(ref d) => { - if d.is_finite() { - line.push_str(&format!("{}", d)); - } else { - line.push_str("0.0"); - } - } - - OwnedValue::Float(ref f) => { - if f.is_finite() { - line.push_str(&format!("{}", f)); - } else { - line.push_str("-999.0"); - } - } - - OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)), - }; - }; - - let mut fields = measurement.fields.iter(); - - // first time separate from tags with space - // - fields.next().map(|kv| { - add_field(line, &kv.0, &kv.1, true); - }); - - // then seperate the rest w/ comma - // - for kv in fields { - add_field(line, kv.0, &kv.1, false); - } - - if let Some(t) = measurement.timestamp { - line.push_str(" "); - line.push_str(&t.to_string()); - } -} - -#[cfg(feature = "warnings")] -#[deprecated(since="0.4", note="Replace with InfluxWriter")] -#[cfg(feature = "zmq")] -pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { - assert!(false); - thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || { - const DB_HOST: &'static str = "http://127.0.0.1:8086/write"; - let _ = fs::create_dir("/tmp/mm"); - let ctx = zmq::Context::new(); - let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); - let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); - let client = Client::new(); - let mut buf = String::with_capacity(4096); - let mut server_resp = String::with_capacity(4096); - let mut count = 0; - loop { - if let Ok(bytes) = socket.recv_bytes(0) { - if let Ok(msg) = String::from_utf8(bytes) { - count = match count { - 0 => { - buf.push_str(&msg); - 1 - } - n @ 1...40 => { - buf.push_str("\n"); - buf.push_str(&msg); - n + 1 - } - _ => { - buf.push_str("\n"); - buf.push_str(&msg); - match client.post(url.clone()) - .body(&buf) - .send() { - - Ok(Response { status, .. }) if status == StatusCode::NoContent => {} - - Ok(mut resp) => { - let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); - let _ = warnings.send( - Warning::Error( - format!("Influx server: {}", server_resp))); - server_resp.clear(); - } - - Err(why) => { - let _ = warnings.send( - Warning::Error( - format!("Influx write error: {}", why))); - } - } - buf.clear(); - 0 - } - } - } - } - } - }).unwrap() -} - -#[derive(Debug, Clone, PartialEq)] -pub enum OwnedValue { - String(String), - Float(f64), - Integer(i64), - Boolean(bool), - D128(d128), - Uuid(Uuid), -} - -/// Holds data meant for an influxdb measurement in transit to the -/// writing thread. -/// -/// TODO: convert `Map` to `SmallVec`? 
-/// -#[derive(Clone, Debug)] -pub struct OwnedMeasurement { - pub key: &'static str, - pub timestamp: Option, - //pub fields: Map<&'static str, OwnedValue>, - //pub tags: Map<&'static str, &'static str>, - pub fields: SmallVec<[(&'static str, OwnedValue); 8]>, - #[cfg(not(feature = "string-tags"))] - pub tags: SmallVec<[(&'static str, &'static str); 8]>, - #[cfg(feature = "string-tags")] - pub tags: SmallVec<[(&'static str, String); 8]>, -} - -impl OwnedMeasurement { - pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self { - OwnedMeasurement { - key, - timestamp: None, - tags: SmallVec::with_capacity(n_tags), - fields: SmallVec::with_capacity(n_fields), - } - } - - pub fn new(key: &'static str) -> Self { - OwnedMeasurement { - key, - timestamp: None, - tags: SmallVec::new(), - fields: SmallVec::new(), - } - } - - /// Unusual consuming `self` signature because primarily used by - /// the `measure!` macro. - #[cfg(not(feature = "string-tags"))] - pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self { - self.tags.push((key, value)); - self - } - - #[cfg(feature = "string-tags")] - pub fn add_tag(mut self, key: &'static str, value: S) -> Self { - self.tags.push((key, value.to_string())); - self - } - - /// Unusual consuming `self` signature because primarily used by - /// the `measure!` macro. - pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self { - self.fields.push((key, value)); - self - } - - pub fn set_timestamp(mut self, timestamp: i64) -> Self { - self.timestamp = Some(timestamp); - self - } - - #[cfg(not(feature = "string-tags"))] - pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self { - match self.tags.iter().position(|kv| kv.0 == key) { - Some(i) => { - self.tags.get_mut(i) - .map(|x| { - x.0 = value; - }); - self - } - - None => { - self.add_tag(key, value) - } - } - } - - pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> { - self.fields.iter() - .find(|kv| kv.0 == key) - .map(|kv| &kv.1) - } - - #[cfg(not(feature = "string-tags"))] - pub fn get_tag(&self, key: &'static str) -> Option<&'static str> { - self.tags.iter() - .find(|kv| kv.0 == key) - .map(|kv| kv.1) - } -} - -#[allow(unused_imports, unused_variables)] -#[cfg(test)] -mod tests { - use super::*; - use test::{black_box, Bencher}; - - #[ignore] - #[bench] - fn measure_ten(b: &mut Bencher) { - let influx = InfluxWriter::new("localhost", "test", "log/influx.log", 8192); - let mut n = 0; - b.iter(|| { - for _ in 0..10 { - let time = influx.nanos(Utc::now()); - n += 1; - measure!(influx, million, i(n), tm(time)); - } - }); - } - - #[test] - fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() { - const VERSION: &str = "0.3.90"; - let tag_value = "one"; - let color = "red"; - let time = Utc::now(); - let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time)); - assert_eq!(m.get_tag("color"), Some("red")); - assert_eq!(m.get_tag("version"), Some(VERSION)); - assert_eq!(m.timestamp, Some(nanos(time) as i64)); - } - - #[test] - fn it_uses_the_v_for_version_shortcut() { - const VERSION: &str = "0.3.90"; - let tag_value = "one"; - let color = "red"; - let time = now(); - let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time)); - assert_eq!(m.get_tag("color"), Some("red")); - assert_eq!(m.get_tag("version"), Some(VERSION)); - assert_eq!(m.timestamp, Some(time)); - } - - #[test] - fn it_uses_the_new_tag_k_only_shortcut() { - let tag_value = "one"; - let color = "red"; - let time = now(); - let 
m = measure!(@make_meas test, t(color), t(tag_value), tm(time)); - assert_eq!(m.get_tag("color"), Some("red")); - assert_eq!(m.get_tag("tag_value"), Some("one")); - assert_eq!(m.timestamp, Some(time)); - } - - #[test] - fn it_uses_measure_macro_parenthesis_syntax() { - let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1)); - assert_eq!(m.key, "test"); - assert_eq!(m.get_tag("a"), Some("b")); - assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1))); - assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1))); - assert_eq!(m.timestamp, Some(1)); - } - - #[test] - fn it_uses_measure_macro_on_a_self_attribute() { - struct A { - pub influx: InfluxWriter, - } - - impl A { - fn f(&self) { - measure!(self.influx, test, t(color, "red"), i(n, 1)); - } - } - - let a = A { influx: InfluxWriter::default() }; - - a.f(); - } - - #[test] - fn it_clones_an_influx_writer_to_check_both_drop() { - let influx = InfluxWriter::default(); - measure!(influx, drop_test, i(a, 1), i(b, 2)); - { - let influx = influx.clone(); - thread::spawn(move || { - measure!(influx, drop_test, i(a, 3), i(b, 4)); - }); - } - } - - #[bench] - fn influx_writer_send_basic(b: &mut Bencher) { - let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000); - b.iter(|| { - measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]); - }); - } - - #[bench] - fn influx_writer_send_price(b: &mut Bencher) { - let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000); - b.iter(|| { - measure!(m, test, - t(ticker, t!(xmr-btc).as_str()), - t(exchange, "plnx"), - d(bid, d128::zero()), - d(ask, d128::zero()), - ); - }); - } - - #[test] - fn it_checks_color_tag_error_in_non_doctest() { - let (tx, rx) = bounded(1024); - measure!(tx, test, tag[color;"red"], int[n;1]); - let meas: OwnedMeasurement = rx.recv().unwrap(); - assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas); - } - - #[test] - fn it_uses_the_make_meas_pattern_of_the_measure_macro() { - let meas = measure!(@make_meas test_measurement, - tag [ one => "a" ], - tag [ two => "b" ], - int [ three => 2 ], - float [ four => 1.2345 ], - string [ five => String::from("d") ], - bool [ six => true ], - int [ seven => { 1 + 2 } ], - time [ 1 ] - ); - assert_eq!(meas.key, "test_measurement"); - assert_eq!(meas.get_tag("one"), Some("a")); - assert_eq!(meas.get_tag("two"), Some("b")); - assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2))); - assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3))); - assert_eq!(meas.timestamp, Some(1)); - } - - #[test] - fn it_uses_the_measure_macro() { - let (tx, rx) = bounded(1024); - measure!(tx, test_measurement, - tag [ one => "a" ], - tag [ two => "b" ], - int [ three => 2 ], - float [ four => 1.2345 ], - string [ five => String::from("d") ], - bool [ six => true ], - int [ seven => { 1 + 2 } ], - time [ 1 ] - ); - thread::sleep(Duration::from_millis(10)); - let meas: OwnedMeasurement = rx.try_recv().unwrap(); - assert_eq!(meas.key, "test_measurement"); - assert_eq!(meas.get_tag("one"), Some("a")); - assert_eq!(meas.get_tag("two"), Some("b")); - assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2))); - assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3))); - assert_eq!(meas.timestamp, Some(1)); - } - - #[test] - fn it_uses_measure_macro_for_d128_and_uuid() { - - let (tx, rx) = bounded(1024); - let u = Uuid::new_v4(); - let d = d128::zero(); - let t = now(); - measure!(tx, test_measurement, - tag[one; "a"], - d128[two; d], 
- uuid[three; u], - time[t] - ); - - thread::sleep(Duration::from_millis(10)); - let meas: OwnedMeasurement = rx.try_recv().unwrap(); - assert_eq!(meas.key, "test_measurement"); - assert_eq!(meas.get_tag("one"), Some("a")); - assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero()))); - assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u))); - assert_eq!(meas.timestamp, Some(t)); - } - - #[test] - fn it_uses_the_measure_macro_alt_syntax() { - - let (tx, rx) = bounded(1024); - measure!(tx, test_measurement, - tag[one; "a"], - tag[two; "b"], - int[three; 2], - float[four; 1.2345], - string[five; String::from("d")], - bool [ six => true ], - int[seven; { 1 + 2 }], - time[1] - ); - - thread::sleep(Duration::from_millis(10)); - let meas: OwnedMeasurement = rx.try_recv().unwrap(); - assert_eq!(meas.key, "test_measurement"); - assert_eq!(meas.get_tag("one"), Some("a")); - assert_eq!(meas.get_tag("two"), Some("b")); - assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2))); - assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3))); - assert_eq!(meas.timestamp, Some(1)); - } - - #[test] - fn it_checks_that_fields_are_separated_correctly() { - let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]); - assert_eq!(m.key, "test"); - assert_eq!(m.get_tag("a"), Some("one")); - assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1))); - - let mut buf = String::new(); - serialize_owned(&m, &mut buf); - assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf); - } - - #[test] - fn try_to_break_measure_macro() { - let (tx, _) = bounded(1024); - measure!(tx, one, tag[x=>"y"], int[n;1]); - measure!(tx, one, tag[x;"y"], int[n;1],); - - struct A { - pub one: i32, - pub two: i32, - } - - struct B { - pub a: A - } - - let b = B { a: A { one: 1, two: 2 } }; - - let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one)); - - assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1))); - } - - #[bench] - fn measure_macro_small(b: &mut Bencher) { - let (tx, rx) = bounded(1024); - let listener = thread::spawn(move || { - loop { if rx.recv().is_err() { break } } - }); - b.iter(|| { - measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]); - }); - } - - #[bench] - fn measure_macro_medium(b: &mut Bencher) { - let (tx, rx) = bounded(1024); - let listener = thread::spawn(move || { - loop { if rx.recv().is_err() { break } } - }); - b.iter(|| { - measure!(tx, test, t(color, "red"), t(mood, "playful"), - t(ticker, "xmr_btc"), f(price, 1.2345), f(amount, 56.322), - i(n, 1), tm(now())); - }); - } - - #[bench] - fn serialize_owned_longer(b: &mut Bencher) { - let mut buf = String::with_capacity(1024); - let m = - OwnedMeasurement::new("test") - .add_tag("one", "a") - .add_tag("two", "b") - .add_tag("ticker", "xmr_btc") - .add_tag("exchange", "plnx") - .add_tag("side", "bid") - .add_field("three", OwnedValue::Float(1.2345)) - .add_field("four", OwnedValue::Integer(57)) - .add_field("five", OwnedValue::Boolean(true)) - .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz"))) - .set_timestamp(now()); - b.iter(|| { - serialize_owned(&m, &mut buf); - buf.clear() - }); - } - - #[bench] - fn serialize_owned_simple(b: &mut Bencher) { - let mut buf = String::with_capacity(1024); - let m = - OwnedMeasurement::new("test") - .add_tag("one", "a") - .add_tag("two", "b") - .add_field("three", OwnedValue::Float(1.2345)) - .add_field("four", OwnedValue::Integer(57)) - .set_timestamp(now()); - b.iter(|| { - serialize_owned(&m, &mut 
buf); - buf.clear() - }); - } - - #[bench] - fn clone_url_for_thread(b: &mut Bencher) { - let host = "ahmes"; - let db = "mlp"; - let url = - Url::parse_with_params(&format!("http://{}:8086/write", host), - &[("db", db), ("precision", "ns")]).unwrap(); - b.iter(|| { - url.clone() - }) - } - - #[bench] - fn clone_arc_url_for_thread(b: &mut Bencher) { - let host = "ahmes"; - let db = "mlp"; - let url = - Url::parse_with_params(&format!("http://{}:8086/write", host), - &[("db", db), ("precision", "ns")]).unwrap(); - let url = Arc::new(url); - b.iter(|| { - Arc::clone(&url) - }) - } - - #[test] - fn it_serializes_a_hard_to_serialize_message_from_owned() { - let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#; - let mut buf = String::new(); - let mut server_resp = String::new(); - let m = OwnedMeasurement::new("rust_test") - .add_field("s", OwnedValue::String(raw.to_string())) - .set_timestamp(now()); - serialize_owned(&m, &mut buf); - println!("{}", buf); - buf.push_str("\n"); - let buf_copy = buf.clone(); - buf.push_str(&buf_copy); - println!("{}", buf); - - let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse"); - let client = Client::new(); - match client.post(url.clone()) - .body(&buf) - .send() { - - Ok(Response { status, .. }) if status == StatusCode::NoContent => {} - - Ok(mut resp) => { - resp.read_to_string(&mut server_resp).unwrap(); - panic!("{}", server_resp); - } - - Err(why) => { - panic!(why) - } - } - } -} diff --git a/src/latency.rs b/src/latency.rs deleted file mode 100644 index 9e31a28..0000000 --- a/src/latency.rs +++ /dev/null @@ -1,774 +0,0 @@ -#![allow(unused)] -use std::thread::{self, JoinHandle}; -use std::sync::mpsc::{Sender, channel}; -use std::fmt; -use std::time::{Instant, Duration}; - -use chrono::{self, DateTime, Utc}; -use pub_sub::PubSub; -use sloggers::types::Severity; - -//use windows::{DurationWindow, Incremental, Window}; -use money::{Ticker, Side, Exchange}; - -use super::file_logger; -use influx::{self, OwnedMeasurement, OwnedValue}; - -use self::windows::Incremental; - -pub type Nanos = u64; - -pub const SECOND: u64 = 1e9 as u64; -pub const MINUTE: u64 = SECOND * 60; -pub const HOUR: u64 = MINUTE * 60; -pub const MILLISECOND: u64 = SECOND / 1000; -pub const MICROSECOND: u64 = MILLISECOND / 1000; - -pub fn nanos(d: Duration) -> Nanos { - d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) -} - -pub fn dt_nanos(t: DateTime) -> i64 { - (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64) -} - -pub fn now() -> i64 { dt_nanos(Utc::now()) } - -pub fn tfmt(ns: Nanos) -> String { - match ns { - t if t <= MICROSECOND => { - format!("{}ns", t) - } - - t if t > MICROSECOND && t < MILLISECOND => { - format!("{}u", t / MICROSECOND) - } - t if t > MILLISECOND && t < SECOND => { - format!("{}ms", t / MILLISECOND) - } - - t => { - format!("{}.{}sec", t / SECOND, t / MILLISECOND) - } - } -} - -pub fn tfmt_dur(d: Duration) -> String { - tfmt(nanos(d)) -} - -pub fn tfmt_dt(dt: DateTime) -> String { - Utc::now().signed_duration_since(dt) - .to_std() - .map(|dur| { - tfmt_dur(dur) - }).unwrap_or("?".into()) -} - - -pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) -> fmt::Result { - match ns { - t if t <= MICROSECOND => { - write!(f, "{}ns", t) - } - - t if t > MICROSECOND && t < MILLISECOND => { - write!(f, "{}u", t / MICROSECOND) - } - - t if t > 
MILLISECOND && t < SECOND => { - write!(f, "{}ms", t / MILLISECOND) - } - - t => { - write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND) - } - } -} - -#[doc(hide)] -mod windows { - use super::*; - use std::ops::{Div, Mul, Sub, SubAssign, AddAssign}; - use std::collections::VecDeque; - use num::Float; - - const INITIAL_CAPACITY: usize = 1000; - - #[derive(Clone, Debug)] - pub struct Point - //where T: Default - { - time: Instant, - value: T - } - - #[derive(Debug, Clone)] - pub struct Window - where T: Default - { - pub size: Duration, // window size - mean: T, - ps: T, - psa: T, - var: T, - sum: T, - count: u32, - items: VecDeque>, - } - - #[derive(Default)] - pub struct DurationWindow { - pub size: Duration, - mean: Duration, - sum: Duration, - count: u32, - items: VecDeque> - } - - impl Point - where T: Default + Copy - { - fn new(time: Instant, value: T) -> Self { - Point { time, value } - } - - fn value(&self) -> T { - self.value - } - } - - impl Window - where T: Default + Zero - { - pub fn new(size: Duration) -> Self { - Window { - size, - mean: T::default(), - psa: T::default(), - ps: T::default(), - sum: T::default(), - count: 0, - var: T::default(), - items: VecDeque::with_capacity(INITIAL_CAPACITY), - } - } - - pub fn with_size_and_capacity(size: Duration, capacity: usize) -> Self { - Window { - size, - mean: T::default(), - psa: T::default(), - ps: T::default(), - sum: T::default(), - count: 0, - var: T::default(), - items: VecDeque::with_capacity(capacity), - } - } - } - - impl From for Window - where T: Default + Zero - { - fn from(size: Duration) -> Self { - Window::new(size) - } - } - - impl From for DurationWindow { - fn from(size: Duration) -> Self { - DurationWindow::new(size) - } - } - - pub trait Incremental { - /// Purge expired items. - /// - #[inline] - fn refresh(&mut self, t: Instant) -> &Self; - - /// Add a new item. - /// - #[inline] - fn add(&mut self, time: Instant, value: T); - - /// Add a new item and purge expired items. - /// - #[inline] - fn update(&mut self, time: Instant, value: T) { - self.refresh(time); - self.add(time, value); - } - } - - pub trait Zero { - fn zero() -> Self; - } - - pub trait One { - fn one() -> Self; - } - - macro_rules! zero { - ($t:ty, $body:expr) => { - - impl Zero for $t { - fn zero() -> $t { $body } - } - } - } - - macro_rules! 
one { - ($t:ty, $body:expr) => { - - impl One for $t { - fn one() -> $t { $body } - } - } - } - - zero!(f64, 0.0); - zero!(f32, 0.0); - zero!(u128, 0); - zero!(i128, 0); - zero!(u64, 0); - zero!(i64, 0); - zero!(i32, 0); - zero!(u32, 0); - zero!(u16, 0); - one!(f64, 1.0); - one!(f32, 1.0); - one!(u128, 1); - one!(i128, 1); - one!(u64, 1); - one!(i64, 1); - one!(i32, 1); - one!(u32, 1); - one!(u16, 1); - - impl Incremental for Window - where T: Default + AddAssign + SubAssign + From + Div + - Mul + Sub + Copy - { - #[inline] - fn refresh(&mut self, t: Instant) -> &Self { - if !self.items.is_empty() { - let (n_remove, sum, ps, count) = - self.items.iter() - .take_while(|x| t - x.time > self.size) - .fold((0, self.sum, self.ps, self.count), |(n_remove, sum, ps, count), x| { - (n_remove + 1, sum - x.value, ps - x.value * x.value, count - 1) - }); - self.sum = sum; - self.ps = ps; - self.count = count; - for _ in 0..n_remove { - self.items.pop_front(); - } - } - - if self.count > 0 { - self.mean = self.sum / self.count.into(); - self.psa = self.ps / self.count.into(); - let c: T = self.count.into(); - self.var = (self.psa * c - c * self.mean * self.mean) / c; - } - self - } - - /// Creates `Point { time, value }` and pushes to `self.items`. - /// - #[inline] - fn add(&mut self, time: Instant, value: T) { - let p = Point::new(time, value); - self.sum += p.value; - self.ps += p.value * p.value; - self.count += 1; - self.items.push_back(p); - } - - #[inline] - fn update(&mut self, time: Instant, value: T) { - self.add(time, value); - self.refresh(time); - } - } - - impl Incremental for DurationWindow { - #[inline] - fn refresh(&mut self, t: Instant) -> &Self { - if !self.items.is_empty() { - let (n_remove, sum, count) = - self.items.iter() - .take_while(|x| t - x.time > self.size) - .fold((0, self.sum, self.count), |(n_remove, sum, count), x| { - (n_remove + 1, sum - x.value, count - 1) - }); - self.sum = sum; - self.count = count; - for _ in 0..n_remove { - self.items.pop_front(); - } - } - - if self.count > 0 { - self.mean = self.sum / self.count.into(); - } - - self - } - - #[inline] - fn add(&mut self, time: Instant, value: Duration) { - let p = Point::new(time, value); - self.sum += p.value; - self.count += 1; - self.items.push_back(p); - } - } - - - impl Window - where T: Default + Copy - { - pub fn mean(&self) -> T { self.mean } - pub fn var(&self) -> T { self.var } - pub fn psa(&self) -> T { self.psa } - pub fn ps(&self) -> T { self.ps } - pub fn count(&self) -> u32 { self.count } - pub fn len(&self) -> usize { self.items.len() } - pub fn is_empty(&self) -> bool { self.items.is_empty() } - - /// Returns the `Duration` between `t` and the first `Point` in `self.items`. - /// - /// If there are no items, returns `Duration { secs: 0, nanos: 0 }`. - /// - /// # Panics - /// - /// This function will panic if `t` is earlier than the first `Point`'s `Instant`. 
- /// - #[inline] - pub fn elapsed(&self, t: Instant) -> Duration { - self.items.front() - .map(|p| { - t - p.time - }).unwrap_or_else(|| Duration::new(0, 0)) - } - } - - impl Window - where T: Float + Default - { - #[inline] - pub fn std(&self) -> T { self.var.sqrt() } - } - - impl DurationWindow { - pub fn new(size: Duration) -> Self { DurationWindow { size, ..Default::default() } } - pub fn mean(&self) -> Duration { self.mean } - pub fn count(&self) -> u32 { self.count } - pub fn len(&self) -> usize { self.items.len() } - pub fn is_empty(&self) -> bool { self.items.is_empty() } - - #[inline] - pub fn nanos(d: Duration) -> u64 { d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) } - - #[inline] - pub fn micros(d: Duration) -> Option { - if d <= Duration::new(4_294, 967_295_000) { - Some((d.as_secs() * 1_000_000) as u32 + d.subsec_nanos() / 1_000u32) - } else { - None - } - } - - #[inline] - pub fn mean_nanos(&self) -> u64 { DurationWindow::nanos(self.mean()) } - - #[inline] - pub fn max(&self) -> Option { - self.items.iter() - .map(|p| p.value) - .max() - } - - #[inline] - pub fn max_nanos(&self) -> Option { - self.max() - .map(|x| DurationWindow::nanos(x)) - } - - #[inline] - pub fn first(&self) -> Option { - self.items - .front() - .map(|pt| pt.value()) - } - - /// Returns the `Duration` between `t` and the first `Point` in `self.items`. - /// - /// If there are no items, returns `Duration { secs: 0, nanos: 0 }`. - /// - /// # Panics - /// - /// This function will panic if `t` is earlier than the first `Point`'s `Instant`. - /// - #[inline] - pub fn elapsed(&self, t: Instant) -> Duration { - self.items.front() - .map(|p| { - t - p.time - }).unwrap_or_else(|| Duration::new(0, 0)) - } - } -} - -#[derive(Debug)] -pub enum Latency { - Ws(Exchange, Ticker, Duration), - Http(Exchange, Duration), - Trade(Exchange, Ticker, Duration), - Terminate -} - -#[derive(Debug)] -pub enum ExperiencedLatency { - - GdaxWebsocket(Duration), - GdaxHttpPublic(Duration), - GdaxHttpPrivate(Duration), - PlnxHttpPublic(Duration), - PlnxHttpPrivate(Duration), - PlnxOrderBook(Duration), - KrknHttpPublic(Duration), - KrknHttpPrivate(Duration), - KrknTrade(Duration, &'static str, Option, Option), - PlnxWs(Ticker), - - Terminate -} - -#[derive(Debug, Clone)] -pub struct Update { - pub gdax_ws: Nanos, - pub gdax_trade: Nanos, - pub gdax_last: DateTime -} - -impl Default for Update { - fn default() -> Self { - Update { - gdax_ws: 0, - gdax_trade: 0, - gdax_last: Utc::now(), - } - } -} - -#[derive(Debug, Clone)] -pub struct LatencyUpdate { - pub gdax_ws: Nanos, - pub krkn_pub: Nanos, - pub krkn_priv: Nanos, - pub plnx_pub: Nanos, - pub plnx_priv: Nanos, - pub plnx_order: Nanos, - pub krkn_trade_30_mean: Nanos, - pub krkn_trade_30_max: Nanos, - - pub krkn_trade_300_mean: Nanos, - pub krkn_trade_300_max: Nanos, - - pub plnx_last: DateTime, - pub krkn_last: DateTime, - - pub plnx_ws_count: u64, -} - -impl Default for LatencyUpdate { - fn default() -> Self { - LatencyUpdate { - gdax_ws : 0, - krkn_pub : 0, - krkn_priv : 0, - plnx_pub : 0, - plnx_priv : 0, - plnx_order : 0, - krkn_trade_30_mean : 0, - krkn_trade_30_max : 0, - krkn_trade_300_mean : 0, - krkn_trade_300_max : 0, - plnx_ws_count : 0, - - plnx_last : Utc::now(), - krkn_last : Utc::now(), - } - } -} - -pub struct Manager { - pub tx: Sender, - pub channel: PubSub, - thread: Option>, -} - -pub struct LatencyManager { - pub tx: Sender, - pub channel: PubSub, - thread: Option>, -} - -/// returns a DateTime equal to now - `dur` -/// -pub fn dt_from_dur(dur: 
Duration) -> DateTime { - let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64); - Utc::now() - old_dur -} - -struct Last { - broadcast: Instant, - plnx: Instant, - krkn: Instant, - gdax: Instant, -} - -impl Default for Last { - fn default() -> Self { - Last { - broadcast: Instant::now(), - plnx: Instant::now(), - krkn: Instant::now(), - gdax: Instant::now(), - } - } -} - -impl Manager { - pub fn new(window: Duration, - log_path: &'static str, - measurements: Sender) -> Self { - - let (tx, rx) = channel(); - let channel = PubSub::new(); - let channel_copy = channel.clone(); - let logger = file_logger(log_path, Severity::Info); - - info!(logger, "initializing"); - - let mut gdax_ws = windows::DurationWindow::new(window); - let mut gdax_trade = windows::DurationWindow::new(window); - - let mut last = Last::default(); - - info!(logger, "entering loop"); - - let thread = Some(thread::spawn(move || { - loop { - - let loop_time = Instant::now(); - - if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1)) { - debug!(logger, "rcvd {:?}", msg); - - match msg { - Latency::Ws(_, _, dur) => { - gdax_ws.update(loop_time, dur); - last.gdax = loop_time; - } - - Latency::Trade(_, ticker, dur) => { - gdax_trade.update(loop_time, dur); - last.gdax = loop_time; - let nanos = windows::DurationWindow::nanos(dur); - measurements.send( - OwnedMeasurement::new("gdax_trade_api") - .add_tag("ticker", ticker.as_str()) - .add_field("nanos", OwnedValue::Integer(nanos as i64)) - .set_timestamp(influx::now())).unwrap(); - } - - Latency::Terminate => break, - - _ => {} - } - } - - if loop_time - last.broadcast > Duration::from_millis(100) { - debug!(logger, "initalizing broadcast"); - - let update = Update { - gdax_ws: gdax_ws.refresh(loop_time).mean_nanos(), - gdax_trade: gdax_trade.refresh(loop_time).mean_nanos(), - gdax_last: dt_from_dur(loop_time - last.gdax) - }; - channel.send(update).unwrap(); - last.broadcast = loop_time; - debug!(logger, "sent broadcast"); - } - - } - debug!(logger, "latency manager terminating"); - })); - - Manager { - tx, - channel: channel_copy, - thread, - } - } -} - -impl Drop for LatencyManager { - fn drop(&mut self) { - for _ in 0..100 { self.tx.send(ExperiencedLatency::Terminate).unwrap(); } - if let Some(thread) = self.thread.take() { - let _ = thread.join(); - } - } -} - -impl Drop for Manager { - fn drop(&mut self) { - for _ in 0..100 { self.tx.send(Latency::Terminate).unwrap(); } - if let Some(thread) = self.thread.take() { - let _ = thread.join(); - } - } -} - -impl LatencyManager { - pub fn new(d: Duration) -> Self { - let (tx, rx) = channel(); - let tx_copy = tx.clone(); - let channel = PubSub::new(); - let channel_copy = channel.clone(); - //let w = w.clone(); - - let thread = Some(thread::spawn(move || { - let logger = file_logger("var/log/latency-manager.log", Severity::Info); - - info!(logger, "initializing DurationWindows"); - let mut gdax_ws = windows::DurationWindow::new(d); - let mut gdax_priv = windows::DurationWindow::new(d); - let mut krkn_pub = windows::DurationWindow::new(d); - let mut krkn_priv = windows::DurationWindow::new(d); - let mut plnx_pub = windows::DurationWindow::new(d); - let mut plnx_priv = windows::DurationWindow::new(d); - let mut plnx_order = windows::DurationWindow::new(d); - let mut plnx_ws_count: windows::Window = windows::Window::new(d); - - // yes I am intentionally breaking from the hard-typed duration - // window ... 
that was a stupid idea - // - let mut krkn_trade_30 = windows::DurationWindow::new(Duration::from_secs(30)); - let mut krkn_trade_300 = windows::DurationWindow::new(Duration::from_secs(300)); - - let mut last = Last::default(); - - thread::sleep(Duration::from_millis(1)); - - info!(logger, "entering loop"); - loop { - let loop_time = Instant::now(); - - if let Ok(msg) = rx.recv() { - debug!(logger, "new msg: {:?}", msg); - - match msg { - ExperiencedLatency::Terminate => { - crit!(logger, "terminating"); - break; - } - - ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d), - - ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d), - - ExperiencedLatency::KrknHttpPublic(d) => { - last.krkn = loop_time; - krkn_pub.update(loop_time, d) - } - - ExperiencedLatency::KrknHttpPrivate(d) => { - last.krkn = loop_time; - krkn_priv.update(loop_time, d) - } - - ExperiencedLatency::PlnxHttpPublic(d) => { - last.plnx = loop_time; - plnx_pub.update(loop_time, d) - } - - ExperiencedLatency::PlnxHttpPrivate(d) => { - last.plnx = loop_time; - plnx_priv.update(loop_time, d) - } - - ExperiencedLatency::PlnxOrderBook(d) => { - last.plnx = loop_time; - plnx_order.update(loop_time, d) - } - - ExperiencedLatency::PlnxWs(_) => { - last.plnx = loop_time; - plnx_ws_count.update(loop_time, 1_u32); - } - - ExperiencedLatency::KrknTrade(d, cmd, _, _) => { - debug!(logger, "new KrknTrade"; - "cmd" => cmd); - last.krkn = loop_time; - krkn_trade_30.update(loop_time, d); - krkn_trade_300.update(loop_time, d); - } - - other => { - warn!(logger, "unexpected msg: {:?}", other); - } - } - } - - if loop_time - last.broadcast > Duration::from_millis(100) { - debug!(logger, "initalizing broadcast"); - // note - because we mutated the Window instances - // above, we need a fresh Instant to avoid less than other - // panic - // - krkn_trade_30.refresh(loop_time); - krkn_trade_300.refresh(loop_time); - let update = LatencyUpdate { - gdax_ws: gdax_ws.refresh(loop_time).mean_nanos(), - krkn_pub: krkn_pub.refresh(loop_time).mean_nanos(), - krkn_priv: krkn_priv.refresh(loop_time).mean_nanos(), - plnx_pub: plnx_pub.refresh(loop_time).mean_nanos(), - plnx_priv: plnx_priv.refresh(loop_time).mean_nanos(), - plnx_order: plnx_order.refresh(loop_time).mean_nanos(), - - krkn_trade_30_mean: krkn_trade_30.mean_nanos(), - krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0), - - krkn_trade_300_mean: krkn_trade_300.mean_nanos(), - krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0), - - plnx_last: dt_from_dur(loop_time - last.plnx), - krkn_last: dt_from_dur(loop_time - last.krkn), - - plnx_ws_count: plnx_ws_count.refresh(loop_time).count() as u64, - - }; - channel.send(update).unwrap(); - last.broadcast = loop_time; - debug!(logger, "sent broadcast"); - } - } - crit!(logger, "goodbye"); - })); - - LatencyManager { - tx: tx_copy, - channel: channel_copy, - thread - } - } -} diff --git a/src/lib.rs b/src/lib.rs index 83868f8..1f7167a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,50 +1,187 @@ -//! Tools to record and display what's happening in your program +//! Utilities to efficiently send data to influx //! 
-#![feature(test)] +#![cfg_attr(all(test, feature = "unstable"), feature(test))] +#![cfg(all(test, feature = "unstable"))] +extern crate test; #[macro_use] extern crate slog; -#[allow(unused_imports)] -#[macro_use] -extern crate money; -#[cfg(test)] -extern crate test; -#[cfg(feature = "zmq")] -extern crate zmq; -#[cfg(feature = "latency")] -extern crate pubsub as pub_sub; - -use std::time::Duration; -use chrono::{DateTime, Utc, TimeZone as ChronoTZ}; -#[allow(unused_imports)] -use sloggers::Build; -#[allow(unused_imports)] -pub use sloggers::types::Severity; -use sloggers::types::TimeZone; -#[allow(unused_imports)] -use sloggers::file::FileLoggerBuilder; + +use std::io::Read; +use std::sync::Arc; +use crossbeam_channel::{Sender, Receiver, bounded, SendError}; +use std::{thread, mem}; +use std::time::*; +use std::collections::VecDeque; +use hyper::status::StatusCode; +use hyper::client::response::Response; +use hyper::Url; +use hyper::client::Client; use slog::Drain; +use chrono::prelude::*; +use decimal::d128; use uuid::Uuid; +use smallvec::SmallVec; +use slog::Logger; +use pretty_toa::ThousandsSep; + +/// Created this so I know what types can be passed through the +/// `measure!` macro, which used to convert with `as i64` and +/// `as f64` until I accidentally passed a function name, and it +/// still compiled, but with garbage numbers. +pub trait AsI64 { + fn as_i64(x: Self) -> i64; +} + +impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } } +impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } } +impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } } +impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } } +impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } } +impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } } +impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } } +impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } } +impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } } +impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } } +impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } } + +/// Created this so I know what types can be passed through the +/// `measure!` macro, which used to convert with `as i64` and +/// `as f64` until I accidentally passed a function name, and it +/// still compiled, but with garbage numbers. +pub trait AsF64 { + fn as_f64(x: Self) -> f64; +} + +impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } } +impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } } +impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } } +impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } } +impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } } +impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } } +impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } } + +/// Provides flexible and ergonomic use of `Sender`. +/// +/// The macro both creates an `OwnedMeasurement` from the supplied tags and +/// values, as well as sends it with the `Sender`. +/// +/// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized +/// measurement (see `tests` mod). 
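+///
+/// The shorthand markers recognized by the macro arms below are: `t` for a tag,
+/// `i` an integer field, `f` a float field, `s` a string field, `d` a `d128`
+/// field, `u` a `Uuid` field, `b` a boolean field, `tm` an integer timestamp,
+/// `utc` a timestamp taken from a `DateTime<Utc>`, and `v` a "version" tag.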
+/// +/// # Examples +/// +/// ``` +/// #![feature(try_from)] +/// #[macro_use] extern crate influx_writer; +/// extern crate decimal; +/// +/// use std::sync::mpsc::channel; +/// use decimal::d128; +/// +/// fn main() { +/// let (tx, rx) = crossbeam_channel::bounded(1024); +/// +/// // "shorthand" syntax +/// +/// measure!(tx, test, tag[color;"red"], int[n;1]); +/// +/// let meas: OwnedMeasurement = rx.recv().unwrap(); +/// +/// assert_eq!(meas.key, "test"); +/// assert_eq!(meas.get_tag("color"), Some("red")); +/// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1))); +/// +/// // alternate syntax ... +/// +/// measure!(tx, test, +/// tag [ one => "a" ], +/// tag [ two => "b" ], +/// int [ three => 2 ], +/// float [ four => 1.2345 ], +/// string [ five => String::from("d") ], +/// bool [ six => true ], +/// int [ seven => { 1 + 2 } ], +/// time [ 1 ] +/// ); +/// +/// let meas: OwnedMeasurement = rx.recv().unwrap(); +/// +/// assert_eq!(meas.key, "test"); +/// assert_eq!(meas.get_tag("one"), Some("a")); +/// assert_eq!(meas.get_tag("two"), Some("b")); +/// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2))); +/// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3))); +/// assert_eq!(meas.timestamp, Some(1)); +/// +/// // use the @make_meas flag to skip sending a measurement, instead merely +/// // creating it. +/// +/// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]); +/// +/// // each variant also has shorthand aliases +/// +/// let meas: OwnedMeasurement = +/// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]); +/// } +/// ``` +/// +#[macro_export] +macro_rules! measure { + (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; + (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; + (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; + //(@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) }; + (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) }; + (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) }; + (@kv v, $meas:ident, $k:expr) => { measure!(@ea t, $meas, "version", $k) }; + (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) }; + (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; + (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Integer(AsI64::as_i64($v))) }; + (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Float(AsF64::as_f64($v))) }; + (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::String($v)) }; + (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::D128($v)) }; + (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Uuid($v)) }; + (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Boolean(bool::from($v))) }; + + (@as_expr $e:expr) => {$e}; + + (@count_tags) => {0usize}; + (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)}; + (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)}; -pub mod influx; -#[cfg(feature = "warnings")] -pub mod warnings; -#[cfg(feature = 
"latency")] -pub mod latency; -pub mod hist; - -#[cfg(feature = "trace")] -pub const LOG_LEVEL: Severity = Severity::Trace; -#[cfg(all(feature = "debug", not(feature = "trace")))] -pub const LOG_LEVEL: Severity = Severity::Debug; -#[cfg(not(any(feature = "debug", feature = "trace")))] -pub const LOG_LEVEL: Severity = Severity::Info; - -#[cfg(not(feature = "trace"))] -const CHANNEL_SIZE: usize = 32_768; -#[cfg(feature = "trace")] -const CHANNEL_SIZE: usize = 2_097_152; + (@count_fields) => {0usize}; + (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; + (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; + (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)}; + + (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => { + measure!(@make_meas $name, $( $t [ $($tail)* ] ),*) + }; + + (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ + let n_tags = measure!(@count_tags $($t)*); + let n_fields = measure!(@count_fields $($t)*); + let mut meas = + $crate::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields); + $( + measure!(@kv $t, meas, $($tail)*); + )* + meas + }}; + + ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => { + measure!($m, $name, $($t [ $($tail)* ] ),+) + }; + + ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ + #[allow(unused_imports)] + use $crate::{AsI64, AsF64}; + let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*); + let _ = $m.send(measurement); + }}; +} /// converts a chrono::DateTime to an integer timestamp (ns) /// @@ -63,43 +200,6 @@ pub fn inanos(t: DateTime) -> i64 { t.timestamp() * 1_000_000_000i64 + t.timestamp_subsec_nanos() as i64 } -//#[cfg(not(any(test, feature = "test")))] -pub fn file_logger>(path: P, level: Severity) -> slog::Logger { - rotating_file_logger(path, level, true) -} - -pub fn rotating_file_logger>(path: P, level: Severity, compress: bool) -> slog::Logger { - let mut builder = FileLoggerBuilder::new(path); - builder.level(level) - .timezone(TimeZone::Utc) - .channel_size(CHANNEL_SIZE) - .rotate_size(1024 * 1024 * 1024) - .rotate_keep(1000) - .rotate_compress(compress) - .source_location(sloggers::types::SourceLocation::ModuleAndLine); - builder.build().unwrap() // the sloggers impl can't actually fail (v0.3) -} - -pub fn truncating_file_logger(path: &str, level: Severity) -> slog::Logger { - let mut builder = FileLoggerBuilder::new(path); - builder.level(level); - builder.timezone(TimeZone::Utc); - builder.truncate(); - builder.channel_size(CHANNEL_SIZE); - builder.build().unwrap() -} - -#[deprecated(since="0.4.0", note="Turns out the file logger in sloggers uses async, \ - making the async here duplicative")] -pub fn async_file_logger(path: &str, level: Severity) -> slog::Logger { - let drain = file_logger(path, level); - let async_drain = - slog_async::Async::new(drain) - .chan_size(100_000) - .build(); - slog::Logger::root(async_drain.fuse(), o!()) -} - //#[deprecated(since="0.4.3", note="Use `nanos(DateTime) -> u64` instead")] pub fn dt_nanos(t: DateTime) -> i64 { (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64) @@ -115,11 +215,780 @@ pub fn nanos_utc(t: i64) -> DateTime { Utc.timestamp(t / 1_000_000_000, (t % 1_000_000_000) as u32) } -pub fn short_uuid(id: &Uuid) -> String { - if cfg!(feature = "disable-short-uuid") { - id.to_string() - } else { - format!("{}", &id.to_string()[..8]) +#[derive(Clone, Debug)] +pub struct Point { + pub time: T, + pub 
value: V +} +pub struct DurationWindow { + pub size: Duration, + pub mean: Duration, + pub sum: Duration, + pub count: u32, + pub items: VecDeque> +} + +impl DurationWindow { + #[inline] + pub fn update(&mut self, time: Instant, value: Duration) { + self.add(time, value); + self.refresh(time); + } + + #[inline] + pub fn refresh(&mut self, t: Instant) -> &Self { + if !self.items.is_empty() { + let (n_remove, sum, count) = + self.items.iter() + .take_while(|x| t - x.time > self.size) + .fold((0, self.sum, self.count), |(n_remove, sum, count), x| { + (n_remove + 1, sum - x.value, count - 1) + }); + self.sum = sum; + self.count = count; + for _ in 0..n_remove { + self.items.pop_front(); + } + } + + if self.count > 0 { + self.mean = self.sum / self.count.into(); + } + + self + } + + #[inline] + pub fn add(&mut self, time: Instant, value: Duration) { + let p = Point { time, value }; + self.sum += p.value; + self.count += 1; + self.items.push_back(p); + } +} + +/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s +/// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE` +/// measurements have accumulated. +/// +#[derive(Debug)] +pub struct InfluxWriter { + host: String, + db: String, + tx: Sender>, + thread: Option>>, +} + +impl Default for InfluxWriter { + fn default() -> Self { + InfluxWriter::new("localhost", "test") + } +} + +impl Clone for InfluxWriter { + fn clone(&self) -> Self { + debug_assert!(self.thread.is_some()); + let thread = self.thread.as_ref().map(|x| Arc::clone(x)); + InfluxWriter { + host: self.host.to_string(), + db: self.db.to_string(), + tx: self.tx.clone(), + thread, + } + } +} + +impl InfluxWriter { + pub fn host(&self) -> &str { self.host.as_str() } + + pub fn db(&self) -> &str { self.db.as_str() } + + /// Sends the `OwnedMeasurement` to the serialization thread. 
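+    ///
+    /// A typical call site might look like this (illustrative sketch; `influx`
+    /// is assumed to be an `InfluxWriter` handle):
+    ///
+    /// ```ignore
+    /// let m = OwnedMeasurement::new("events")
+    ///     .add_field("n", OwnedValue::Integer(1))
+    ///     .set_timestamp(now());
+    /// let _ = influx.send(m);
+    /// ```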
+ /// + #[inline] + pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError>> { + self.tx.send(Some(m)) + } + + #[inline] + pub fn nanos(&self, d: DateTime) -> i64 { nanos(d) as i64 } + + #[inline] + pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 } + + #[inline] + pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 } + + #[inline] + pub fn rsecs(&self, d: Duration) -> f64 { + ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64)) + * 1000.0) + .round() + / 1000.0 + } + + #[inline] + pub fn secs(&self, d: Duration) -> f64 { + d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64 + } + + pub fn tx(&self) -> Sender> { + self.tx.clone() + } + + #[inline] + pub fn is_full(&self) -> bool { self.tx.is_full() } + + pub fn placeholder() -> Self { + let (tx, _) = bounded(1024); + Self { + host: String::new(), + db: String::new(), + tx, + thread: None, + } + } + + pub fn new(host: &str, db: &str) -> Self { + let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!()); + Self::with_logger(host, db, &noop_logger) + } + + #[allow(unused_assignments)] + pub fn with_logger(host: &str, db: &str, logger: &Logger) -> Self { + let logger = logger.new(o!( + "host" => host.to_string(), + "db" => db.to_string())); + let (tx, rx): (Sender>, Receiver>) = bounded(4096); + let url = + Url::parse_with_params(&format!("http://{}:8086/write", host), + &[("db", db), ("precision", "ns")]) + .expect("influx writer url should parse"); + let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { + use std::time::*; + use crossbeam_channel as chan; + + #[cfg(feature = "no-influx-buffer")] + const N_BUFFER_LINES: usize = 0; + + const N_BUFFER_LINES: usize = 1024; + const MAX_PENDING: Duration = Duration::from_secs(3); + const INITIAL_BUFFER_CAPACITY: usize = 4096; + const MAX_BACKLOG: usize = 128; + const MAX_OUTSTANDING_HTTP: usize = 64; + const HB_EVERY: usize = 100_000; + const N_HTTP_ATTEMPTS: u32 = 15; + + let client = Arc::new(Client::new()); + + info!(logger, "initializing InfluxWriter ..."; + "N_BUFFER_LINES" => N_BUFFER_LINES, + "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING), + "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP, + "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY, + "MAX_BACKLOG" => MAX_BACKLOG); + + // pre-allocated buffers ready for use if the active one is stasheed + // during an outage + let mut spares: VecDeque = VecDeque::with_capacity(MAX_BACKLOG); + + // queue failed sends here until problem resolved, then send again. in worst + // case scenario, loop back around on buffers queued in `backlog`, writing + // over the oldest first. 
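+            //
+            // buffer lifecycle as implemented below: a spare `String` from `spares`
+            // becomes the active `buf`; when it fills up it is handed to an http
+            // worker thread by `send`; on a 204 response the cleared buffer comes
+            // back over `http_rx` and is pushed onto `spares` again, while a failed
+            // request's buffer is parked in `backlog` to be retried.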
+ // + let mut backlog: VecDeque = VecDeque::with_capacity(MAX_BACKLOG); + + for _ in 0..MAX_BACKLOG { + spares.push_back(String::with_capacity(1024)); + } + + struct Resp { + pub buf: String, + pub took: Duration, + } + + let mut db_health = DurationWindow { + size: Duration::from_secs(120), + mean: Duration::new(10, 0), + sum: Duration::new(0, 0), + count: 0, + items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP), + }; + + let (http_tx, http_rx) = chan::bounded(32); + + let mut buf = spares.pop_front().unwrap(); + let mut count = 0; + let mut extras = 0; // any new Strings we intro to the system + let mut n_rcvd = 0; + let mut last = Instant::now(); + let mut active: bool; + let mut last_clear = Instant::now(); + let mut loop_time = Instant::now(); + + let n_out = |s: &VecDeque, b: &VecDeque, extras: usize| -> usize { + MAX_BACKLOG + extras - s.len() - b.len() - 1 + }; + + assert_eq!(n_out(&spares, &backlog, extras), 0); + + let send = |mut buf: String, backlog: &mut VecDeque, n_outstanding: usize| { + if n_outstanding >= MAX_OUTSTANDING_HTTP { + backlog.push_back(buf); + return + } + let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url + let tx = http_tx.clone(); + let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "n_outstanding" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure + let client = Arc::clone(&client); + debug!(logger, "launching http thread"); + let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || { + let logger = thread_logger; + debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len()); + let start = Instant::now(); + for n_req in 0..N_HTTP_ATTEMPTS { + let throttle = Duration::from_secs(2) * n_req * n_req; + if n_req > 0 { + warn!(logger, "InfluxWriter http thread: pausing before next request"; + "n_req" => n_req, + "throttle" => %format_args!("{:?}", throttle), + "elapsed" => %format_args!("{:?}", Instant::now() - start)); + thread::sleep(throttle); // 0, 2, 8, 16, 32 + } + let sent = Instant::now(); + let resp = client.post(url.clone()) + .body(buf.as_str()) + .send(); + let rcvd = Instant::now(); + let took = rcvd - sent; + let mut n_tx = 0u32; + match resp { + Ok(Response { status, .. 
}) if status == StatusCode::NoContent => { + debug!(logger, "server responded ok: 204 NoContent"); + buf.clear(); + let mut resp = Some(Ok(Resp { buf, took })); + loop { + n_tx += 1; + match tx.try_send(resp.take().unwrap()) { + Ok(_) => { + if n_req > 0 { + info!(logger, "successfully recovered from failed request with retry"; + "n_req" => n_req, + "n_tx" => n_tx, + "elapsed" => %format_args!("{:?}", Instant::now() - start)); + } + return + } + + Err(chan::TrySendError::Full(r)) => { + let throttle = Duration::from_millis(1000) * n_tx; + warn!(logger, "channel full: InfluxWriter http thread failed to return buf"; + "n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle)); + resp = Some(r); + thread::sleep(throttle); + } + + Err(chan::TrySendError::Disconnected(_)) => { + warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return"; + "n_tx" => n_tx, "n_req" => n_req); + return + } + } + } + } + + Ok(mut resp) => { + let mut server_resp = String::new(); + let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); + error!(logger, "influx server error (request took {:?})", took; + "status" => %resp.status, + "body" => server_resp); + } + + Err(e) => { + error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e); + } + } + + } + let took = Instant::now() - start; + warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer"; + "took" => %format_args!("{:?}", took)); + let buflen = buf.len(); + let n_lines = buf.lines().count(); + if let Err(e) = tx.send(Err(Resp { buf, took })) { + crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e; + "err" => %e, "buf.len()" => buflen, "n_lines" => n_lines); + } + }); + + if let Err(e) = thread_res { + crit!(logger, "failed to spawn thread: {}", e); + } + }; + + let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result { + match prev { + 0 if N_BUFFER_LINES > 0 => { + serialize_owned(m, buf); + Ok(1) + } + + n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => { + buf.push_str("\n"); + serialize_owned(m, buf); + Ok(n + 1) + } + + n => { + buf.push_str("\n"); + serialize_owned(m, buf); + Err(n + 1) + } + } + }; + + 'event: loop { + loop_time = Instant::now(); + active = false; + match rx.recv() { + Ok(Some(mut meas)) => { + n_rcvd += 1; + active = true; + + if n_rcvd % HB_EVERY == 0 { + let n_outstanding = n_out(&spares, &backlog, extras); + info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "n_rcvd" => n_rcvd, + "n_active_buf" => count, + "db_health" => %format_args!("{:?}", db_health.mean), + "backlog.len()" => backlog.len()); + } + + if meas.timestamp.is_none() { meas.timestamp = Some(now()) } + + if meas.fields.is_empty() { + meas.fields.push(("n", OwnedValue::Integer(1))); + } + + //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } } + + count = match next(count, &meas, &mut buf, loop_time, last) { + Ok(n) => n, + Err(_n) => { + let mut count = 0; + let mut next: String = match spares.pop_front() { + Some(x) => x, + + None => { + let n_outstanding = n_out(&spares, &backlog, extras); + crit!(logger, "no available buffers in `spares`, pulling from backlog"; + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "n_rcvd" => n_rcvd, + "backlog.len()" => backlog.len()); + match backlog.pop_front() { + 
// Note: this does not clear the backlog buffer, + // instead we will just write more and more until + // we are out of memory. I expect that will never + // happen. + // + Some(x) => { + count = 1; // otherwise, no '\n' added in `next(..)` - we are + // sending a "full" buffer to be extended + x + } + + None => { + extras += 1; + crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "backlog.len()" => backlog.len(), + "n_rcvd" => n_rcvd, + "extras" => extras); + String::new() + } + } + } + }; + // after swap, buf in next, so want to send next + // + mem::swap(&mut buf, &mut next); + let n_outstanding = n_out(&spares, &backlog, extras); + send(next, &mut backlog, n_outstanding); + last = loop_time; + count + } + }; + } + + Ok(None) => { + let start = Instant::now(); + let mut hb = Instant::now(); + warn!(logger, "terminate signal rcvd"; "count" => count); + if buf.len() > 0 { + info!(logger, "sending remaining buffer to influx on terminate"; "count" => count); + let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1)); + let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last); + let n_outstanding = n_out(&spares, &backlog, extras); + let mut placeholder = spares.pop_front().unwrap_or_else(String::new); + mem::swap(&mut buf, &mut placeholder); + send(placeholder, &mut backlog, n_outstanding); + } + let mut n_ok = 0; + let mut n_err = 0; + loop { + loop_time = Instant::now(); + let n_outstanding = n_out(&spares, &backlog, extras); + if backlog.is_empty() && n_outstanding < 1 { + info!(logger, "cleared any remaining backlog"; + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "backlog.len()" => backlog.len(), + "n_cleared_ok" => n_ok, + "n_cleared_err" => n_err, + "n_rcvd" => n_rcvd, + "extras" => extras, + "elapsed" => %format_args!("{:?}", loop_time - start)); + break 'event + } + if loop_time - hb > Duration::from_secs(5) { + info!(logger, "InfluxWriter still clearing backlog .."; + "n_outstanding" => n_outstanding, + "spares.len()" => spares.len(), + "backlog.len()" => backlog.len(), + "n_cleared_ok" => n_ok, + "n_cleared_err" => n_err, + "extras" => extras, + "n_rcvd" => n_rcvd, + "elapsed" => %format_args!("{:?}", loop_time - start)); + hb = loop_time; + } + if let Some(buf) = backlog.pop_front() { + let n_outstanding = n_out(&spares, &backlog, extras); + debug!(logger, "resending queued buffer from backlog"; + "backlog.len()" => backlog.len(), + "spares.len()" => spares.len(), + "n_rcvd" => n_rcvd, + "n_outstanding" => n_outstanding); + send(buf, &mut backlog, n_outstanding); + last_clear = loop_time; + } + + 'rx: loop { + match http_rx.try_recv() { + Ok(Ok(Resp { buf, .. })) => { + n_ok += 1; + spares.push_back(buf); // needed so `n_outstanding` count remains accurate + } + Ok(Err(Resp { buf, .. })) => { + warn!(logger, "requeueing failed request"; "buf.len()" => buf.len()); + n_err += 1; + backlog.push_front(buf); + } + Err(chan::TryRecvError::Disconnected) => { + crit!(logger, "trying to clear backlog, but http_rx disconnected! 
aborting"; + "n_outstanding" => n_outstanding, + "backlog.len()" => backlog.len(), + "n_cleared_ok" => n_ok, + "n_cleared_err" => n_err, + "extras" => extras, + "n_rcvd" => n_rcvd, + "elapsed" => %format_args!("{:?}", loop_time - start)); + break 'event + } + Err(_) => break 'rx + } + } + thread::sleep(Duration::from_millis(1)); + } + } + + _ => {} + } + + db_health.refresh(loop_time); + let n_outstanding = n_out(&spares, &backlog, extras); + let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200); + if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy { + if let Some(queued) = backlog.pop_front() { + let n_outstanding = n_out(&spares, &backlog, extras); + send(queued, &mut backlog, n_outstanding); + active = true; + } + } + + loop { + match http_rx.try_recv() { + Ok(Ok(Resp { buf, took })) => { + db_health.add(loop_time, took); + spares.push_back(buf); + active = true; + } + + Ok(Err(Resp { buf, took })) => { + db_health.add(loop_time, took); + backlog.push_front(buf); + active = true; + } + + Err(chan::TryRecvError::Disconnected) => { + crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting"; + "n_outstanding" => n_outstanding, + "backlog.len()" => backlog.len(), + "n_rcvd" => n_rcvd, + "extras" => extras); + break 'event + } + + Err(_) => break + } + } + + if !active { + thread::sleep(Duration::new(0, 1)) + } + } + info!(logger, "waiting 1s before exiting thread"); + thread::sleep(Duration::from_secs(1)); + }).unwrap(); + + InfluxWriter { + host: host.to_string(), + db: db.to_string(), + tx, + thread: Some(Arc::new(thread)) + } + } +} + +impl Drop for InfluxWriter { + fn drop(&mut self) { + if let Some(arc) = self.thread.take() { + if let Ok(thread) = Arc::try_unwrap(arc) { + let _ = self.tx.send(None); + let _ = thread.join(); + } + } + } +} + +/// This removes offending things rather than escaping them. +/// +fn escape_tag(s: &str) -> String { + s.replace(" ", "") + .replace(",", "") + .replace("\"", "") +} + +fn escape(s: &str) -> String { + s.replace(" ", "\\ ") + .replace(",", "\\,") +} + +fn as_string(s: &str) -> String { + // the second replace removes double escapes + // + format!("\"{}\"", s.replace("\"", "\\\"") + .replace(r#"\\""#, r#"\""#)) +} + +#[test] +fn it_checks_as_string_does_not_double_escape() { + let raw = "this is \\\"an escaped string\\\" so it's problematic"; + let escaped = as_string(&raw); + assert_eq!(escaped, format!("\"{}\"", raw).as_ref()); +} + +fn as_boolean(b: &bool) -> &str { + if *b { "t" } else { "f" } +} + +pub fn now() -> i64 { + nanos(Utc::now()) as i64 +} + +/// Serializes an `&OwnedMeasurement` as influx line protocol into `line`. +/// +/// The serialized measurement is appended to the end of the string without +/// any regard for what exited in it previously. 
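+///
+/// As an illustration (not a doctest), a measurement named `test` carrying a
+/// tag `color=red`, an integer field `n=1`, and timestamp `1` comes out as:
+///
+/// ```text
+/// test,color=red n=1i 1
+/// ```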
+/// +pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { + line.push_str(&escape_tag(measurement.key)); + + let add_tag = |line: &mut String, key: &str, value: &str| { + line.push_str(","); + line.push_str(&escape_tag(key)); + line.push_str("="); + line.push_str(&escape(value)); + }; + + for (key, value) in measurement.tags.iter() { + #[cfg(not(feature = "string-tags"))] + add_tag(line, key, value); + + #[cfg(feature = "string-tags")] + add_tag(line, key, value.as_str()); + } + + let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| { + if is_first { line.push_str(" "); } else { line.push_str(","); } + line.push_str(&escape_tag(key)); + line.push_str("="); + match *value { + OwnedValue::String(ref s) => line.push_str(&as_string(s)), + OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)), + OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)), + + OwnedValue::D128(ref d) => { + if d.is_finite() { + line.push_str(&format!("{}", d)); + } else { + line.push_str("0.0"); + } + } + + OwnedValue::Float(ref f) => { + if f.is_finite() { + line.push_str(&format!("{}", f)); + } else { + line.push_str("-999.0"); + } + } + + OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)), + }; + }; + + let mut fields = measurement.fields.iter(); + + // first time separate from tags with space + // + fields.next().map(|kv| { + add_field(line, &kv.0, &kv.1, true); + }); + + // then seperate the rest w/ comma + // + for kv in fields { + add_field(line, kv.0, &kv.1, false); + } + + if let Some(t) = measurement.timestamp { + line.push_str(" "); + line.push_str(&t.to_string()); + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum OwnedValue { + String(String), + Float(f64), + Integer(i64), + Boolean(bool), + D128(d128), + Uuid(Uuid), +} + +/// Holds data meant for an influxdb measurement in transit to the +/// writing thread. +/// +#[derive(Clone, Debug)] +pub struct OwnedMeasurement { + pub key: &'static str, + pub timestamp: Option, + //pub fields: Map<&'static str, OwnedValue>, + //pub tags: Map<&'static str, &'static str>, + pub fields: SmallVec<[(&'static str, OwnedValue); 8]>, + #[cfg(not(feature = "string-tags"))] + pub tags: SmallVec<[(&'static str, &'static str); 8]>, + #[cfg(feature = "string-tags")] + pub tags: SmallVec<[(&'static str, String); 8]>, +} + +impl OwnedMeasurement { + pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self { + OwnedMeasurement { + key, + timestamp: None, + tags: SmallVec::with_capacity(n_tags), + fields: SmallVec::with_capacity(n_fields), + } + } + + pub fn new(key: &'static str) -> Self { + OwnedMeasurement { + key, + timestamp: None, + tags: SmallVec::new(), + fields: SmallVec::new(), + } + } + + /// Unusual consuming `self` signature because primarily used by + /// the `measure!` macro. + #[cfg(not(feature = "string-tags"))] + pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self { + self.tags.push((key, value)); + self + } + + #[cfg(feature = "string-tags")] + pub fn add_tag(mut self, key: &'static str, value: S) -> Self { + self.tags.push((key, value.to_string())); + self + } + + /// Unusual consuming `self` signature because primarily used by + /// the `measure!` macro. 
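+    ///
+    /// Because each call consumes and returns `self`, calls chain, e.g. (illustrative):
+    /// `OwnedMeasurement::new("m").add_tag("k", "v").add_field("n", OwnedValue::Integer(1))`.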
+ pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self { + self.fields.push((key, value)); + self + } + + pub fn set_timestamp(mut self, timestamp: i64) -> Self { + self.timestamp = Some(timestamp); + self + } + + #[cfg(not(feature = "string-tags"))] + pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self { + match self.tags.iter().position(|kv| kv.0 == key) { + Some(i) => { + self.tags.get_mut(i) + .map(|x| { + x.0 = value; + }); + self + } + + None => { + self.add_tag(key, value) + } + } + } + + pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> { + self.fields.iter() + .find(|kv| kv.0 == key) + .map(|kv| &kv.1) + } + + #[cfg(feature = "string-tags")] + pub fn get_tag(&self, key: &'static str) -> Option<&str> { + self.tags.iter() + .find(|kv| kv.0 == key) + .map(|kv| kv.1.as_str()) + } + + #[cfg(not(feature = "string-tags"))] + pub fn get_tag(&self, key: &'static str) -> Option<&'static str> { + self.tags.iter() + .find(|kv| kv.0 == key) + .map(|kv| kv.1) } } @@ -127,16 +996,342 @@ pub fn short_uuid(id: &Uuid) -> String { #[cfg(test)] mod tests { use super::*; + #[cfg(feature = "unstable")] + use test::{black_box, Bencher}; + + #[ignore] + #[cfg(feature = "unstable")] + #[bench] + fn measure_ten(b: &mut Bencher) { + let influx = InfluxWriter::new("localhost", "test"); + let mut n = 0; + b.iter(|| { + for _ in 0..10 { + let time = influx.nanos(Utc::now()); + n += 1; + measure!(influx, million, i(n), tm(time)); + } + }); + } + + #[test] + fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() { + const VERSION: &str = "0.3.90"; + let tag_value = "one"; + let color = "red"; + let time = Utc::now(); + let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time)); + assert_eq!(m.get_tag("color"), Some("red")); + assert_eq!(m.get_tag("version"), Some(VERSION)); + assert_eq!(m.timestamp, Some(nanos(time) as i64)); + } #[test] - fn utc_nanos_round_trip() { - let utc = Utc::now(); - let ns = inanos(utc); - let rt = nanos_utc(ns); - assert_eq!(utc, rt); - let utc = Utc.ymd(1970, 1, 1).and_hms(0, 0, 0); - let ns = inanos(utc); - let rt = nanos_utc(ns); - assert_eq!(utc, rt); + fn it_uses_the_v_for_version_shortcut() { + const VERSION: &str = "0.3.90"; + let tag_value = "one"; + let color = "red"; + let time = now(); + let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time)); + assert_eq!(m.get_tag("color"), Some("red")); + assert_eq!(m.get_tag("version"), Some(VERSION)); + assert_eq!(m.timestamp, Some(time)); + } + + #[test] + fn it_uses_the_new_tag_k_only_shortcut() { + let tag_value = "one"; + let color = "red"; + let time = now(); + let m = measure!(@make_meas test, t(color), t(tag_value), tm(time)); + assert_eq!(m.get_tag("color"), Some("red")); + assert_eq!(m.get_tag("tag_value"), Some("one")); + assert_eq!(m.timestamp, Some(time)); + } + + #[test] + fn it_uses_measure_macro_parenthesis_syntax() { + let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1)); + assert_eq!(m.key, "test"); + assert_eq!(m.get_tag("a"), Some("b")); + assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1))); + assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1))); + assert_eq!(m.timestamp, Some(1)); + } + + #[test] + fn it_uses_measure_macro_on_a_self_attribute() { + struct A { + pub influx: InfluxWriter, + } + + impl A { + fn f(&self) { + measure!(self.influx, test, t(color, "red"), i(n, 1)); + } + } + + let a = A { influx: InfluxWriter::default() }; + + a.f(); + } + + #[test] + fn 
it_clones_an_influx_writer_to_check_both_drop() { + let influx = InfluxWriter::default(); + measure!(influx, drop_test, i(a, 1), i(b, 2)); + { + let influx = influx.clone(); + thread::spawn(move || { + measure!(influx, drop_test, i(a, 3), i(b, 4)); + }); + } + } + + #[cfg(feature = "unstable")] + #[bench] + fn influx_writer_send_basic(b: &mut Bencher) { + let m = InfluxWriter::new("localhost", "test"); + b.iter(|| { + measure!(m, test, t(color; "red"), i(n, 1)); //, float[p; 1.234]); + }); + } + + #[cfg(feature = "unstable")] + #[bench] + fn influx_writer_send_price(b: &mut Bencher) { + let m = InfluxWriter::new("localhost", "test"); + b.iter(|| { + measure!(m, test, + t(ticker, "xmr_btc"), + t(exchange, "plnx"), + d(bid, d128::zero()), + d(ask, d128::zero()), + ); + }); + } + + #[test] + fn it_checks_color_tag_error_in_non_doctest() { + let (tx, rx) = bounded(1024); + measure!(tx, test, t(color,"red"), i(n,1)); + let meas: OwnedMeasurement = rx.recv().unwrap(); + assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas); + } + + #[test] + fn it_uses_the_make_meas_pattern_of_the_measure_macro() { + let meas = measure!(@make_meas test_measurement, + t(one, "a"), t(two, "b"), i(three, 2), + f(four, 1.2345), s(five, String::from("d")), + b(six, true), i(seven, 1 + 2), + tm(1) + ); + assert_eq!(meas.key, "test_measurement"); + assert_eq!(meas.get_tag("one"), Some("a")); + assert_eq!(meas.get_tag("two"), Some("b")); + assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2))); + assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3))); + assert_eq!(meas.timestamp, Some(1)); + } + + #[test] + fn it_uses_measure_macro_for_d128_and_uuid() { + + let (tx, rx) = bounded(1024); + let one = "a"; + let two = d128::zero(); + let three = Uuid::new_v4(); + let time = now(); + measure!(tx, test_measurement, t(one), d(two), u(three), tm(time)); + + thread::sleep(Duration::from_millis(10)); + let meas: OwnedMeasurement = rx.try_recv().unwrap(); + assert_eq!(meas.key, "test_measurement"); + assert_eq!(meas.get_tag("one"), Some("a")); + assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero()))); + assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(three))); + assert_eq!(meas.timestamp, Some(time)); + } + + #[test] + fn it_uses_the_measure_macro_alt_syntax() { + + let (tx, rx) = bounded(1024); + measure!(tx, test_measurement, + t(one, "a"), t(two, "b"), i(three, 2), + f(four, 1.2345), s(five, String::from("d")), + b(six, true), i(seven, 1 + 2), + tm(1) + ); + + thread::sleep(Duration::from_millis(10)); + let meas: OwnedMeasurement = rx.try_recv().unwrap(); + assert_eq!(meas.key, "test_measurement"); + assert_eq!(meas.get_tag("one"), Some("a")); + assert_eq!(meas.get_tag("two"), Some("b")); + assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2))); + assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3))); + assert_eq!(meas.timestamp, Some(1)); + } + + #[test] + fn it_checks_that_fields_are_separated_correctly() { + let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]); + assert_eq!(m.key, "test"); + assert_eq!(m.get_tag("a"), Some("one")); + assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1))); + + let mut buf = String::new(); + serialize_owned(&m, &mut buf); + assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf); + } + + #[test] + fn try_to_break_measure_macro() { + let (tx, _) = bounded(1024); + measure!(tx, one, t(x,"y"), i(n,1)); + measure!(tx, one, t(x,"y"), i(n,1),); + + struct A { + 
pub one: i32, + pub two: i32, + } + + struct B { + pub a: A + } + + let b = B { a: A { one: 1, two: 2 } }; + + let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one)); + + assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1))); + } + + #[cfg(feature = "unstable")] + #[bench] + fn measure_macro_small(b: &mut Bencher) { + let (tx, rx) = bounded(1024); + let listener = thread::spawn(move || { + loop { if rx.recv().is_err() { break } } + }); + b.iter(|| { + measure!(tx, test, t(color, "red"), i(n, 1), tm(now())); + }); + } + + #[cfg(feature = "unstable")] + #[bench] + fn measure_macro_medium(b: &mut Bencher) { + let (tx, rx) = bounded(1024); + let listener = thread::spawn(move || { + loop { if rx.recv().is_err() { break } } + }); + b.iter(|| { + measure!(tx, test, t(color, "red"), t(mood, "playful"), + t(ticker, "xmr_btc"), f(price, 1.2345), f(amount, 56.322), + i(n, 1), tm(now())); + }); + } + + #[cfg(feature = "unstable")] + #[bench] + fn serialize_owned_longer(b: &mut Bencher) { + let mut buf = String::with_capacity(1024); + let m = + OwnedMeasurement::new("test") + .add_tag("one", "a") + .add_tag("two", "b") + .add_tag("ticker", "xmr_btc") + .add_tag("exchange", "plnx") + .add_tag("side", "bid") + .add_field("three", OwnedValue::Float(1.2345)) + .add_field("four", OwnedValue::Integer(57)) + .add_field("five", OwnedValue::Boolean(true)) + .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz"))) + .set_timestamp(now()); + b.iter(|| { + serialize_owned(&m, &mut buf); + buf.clear() + }); + } + + #[cfg(feature = "unstable")] + #[bench] + fn serialize_owned_simple(b: &mut Bencher) { + let mut buf = String::with_capacity(1024); + let m = + OwnedMeasurement::new("test") + .add_tag("one", "a") + .add_tag("two", "b") + .add_field("three", OwnedValue::Float(1.2345)) + .add_field("four", OwnedValue::Integer(57)) + .set_timestamp(now()); + b.iter(|| { + serialize_owned(&m, &mut buf); + buf.clear() + }); + } + + #[cfg(feature = "unstable")] + #[bench] + fn clone_url_for_thread(b: &mut Bencher) { + let host = "ahmes"; + let db = "mlp"; + let url = + Url::parse_with_params(&format!("http://{}:8086/write", host), + &[("db", db), ("precision", "ns")]).unwrap(); + b.iter(|| { + url.clone() + }) + } + + #[cfg(feature = "unstable")] + #[bench] + fn clone_arc_url_for_thread(b: &mut Bencher) { + let host = "ahmes"; + let db = "mlp"; + let url = + Url::parse_with_params(&format!("http://{}:8086/write", host), + &[("db", db), ("precision", "ns")]).unwrap(); + let url = Arc::new(url); + b.iter(|| { + Arc::clone(&url) + }) + } + + #[test] + fn it_serializes_a_hard_to_serialize_message_from_owned() { + let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#; + let mut buf = String::new(); + let mut server_resp = String::new(); + let m = OwnedMeasurement::new("rust_test") + .add_field("s", OwnedValue::String(raw.to_string())) + .set_timestamp(now()); + serialize_owned(&m, &mut buf); + println!("{}", buf); + buf.push_str("\n"); + let buf_copy = buf.clone(); + buf.push_str(&buf_copy); + println!("{}", buf); + + let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse"); + let client = Client::new(); + match client.post(url.clone()) + .body(&buf) + .send() { + + Ok(Response { status, .. 
}) if status == StatusCode::NoContent => {} + + Ok(mut resp) => { + resp.read_to_string(&mut server_resp).unwrap(); + panic!("{}", server_resp); + } + + Err(why) => { + panic!(why) + } + } } } diff --git a/src/warnings.rs b/src/warnings.rs deleted file mode 100644 index 750573e..0000000 --- a/src/warnings.rs +++ /dev/null @@ -1,584 +0,0 @@ -//! An object to handle everyone's errors -//! - -use std::thread::{self, JoinHandle}; -use std::sync::{Arc, Mutex, RwLock}; -use std::sync::mpsc::{Sender, channel}; -use std::collections::{BTreeMap, VecDeque}; -use std::fmt::{self, Display, Error as FmtError, Formatter}; -use std::io::{self, Write}; -use std::fs; - -use chrono::{DateTime, Utc}; -use termion::color::{self, Fg, Bg}; -use influent::measurement::{Measurement, Value as InfluentValue}; -use slog::{self, OwnedKVList, Drain, Key, KV, Level, Logger}; -use sloggers::types::Severity; - -use super::{nanos, file_logger}; -use influx; - - -const N_WARNINGS: usize = 500; - -#[macro_export] -macro_rules! confirmed { - ($warnings:ident, $($args:tt)*) => ( - { - let _ = warnings.send(Warning::Confirmed( ( format!($($args)*) ) ) ).unwrap(); - } - ) -} - -/// logs a `Warning::Awesome` message to the `WarningsManager` -#[macro_export] -macro_rules! awesome { - ($warnings:ident, $($args:tt)*) => ( - { - let _ = $warnings.send(Warning::Awesome( ( format!($($args)*) ) ) ).unwrap(); - } - ) -} - -#[macro_export] -macro_rules! critical { - ($warnings:ident, $($args:tt)*) => ( - { - let _ = $warnings.send(Warning::Critical( ( format!($($args)*) ) ) ).unwrap(); - } - ) -} - -#[macro_export] -macro_rules! notice { - ($warnings:ident, $($args:tt)*) => ( - { - let _ = $warnings.send(Warning::Notice( ( format!($($args)*) ) ) ).unwrap(); - } - ) -} - -#[macro_export] -macro_rules! error_w { - ($warnings:ident, $($args:tt)*) => ( - { - $warnings.send(Warning::Error( ( format!($($args)*) ) ) ).unwrap(); - } - ) -} - -/// represents a non-fatal error somewhere in -/// the system to report either to the program interface -/// or in logs. -/// -#[derive(Debug, Clone, PartialEq)] -pub enum Warning { - Notice(String), - - Error(String), - - DegradedService(String), - - Critical(String), - - Confirmed(String), - - Awesome(String), - - Log { - level: Level, - module: &'static str, - function: &'static str, - line: u32, - msg: String, - kv: MeasurementRecord, - }, - - Terminate -} - -impl Warning { - pub fn msg(&self) -> String { - match *self { - Warning::Notice(ref s) | Warning::Error(ref s) | - Warning::DegradedService(ref s) | Warning::Critical(ref s) | - Warning::Awesome(ref s) | Warning::Confirmed(ref s) | - Warning::Log { msg: ref s, .. } => - s.clone(), - - Warning::Terminate => "".to_owned() - } - } - pub fn msg_str(&self) -> &str { - match *self { - Warning::Notice(ref s) | Warning::Error(ref s) | - Warning::DegradedService(ref s) | Warning::Critical(ref s) | - Warning::Awesome(ref s) | Warning::Confirmed(ref s) | - Warning::Log { msg: ref s, .. } => - - s.as_ref(), - - Warning::Terminate => "Terminate" - } - } - - pub fn category_str(&self) -> &str { - match self { - &Warning::Notice(_) => "NOTC", - &Warning::Error(_) => "ERRO", - &Warning::Critical(_) => "CRIT", - &Warning::DegradedService(_) => "DGRD", - &Warning::Confirmed(_) => "CNFD", - &Warning::Awesome(_) => "AWSM", - &Warning::Log { ref level, .. 
-            &Warning::Terminate => "TERM",
-        }
-    }
-
-    pub fn category(&self, f: &mut Formatter) -> fmt::Result {
-        match *self {
-            Warning::Notice(_) => {
-                write!(f, "[ Notice ]")
-            }
-
-            Warning::Error(_) => {
-                write!(f, "{yellow}[{title}]{reset}",
-                    yellow = Fg(color::LightYellow),
-                    title = " Error--",
-                    reset = Fg(color::Reset))
-            }
-
-            Warning::Critical(_) => {
-                write!(f, "{bg}{fg}{title}{resetbg}{resetfg}",
-                    bg = Bg(color::Red),
-                    fg = Fg(color::White),
-                    title = " CRITICAL ",
-                    resetbg = Bg(color::Reset),
-                    resetfg = Fg(color::Reset))
-            }
-
-            Warning::Awesome(_) => {
-                write!(f, "{color}[{title}]{reset}",
-                    color = Fg(color::Green),
-                    title = "Awesome!",
-                    reset = Fg(color::Reset))
-            }
-
-            Warning::DegradedService(_) => {
-                write!(f, "{color}[{title}] {reset}",
-                    color = Fg(color::Blue),
-                    title = "Degraded Service ",
-                    reset = Fg(color::Reset))
-            }
-
-            Warning::Confirmed(_) => {
-                write!(f, "{bg}{fg}{title}{resetbg}{resetfg}",
-                    bg = Bg(color::Blue),
-                    fg = Fg(color::White),
-                    title = "Confirmed ",
-                    resetbg = Bg(color::Reset),
-                    resetfg = Fg(color::Reset))
-            }
-
-            _ => Ok(())
-        }
-    }
-}
-
-impl Display for Warning {
-    fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
-        self.category(f)?;
-        write!(f, " {}", self.msg())
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct Record {
-    pub time: DateTime<Utc>,
-    pub msg: Warning
-}
-
-impl Record {
-    pub fn new(msg: Warning) -> Self {
-        let time = Utc::now();
-        Record { time, msg }
-    }
-
-    pub fn to_measurement(&self, name: &'static str) -> Measurement {
-        let cat = self.msg.category_str();
-        let body = self.msg.msg_str();
-        let mut m = Measurement::new(name);
-        m.add_tag("category", cat);
-        m.add_field("msg", InfluentValue::String(body));
-        m.set_timestamp(nanos(self.time) as i64);
-        m
-    }
-}
-
-impl Display for Record {
-    fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
-        write!(f, "{} | {}", self.time.format("%H:%M:%S"), self.msg)
-    }
-}
-
-pub type SlogResult = Result<(), slog::Error>;
-
-#[derive(Debug, Clone, PartialEq)]
-pub enum Value {
-    String(String),
-    Float(f64),
-    Integer(i64),
-    Boolean(bool)
-}
-
-impl Value {
-    pub fn to_influent<'a>(&'a self) -> InfluentValue<'a> {
-        match self {
-            &Value::String(ref s) => InfluentValue::String(s),
-            &Value::Float(n) => InfluentValue::Float(n),
-            &Value::Integer(i) => InfluentValue::Integer(i),
-            &Value::Boolean(b) => InfluentValue::Boolean(b),
-        }
-    }
-}
-
-#[derive(Debug, Clone, PartialEq)]
-pub struct MeasurementRecord {
-    fields: Vec<(Key, Value)>,
-    tags: Vec<(Key, String)>,
-}
-
-impl MeasurementRecord {
-    pub fn new() -> Self {
-        MeasurementRecord {
-            fields: Vec::new(),
-            tags: Vec::new(),
-        }
-    }
-
-    pub fn add_field(&mut self, key: Key, val: Value) -> SlogResult {
-        self.fields.push((key, val));
-        Ok(())
-    }
-
-    pub fn add_tag(&mut self, key: Key, val: String) -> SlogResult {
-        match key {
-            "exchange" | "thread" | "ticker" | "category" => {
-                self.tags.push((key, val));
-            }
-
-            other => {
-                self.add_field(other, Value::String(val)).unwrap();
-            }
-        }
-
-        Ok(())
-    }
-
-    pub fn serialize_values(&mut self, record: &slog::Record, values: &OwnedKVList) {
-        let mut builder = TagBuilder { mrec: self };
-        let _ = values.serialize(record, &mut builder);
-    }
-
-    pub fn to_measurement<'a>(&'a self, name: &'a str) -> Measurement<'a> {
-        let fields: BTreeMap<&'a str, InfluentValue<'a>> =
-            self.fields.iter()
-                .map(|&(k, ref v)| {
-                    (k, v.to_influent())
-                }).collect();
-
-        let tags: BTreeMap<&'a str, &'a str> =
-            self.tags.iter()
-                .map(|&(k, ref v)| {
-                    (k, v.as_ref())
-                }).collect();
-
-        Measurement {
-            key: name,
-            timestamp: Some(nanos(Utc::now()) as i64),
-            fields,
-            tags,
-        }
-    }
-}
-
-impl slog::Serializer for MeasurementRecord {
-    fn emit_usize(&mut self, key: Key, val: usize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
-    fn emit_isize(&mut self, key: Key, val: isize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
-    fn emit_bool(&mut self, key: Key, val: bool) -> SlogResult { self.add_field(key, Value::Boolean(val)) }
-    fn emit_u8(&mut self, key: Key, val: u8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
-    fn emit_i8(&mut self, key: Key, val: i8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
-    fn emit_u16(&mut self, key: Key, val: u16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
-    fn emit_i16(&mut self, key: Key, val: i16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
-    fn emit_u32(&mut self, key: Key, val: u32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
-    fn emit_i32(&mut self, key: Key, val: i32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
-    fn emit_f32(&mut self, key: Key, val: f32) -> SlogResult { self.add_field(key, Value::Float(val as f64)) }
-    fn emit_u64(&mut self, key: Key, val: u64) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
-    fn emit_i64(&mut self, key: Key, val: i64) -> SlogResult { self.add_field(key, Value::Integer(val)) }
-    fn emit_f64(&mut self, key: Key, val: f64) -> SlogResult { self.add_field(key, Value::Float(val)) }
-    fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { self.add_field(key, Value::String(val.to_string())) }
-    fn emit_unit(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::Boolean(true)) }
-    fn emit_none(&mut self, _: Key) -> SlogResult { Ok(()) } //self.add_field(key, Value::String("none".into())) }
-    fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { self.add_field(key, Value::String(val.to_string())) }
-}
-
-pub struct TagBuilder<'a> {
-    mrec: &'a mut MeasurementRecord
-}
-
-impl<'a> slog::Serializer for TagBuilder<'a> {
-    fn emit_str(&mut self, key: Key, val: &str) -> SlogResult {
-        match key {
-            "exchange" | "thread" | "ticker" | "category" => {
-                self.mrec.add_tag(key, val.to_string())
-            }
-
-            other => {
-                self.mrec.add_field(other, Value::String(val.to_string()))
-            }
-        }
-    }
-
-    fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult {
-        match key {
-            "exchange" | "thread" | "ticker" | "category" => {
-                self.mrec.add_tag(key, val.to_string())
-            }
-
-            other => {
-                self.mrec.add_field(other, Value::String(val.to_string()))
-            }
-        }
-
-    }
-}
-
-pub struct WarningsDrain<D: Drain> {
-    level: Level,
-    tx: Arc<Mutex<Sender<Warning>>>,
-    drain: D,
-    to_file: Logger,
-}
-
-impl<D> WarningsDrain<D>
-    where D: Drain
-{
-    pub fn new(tx: Sender<Warning>, level: Level, drain: D) -> Self {
-        let tx = Arc::new(Mutex::new(tx));
-        let to_file = file_logger("var/log/mm.log", Severity::Warning);
-        WarningsDrain { tx, drain, level, to_file }
-    }
-}
-
-impl From<Sender<Warning>> for WarningsDrain<slog::Fuse<slog::Discard>> {
-    fn from(tx: Sender<Warning>) -> Self {
-        WarningsDrain::new(tx, Level::Debug, slog::Discard.fuse())
-    }
-}
-
-impl<D: Drain> Drain for WarningsDrain<D> {
-    type Ok = ();
-    type Err = D::Err;
-
-    fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
-        if record.level() <= self.level {
-            let mut ser = MeasurementRecord::new();
-            ser.serialize_values(record, values);
-            let _ = record.kv().serialize(record, &mut ser);
-            let msg = record.msg().to_string();
-            if let Ok(lock) = self.tx.lock() {
-                let _ = lock.send(Warning::Log {
-                    level: record.level(),
-                    module: record.module(),
-                    function: record.function(),
-                    line: record.line(),
-                    msg,
-                    kv: ser
-                });
-            }
-        }
-        if record.level() <= Level::Warning {
-            let _ = self.to_file.log(record);
-        }
-        let _ = self.drain.log(record, values)?;
-        Ok(())
-    }
-}
-
-#[derive(Debug)]
-pub struct WarningsManager {
-    pub tx: Sender<Warning>,
-    pub warnings: Arc<RwLock<VecDeque<Record>>>,
-    thread: Option<JoinHandle<()>>
-}
-
-impl Drop for WarningsManager {
-    fn drop(&mut self) {
-        let _ = self.tx.send(Warning::Terminate);
-        if let Some(thread) = self.thread.take() {
-            thread.join().unwrap();
-        }
-    }
-}
-
-const TIMESTAMP_FORMAT: &'static str = "%b %d %H:%M:%S%.3f";
-
-/// Serializes *only* KV pair with `key == "thread"`
-///
-struct ThreadSer<'a>(&'a mut Vec<u8>);
-
-impl<'a> slog::ser::Serializer for ThreadSer<'a> {
-    fn emit_arguments(&mut self, _: &str, _: &fmt::Arguments) -> slog::Result {
-        Ok(())
-    }
-
-    fn emit_str(&mut self, key: &str, val: &str) -> slog::Result {
-        if key == "thread" {
-            write!(self.0, " {:<20}", val)?;
-        }
-        Ok(())
-    }
-}
-
-
-/// Serializes KV pairs as ", k: v"
-///
-struct KvSer<'a>(&'a mut Vec<u8>);
-
-macro_rules! s(
-    ($s:expr, $k:expr, $v:expr) => {
-        try!(write!($s.0, ", {}: {}", $k, $v));
-    };
-);
-
-impl<'a> slog::ser::Serializer for KvSer<'a> {
-    fn emit_none(&mut self, key: &str) -> slog::Result {
-        s!(self, key, "None");
-        Ok(())
-    }
-    fn emit_unit(&mut self, key: &str) -> slog::Result {
-        s!(self, key, "()");
-        Ok(())
-    }
-
-    fn emit_bool(&mut self, key: &str, val: bool) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-
-    fn emit_char(&mut self, key: &str, val: char) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-
-    fn emit_usize(&mut self, key: &str, val: usize) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_isize(&mut self, key: &str, val: isize) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-
-    fn emit_u8(&mut self, key: &str, val: u8) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_i8(&mut self, key: &str, val: i8) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_u16(&mut self, key: &str, val: u16) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_i16(&mut self, key: &str, val: i16) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_u32(&mut self, key: &str, val: u32) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_i32(&mut self, key: &str, val: i32) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_f32(&mut self, key: &str, val: f32) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_u64(&mut self, key: &str, val: u64) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_i64(&mut self, key: &str, val: i64) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_f64(&mut self, key: &str, val: f64) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_str(&mut self, key: &str, val: &str) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-    fn emit_arguments(
-        &mut self,
-        key: &str,
-        val: &fmt::Arguments,
-    ) -> slog::Result {
-        s!(self, key, val);
-        Ok(())
-    }
-}
-
-#[allow(unused_variables, unused_imports)]
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use test::{black_box, Bencher};
-
-    #[test]
-    #[ignore]
-    fn it_creates_a_logger() {
-        let wm = WarningsManager::new("rust-test");
-        let im = influx::writer(wm.tx.clone());
-        let drain =
-            WarningsDrain {
-                tx: Arc::new(Mutex::new(wm.tx.clone())),
-                drain: slog::Discard,
-                to_file: Logger::root(slog::Discard, o!()),
-                level: Level::Trace,
-            };
-        let logger = slog::Logger::root(drain, o!());
-    }
-
-    #[bench]
-    fn it_sends_integers_with_a_sender_behind_a_mutex(b: &mut Bencher) {
-        let (tx, rx) = channel();
-        enum Msg {
-            Val(usize),
-            Terminate
-        }
-        let worker = thread::spawn(move || {
-            let mut xs = Vec::new();
-            loop {
-                match rx.recv().unwrap() {
-                    Msg::Val(x) => { xs.push(x); }
-                    Msg::Terminate => break,
-                }
-            }
-            xs.len()
-        });
-        let tx = Arc::new(Mutex::new(tx));
-        b.iter(|| {
-            let lock = tx.lock().unwrap();
-            let _ = lock.send(Msg::Val(1));
-        });
-        let _ = tx.lock().unwrap().send(Msg::Terminate);
-        let len = worker.join().unwrap();
-        //println!("{}", len);
-    }
-}