diff --git a/src/influx.rs b/src/influx.rs index e6c679d..a853ba1 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -1,5 +1,5 @@ //! Utilities to efficiently send data to influx -//! +//! use std::io::Read; use std::sync::Arc; @@ -128,13 +128,13 @@ impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } } /// assert_eq!(meas.timestamp, Some(1)); /// /// // use the @make_meas flag to skip sending a measurement, instead merely -/// // creating it. +/// // creating it. /// /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]); /// /// // each variant also has shorthand aliases /// -/// let meas: OwnedMeasurement = +/// let meas: OwnedMeasurement = /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]); /// } /// ``` @@ -165,7 +165,7 @@ macro_rules! measure { (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) }; (@as_expr $e:expr) => {$e}; - + (@count_tags) => {0usize}; (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)}; (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)}; @@ -182,7 +182,7 @@ macro_rules! measure { (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ let n_tags = measure!(@count_tags $($t)*); let n_fields = measure!(@count_fields $($t)*); - let mut meas = + let mut meas = $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields); $( measure!(@kv $t, meas, $($tail)*); @@ -202,10 +202,10 @@ macro_rules! measure { }}; } -/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s -/// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE` -/// measurements have accumulated. -/// +/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s +/// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE` +/// measurements have accumulated. +/// #[derive(Debug)] pub struct InfluxWriter { host: String, @@ -238,8 +238,8 @@ impl Clone for InfluxWriter { } impl InfluxWriter { - /// Sends the `OwnedMeasurement` to the serialization thread. - /// + /// Sends the `OwnedMeasurement` to the serialization thread. + /// #[cfg_attr(feature = "inlines", inline)] pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError>> { self.tx.send(Some(m)) @@ -365,14 +365,14 @@ impl InfluxWriter { } Ok(None) => { - if buf.len() > 0 { + if buf.len() > 0 { debug!(logger, "sending buffer to influx"; "len" => count); - send(&buf) + send(&buf) } break } - _ => { + _ => { thread::sleep(Duration::new(0, 1)) } } @@ -463,9 +463,9 @@ pub fn now() -> i64 { /// Serialize the measurement into influx line protocol /// and append to the buffer. -/// +/// /// # Examples -/// +/// /// ``` /// extern crate influent; /// extern crate logging; @@ -480,9 +480,9 @@ pub fn now() -> i64 { /// m.add_field("x", Value::Integer(1)); /// serialize(&m, &mut buf); /// } -/// +/// /// ``` -/// +/// pub fn serialize(measurement: &Measurement, line: &mut String) { line.push_str(&escape(measurement.key)); @@ -517,11 +517,11 @@ pub fn serialize(measurement: &Measurement, line: &mut String) { } } -/// Serializes an `&OwnedMeasurement` as influx line protocol into `line`. -/// +/// Serializes an `&OwnedMeasurement` as influx line protocol into `line`. +/// /// The serialized measurement is appended to the end of the string without -/// any regard for what exited in it previously. -/// +/// any regard for what exited in it previously. +/// pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { line.push_str(&escape_tag(measurement.key)); @@ -604,7 +604,7 @@ pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { if let Ok(bytes) = socket.recv_bytes(0) { if let Ok(msg) = String::from_utf8(bytes) { count = match count { - 0 => { + 0 => { buf.push_str(&msg); 1 } @@ -736,7 +736,6 @@ impl OwnedMeasurement { .find(|kv| kv.0 == key) .map(|kv| kv.1) } - } #[allow(unused_imports, unused_variables)] @@ -831,8 +830,8 @@ mod tests { fn influx_writer_send_price(b: &mut Bencher) { let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000); b.iter(|| { - measure!(m, test, - t(ticker, t!(xmr-btc).as_str()), + measure!(m, test, + t(ticker, t!(xmr-btc).as_str()), t(exchange, "plnx"), d(bid, d128::zero()), d(ask, d128::zero()), @@ -991,13 +990,13 @@ mod tests { loop { if rx.recv().is_err() { break } } }); b.iter(|| { - measure!(tx, test, - tag[color; "red"], + measure!(tx, test, + tag[color; "red"], tag[mood => "playful"], tag [ ticker => "xmr_btc" ], float[ price => 1.2345 ], float[ amount => 56.323], - int[n; 1], + int[n; 1], time[now()] ); }); @@ -1069,13 +1068,12 @@ mod tests { panic!(why) } } - } #[bench] fn serialize_owned_longer(b: &mut Bencher) { let mut buf = String::with_capacity(1024); - let m = + let m = OwnedMeasurement::new("test") .add_tag("one", "a") .add_tag("two", "b") @@ -1096,7 +1094,7 @@ mod tests { #[bench] fn serialize_owned_simple(b: &mut Bencher) { let mut buf = String::with_capacity(1024); - let m = + let m = OwnedMeasurement::new("test") .add_tag("one", "a") .add_tag("two", "b") @@ -1109,7 +1107,6 @@ mod tests { }); } - #[test] fn it_serializes_a_hard_to_serialize_message_from_owned() { let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;