diff --git a/src/influx.rs b/src/influx.rs index 5365077..2df7379 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -30,35 +30,141 @@ const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write"; const ZMQ_RCV_HWM: i32 = 0; const ZMQ_SND_HWM: i32 = 0; -/// # Examples +/// Provides flexible and ergonomic use of `Sender`. +/// +/// The macro both creates an `OwnedMeasurement` from the supplied tags and +/// values, as well as sends it with the `Sender`. /// -/// ```rust,ignore +/// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized +/// measurement (see `tests` mod). /// -/// let M = // ... Sender +/// # Examples /// -/// measure![m; meas_name] { -/// tag [ "ticker" => "xmr_btc" ], -/// int [ "len" => 2 ] -/// float [ "x" => 1.234 ] -/// time [ now() ] -/// }; /// ``` +/// #[macro_use] extern crate logging; +/// +/// use std::sync::mpsc::channel; +/// use logging::influx::*; +/// +/// fn main() { +/// let (tx, rx) = channel(); +/// +/// // "shorthand" syntax +/// +/// measure!(tx, test, tag[color;"red"], int[n;1]); +/// +/// let meas: OwnedMeasurement = rx.recv().unwrap(); +/// +/// assert_eq!(meas.key, "test"); +/// assert_eq!(meas.tags.get("color"), Some(&"red")); +/// assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1))); /// -/// Resolves to: +/// // alternate syntax ... /// -/// ```rust,ignore -/// let measurements = // ... +/// measure!(tx, test, +/// tag [ one => "a" ], +/// tag [ two => "b" ], +/// int [ three => 2 ], +/// float [ four => 1.2345 ], +/// string [ five => String::from("d") ], +/// bool [ six => true ], +/// int [ seven => { 1 + 2 } ], +/// time [ 1 ] +/// ); /// -/// measurements.send( -/// OwnedMeasurement::new("meas_name") -/// .add_tag("ticker", "xmr_btc") -/// .add_field("len", OwnedValue::Integer(2)) -/// .add_field("x", OwnedValue::Float(1.234)) -/// .set_timestamp(now() as i64)); +/// let meas: OwnedMeasurement = rx.recv().unwrap(); +/// +/// assert_eq!(meas.key, "test"); +/// assert_eq!(meas.tags.get("one"), Some(&"a")); +/// assert_eq!(meas.tags.get("two"), Some(&"b")); +/// assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); +/// assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); +/// assert_eq!(meas.timestamp, Some(1)); +/// } /// ``` /// +#[macro_export] macro_rules! measure { - () => {} + (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; + (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; + (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) }; + + (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; + (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) }; + (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) }; + (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) }; + (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) }; + + ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ + let mut meas = $crate::influx::OwnedMeasurement::new(stringify!($name)); + $( + measure!(@kv $t, meas, $($tail)*); + )* + let _ = $m.send(meas); + }}; +} + +#[test] +fn it_checks_color_tag_error_in_non_doctest() { + let (tx, rx) = channel(); + measure!(tx, test, tag[color;"red"], int[n;1]); + let meas: OwnedMeasurement = rx.recv().unwrap(); + assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas); +} + +#[test] +fn it_uses_the_measure_macro() { + let (tx, rx) = channel(); + measure!(tx, test_measurement, + tag [ one => "a" ], + tag [ two => "b" ], + int [ three => 2 ], + float [ four => 1.2345 ], + string [ five => String::from("d") ], + bool [ six => true ], + int [ seven => { 1 + 2 } ], + time [ 1 ] + ); + thread::sleep_ms(10); + let meas: OwnedMeasurement = rx.try_recv().unwrap(); + assert_eq!(meas.key, "test_measurement"); + assert_eq!(meas.tags.get("one"), Some(&"a")); + assert_eq!(meas.tags.get("two"), Some(&"b")); + assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); + assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); + assert_eq!(meas.timestamp, Some(1)); +} + +#[test] +fn it_uses_the_measure_macro_alt_syntax() { + + let (tx, rx) = channel(); + measure!(tx, test_measurement, + tag[one; "a"], + tag[two; "b"], + int[three; 2], + float[four; 1.2345], + string[five; String::from("d")], + bool [ six => true ], + int[seven; { 1 + 2 }], + time[1] + ); + + thread::sleep_ms(10); + let meas: OwnedMeasurement = rx.try_recv().unwrap(); + assert_eq!(meas.key, "test_measurement"); + assert_eq!(meas.tags.get("one"), Some(&"a")); + assert_eq!(meas.tags.get("two"), Some(&"b")); + assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); + assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); + assert_eq!(meas.timestamp, Some(1)); +} + +#[test] +fn try_to_break_measure_macro() { + let (tx, _) = channel(); + measure!(tx, one, tag[x=>"y"], int[n;1]); + measure!(tx, one, tag[x;"y"], int[n;1],); } pub fn pull(ctx: &zmq::Context) -> Result { @@ -491,6 +597,37 @@ mod tests { use super::*; use test::{black_box, Bencher}; + #[bench] + fn measure_macro_small(b: &mut Bencher) { + let (tx, rx) = channel(); + let listener = thread::spawn(move || { + loop { if rx.recv().is_err() { break } } + }); + b.iter(|| { + measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]); + }); + } + + #[bench] + fn measure_macro_medium(b: &mut Bencher) { + let (tx, rx) = channel(); + let listener = thread::spawn(move || { + loop { if rx.recv().is_err() { break } } + }); + b.iter(|| { + measure!(tx, test, + tag[color; "red"], + tag[mood => "playful"], + tag [ ticker => "xmr_btc" ], + float[ price => 1.2345 ], + float[ amount => 56.323], + int[n; 1], + time[now()] + ); + }); + } + + #[test] #[ignore] fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {