|
@@ -30,35 +30,141 @@ const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write"; |
|
|
const ZMQ_RCV_HWM: i32 = 0; |
|
|
const ZMQ_RCV_HWM: i32 = 0; |
|
|
const ZMQ_SND_HWM: i32 = 0; |
|
|
const ZMQ_SND_HWM: i32 = 0; |
|
|
|
|
|
|
|
|
/// # Examples |
|
|
|
|
|
|
|
|
/// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`. |
|
|
|
|
|
/// |
|
|
|
|
|
/// The macro both creates an `OwnedMeasurement` from the supplied tags and |
|
|
|
|
|
/// values, as well as sends it with the `Sender`. |
|
|
/// |
|
|
/// |
|
|
/// ```rust,ignore |
|
|
|
|
|
|
|
|
/// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized |
|
|
|
|
|
/// measurement (see `tests` mod). |
|
|
/// |
|
|
/// |
|
|
/// let M = // ... Sender<OwnedMeasurement> |
|
|
|
|
|
|
|
|
/// # Examples |
|
|
/// |
|
|
/// |
|
|
/// measure![m; meas_name] { |
|
|
|
|
|
/// tag [ "ticker" => "xmr_btc" ], |
|
|
|
|
|
/// int [ "len" => 2 ] |
|
|
|
|
|
/// float [ "x" => 1.234 ] |
|
|
|
|
|
/// time [ now() ] |
|
|
|
|
|
/// }; |
|
|
|
|
|
/// ``` |
|
|
/// ``` |
|
|
|
|
|
/// #[macro_use] extern crate logging; |
|
|
|
|
|
/// |
|
|
|
|
|
/// use std::sync::mpsc::channel; |
|
|
|
|
|
/// use logging::influx::*; |
|
|
|
|
|
/// |
|
|
|
|
|
/// fn main() { |
|
|
|
|
|
/// let (tx, rx) = channel(); |
|
|
|
|
|
/// |
|
|
|
|
|
/// // "shorthand" syntax |
|
|
|
|
|
/// |
|
|
|
|
|
/// measure!(tx, test, tag[color;"red"], int[n;1]); |
|
|
|
|
|
/// |
|
|
|
|
|
/// let meas: OwnedMeasurement = rx.recv().unwrap(); |
|
|
|
|
|
/// |
|
|
|
|
|
/// assert_eq!(meas.key, "test"); |
|
|
|
|
|
/// assert_eq!(meas.tags.get("color"), Some(&"red")); |
|
|
|
|
|
/// assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1))); |
|
|
/// |
|
|
/// |
|
|
/// Resolves to: |
|
|
|
|
|
|
|
|
/// // alternate syntax ... |
|
|
/// |
|
|
/// |
|
|
/// ```rust,ignore |
|
|
|
|
|
/// let measurements = // ... |
|
|
|
|
|
|
|
|
/// measure!(tx, test, |
|
|
|
|
|
/// tag [ one => "a" ], |
|
|
|
|
|
/// tag [ two => "b" ], |
|
|
|
|
|
/// int [ three => 2 ], |
|
|
|
|
|
/// float [ four => 1.2345 ], |
|
|
|
|
|
/// string [ five => String::from("d") ], |
|
|
|
|
|
/// bool [ six => true ], |
|
|
|
|
|
/// int [ seven => { 1 + 2 } ], |
|
|
|
|
|
/// time [ 1 ] |
|
|
|
|
|
/// ); |
|
|
/// |
|
|
/// |
|
|
/// measurements.send( |
|
|
|
|
|
/// OwnedMeasurement::new("meas_name") |
|
|
|
|
|
/// .add_tag("ticker", "xmr_btc") |
|
|
|
|
|
/// .add_field("len", OwnedValue::Integer(2)) |
|
|
|
|
|
/// .add_field("x", OwnedValue::Float(1.234)) |
|
|
|
|
|
/// .set_timestamp(now() as i64)); |
|
|
|
|
|
|
|
|
/// let meas: OwnedMeasurement = rx.recv().unwrap(); |
|
|
|
|
|
/// |
|
|
|
|
|
/// assert_eq!(meas.key, "test"); |
|
|
|
|
|
/// assert_eq!(meas.tags.get("one"), Some(&"a")); |
|
|
|
|
|
/// assert_eq!(meas.tags.get("two"), Some(&"b")); |
|
|
|
|
|
/// assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); |
|
|
|
|
|
/// assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); |
|
|
|
|
|
/// assert_eq!(meas.timestamp, Some(1)); |
|
|
|
|
|
/// } |
|
|
/// ``` |
|
|
/// ``` |
|
|
/// |
|
|
/// |
|
|
|
|
|
#[macro_export] |
|
|
macro_rules! measure { |
|
|
macro_rules! measure { |
|
|
() => {} |
|
|
|
|
|
|
|
|
(@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; |
|
|
|
|
|
(@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; |
|
|
|
|
|
(@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) }; |
|
|
|
|
|
|
|
|
|
|
|
(@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; |
|
|
|
|
|
(@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) }; |
|
|
|
|
|
(@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) }; |
|
|
|
|
|
(@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) }; |
|
|
|
|
|
(@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) }; |
|
|
|
|
|
|
|
|
|
|
|
($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ |
|
|
|
|
|
let mut meas = $crate::influx::OwnedMeasurement::new(stringify!($name)); |
|
|
|
|
|
$( |
|
|
|
|
|
measure!(@kv $t, meas, $($tail)*); |
|
|
|
|
|
)* |
|
|
|
|
|
let _ = $m.send(meas); |
|
|
|
|
|
}}; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
|
|
fn it_checks_color_tag_error_in_non_doctest() { |
|
|
|
|
|
let (tx, rx) = channel(); |
|
|
|
|
|
measure!(tx, test, tag[color;"red"], int[n;1]); |
|
|
|
|
|
let meas: OwnedMeasurement = rx.recv().unwrap(); |
|
|
|
|
|
assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
|
|
fn it_uses_the_measure_macro() { |
|
|
|
|
|
let (tx, rx) = channel(); |
|
|
|
|
|
measure!(tx, test_measurement, |
|
|
|
|
|
tag [ one => "a" ], |
|
|
|
|
|
tag [ two => "b" ], |
|
|
|
|
|
int [ three => 2 ], |
|
|
|
|
|
float [ four => 1.2345 ], |
|
|
|
|
|
string [ five => String::from("d") ], |
|
|
|
|
|
bool [ six => true ], |
|
|
|
|
|
int [ seven => { 1 + 2 } ], |
|
|
|
|
|
time [ 1 ] |
|
|
|
|
|
); |
|
|
|
|
|
thread::sleep_ms(10); |
|
|
|
|
|
let meas: OwnedMeasurement = rx.try_recv().unwrap(); |
|
|
|
|
|
assert_eq!(meas.key, "test_measurement"); |
|
|
|
|
|
assert_eq!(meas.tags.get("one"), Some(&"a")); |
|
|
|
|
|
assert_eq!(meas.tags.get("two"), Some(&"b")); |
|
|
|
|
|
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); |
|
|
|
|
|
assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); |
|
|
|
|
|
assert_eq!(meas.timestamp, Some(1)); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
|
|
fn it_uses_the_measure_macro_alt_syntax() { |
|
|
|
|
|
|
|
|
|
|
|
let (tx, rx) = channel(); |
|
|
|
|
|
measure!(tx, test_measurement, |
|
|
|
|
|
tag[one; "a"], |
|
|
|
|
|
tag[two; "b"], |
|
|
|
|
|
int[three; 2], |
|
|
|
|
|
float[four; 1.2345], |
|
|
|
|
|
string[five; String::from("d")], |
|
|
|
|
|
bool [ six => true ], |
|
|
|
|
|
int[seven; { 1 + 2 }], |
|
|
|
|
|
time[1] |
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
thread::sleep_ms(10); |
|
|
|
|
|
let meas: OwnedMeasurement = rx.try_recv().unwrap(); |
|
|
|
|
|
assert_eq!(meas.key, "test_measurement"); |
|
|
|
|
|
assert_eq!(meas.tags.get("one"), Some(&"a")); |
|
|
|
|
|
assert_eq!(meas.tags.get("two"), Some(&"b")); |
|
|
|
|
|
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2))); |
|
|
|
|
|
assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3))); |
|
|
|
|
|
assert_eq!(meas.timestamp, Some(1)); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
|
|
fn try_to_break_measure_macro() { |
|
|
|
|
|
let (tx, _) = channel(); |
|
|
|
|
|
measure!(tx, one, tag[x=>"y"], int[n;1]); |
|
|
|
|
|
measure!(tx, one, tag[x;"y"], int[n;1],); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { |
|
|
pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { |
|
@@ -491,6 +597,37 @@ mod tests { |
|
|
use super::*; |
|
|
use super::*; |
|
|
use test::{black_box, Bencher}; |
|
|
use test::{black_box, Bencher}; |
|
|
|
|
|
|
|
|
|
|
|
#[bench] |
|
|
|
|
|
fn measure_macro_small(b: &mut Bencher) { |
|
|
|
|
|
let (tx, rx) = channel(); |
|
|
|
|
|
let listener = thread::spawn(move || { |
|
|
|
|
|
loop { if rx.recv().is_err() { break } } |
|
|
|
|
|
}); |
|
|
|
|
|
b.iter(|| { |
|
|
|
|
|
measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]); |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[bench] |
|
|
|
|
|
fn measure_macro_medium(b: &mut Bencher) { |
|
|
|
|
|
let (tx, rx) = channel(); |
|
|
|
|
|
let listener = thread::spawn(move || { |
|
|
|
|
|
loop { if rx.recv().is_err() { break } } |
|
|
|
|
|
}); |
|
|
|
|
|
b.iter(|| { |
|
|
|
|
|
measure!(tx, test, |
|
|
|
|
|
tag[color; "red"], |
|
|
|
|
|
tag[mood => "playful"], |
|
|
|
|
|
tag [ ticker => "xmr_btc" ], |
|
|
|
|
|
float[ price => 1.2345 ], |
|
|
|
|
|
float[ amount => 56.323], |
|
|
|
|
|
int[n; 1], |
|
|
|
|
|
time[now()] |
|
|
|
|
|
); |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
#[test] |
|
|
#[ignore] |
|
|
#[ignore] |
|
|
fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() { |
|
|
fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() { |
|
|