diff --git a/Cargo.toml b/Cargo.toml index 0b1c12e..42c2590 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,8 @@ pretty_toa = "1.0.0" sloggers = "0.3" #sloggers = { path = "../sloggers" } -decimal = { path = "../decimal", version = "2" } +#decimal = { path = "../decimal", version = "2" } +decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } #windows = { path = "../windows", version = "0.2" } money = { path = "../money", version = "0.3" } @@ -56,6 +57,7 @@ disable-short-uuid = [] warnings = [] inlines = [] latency = ["pubsub"] +string-tags = [] [profile.bench] lto = true diff --git a/src/influx.rs b/src/influx.rs index d95a73d..d2f9e27 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -3,7 +3,8 @@ use std::io::Read; use std::sync::Arc; -use std::sync::mpsc::{Sender, Receiver, channel, SendError}; +//use std::sync::mpsc::{Sender, Receiver, channel, SendError}; +use crossbeam_channel::{Sender, Receiver, bounded, SendError}; use std::{thread, mem}; use std::time::*; use std::hash::BuildHasherDefault; @@ -96,7 +97,7 @@ impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } } /// use logging::influx::*; /// /// fn main() { -/// let (tx, rx) = channel(); +/// let (tx, rx) = bounded(1024); /// /// // "shorthand" syntax /// @@ -331,7 +332,7 @@ impl InfluxWriter { } pub fn placeholder() -> Self { - let (tx, _) = channel(); + let (tx, _) = bounded(1024); Self { host: String::new(), db: String::new(), @@ -350,7 +351,7 @@ impl InfluxWriter { let logger = logger.new(o!( "host" => host.to_string(), "db" => db.to_string())); - let (tx, rx): (Sender>, Receiver>) = channel(); + let (tx, rx): (Sender>, Receiver>) = bounded(1024); let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]) @@ -901,8 +902,12 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { line.push_str(&escape(value)); }; - for &(key, value) in measurement.tags.iter() { + for (key, value) in measurement.tags.iter() { + #[cfg(not(feature = "string-tags"))] add_tag(line, key, value); + + #[cfg(feature = "string-tags")] + add_tag(line, key, value.as_str()); } let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| { @@ -1037,7 +1042,10 @@ pub struct OwnedMeasurement { //pub fields: Map<&'static str, OwnedValue>, //pub tags: Map<&'static str, &'static str>, pub fields: SmallVec<[(&'static str, OwnedValue); 8]>, + #[cfg(not(feature = "string-tags"))] pub tags: SmallVec<[(&'static str, &'static str); 8]>, + #[cfg(feature = "string-tags")] + pub tags: SmallVec<[(&'static str, String); 8]>, } impl OwnedMeasurement { @@ -1061,11 +1069,18 @@ impl OwnedMeasurement { /// Unusual consuming `self` signature because primarily used by /// the `measure!` macro. + #[cfg(not(feature = "string-tags"))] pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self { self.tags.push((key, value)); self } + #[cfg(feature = "string-tags")] + pub fn add_tag(mut self, key: &'static str, value: S) -> Self { + self.tags.push((key, value.to_string())); + self + } + /// Unusual consuming `self` signature because primarily used by /// the `measure!` macro. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self { @@ -1078,6 +1093,7 @@ impl OwnedMeasurement { self } + #[cfg(not(feature = "string-tags"))] pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self { match self.tags.iter().position(|kv| kv.0 == key) { Some(i) => { @@ -1100,6 +1116,7 @@ impl OwnedMeasurement { .map(|kv| &kv.1) } + #[cfg(not(feature = "string-tags"))] pub fn get_tag(&self, key: &'static str) -> Option<&'static str> { self.tags.iter() .find(|kv| kv.0 == key) @@ -1224,7 +1241,7 @@ mod tests { #[test] fn it_checks_color_tag_error_in_non_doctest() { - let (tx, rx) = channel(); + let (tx, rx) = bounded(1024); measure!(tx, test, tag[color;"red"], int[n;1]); let meas: OwnedMeasurement = rx.recv().unwrap(); assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas); @@ -1252,7 +1269,7 @@ mod tests { #[test] fn it_uses_the_measure_macro() { - let (tx, rx) = channel(); + let (tx, rx) = bounded(1024); measure!(tx, test_measurement, tag [ one => "a" ], tag [ two => "b" ], @@ -1276,7 +1293,7 @@ mod tests { #[test] fn it_uses_measure_macro_for_d128_and_uuid() { - let (tx, rx) = channel(); + let (tx, rx) = bounded(1024); let u = Uuid::new_v4(); let d = d128::zero(); let t = now(); @@ -1299,7 +1316,7 @@ mod tests { #[test] fn it_uses_the_measure_macro_alt_syntax() { - let (tx, rx) = channel(); + let (tx, rx) = bounded(1024); measure!(tx, test_measurement, tag[one; "a"], tag[two; "b"], @@ -1335,7 +1352,7 @@ mod tests { #[test] fn try_to_break_measure_macro() { - let (tx, _) = channel(); + let (tx, _) = bounded(1024); measure!(tx, one, tag[x=>"y"], int[n;1]); measure!(tx, one, tag[x;"y"], int[n;1],); @@ -1357,7 +1374,7 @@ mod tests { #[bench] fn measure_macro_small(b: &mut Bencher) { - let (tx, rx) = channel(); + let (tx, rx) = bounded(1024); let listener = thread::spawn(move || { loop { if rx.recv().is_err() { break } } }); @@ -1368,7 +1385,7 @@ mod tests { #[bench] fn measure_macro_medium(b: &mut Bencher) { - let (tx, rx) = channel(); + let (tx, rx) = bounded(1024); let listener = thread::spawn(move || { loop { if rx.recv().is_err() { break } } }); @@ -1392,7 +1409,7 @@ mod tests { fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() { let ctx = zmq::Context::new(); let socket = push(&ctx).unwrap(); - let (tx, rx) = channel(); + let (tx, rx) = bounded(1024); let w = writer(tx.clone()); let mut buf = String::with_capacity(4096); let mut meas = Measurement::new("rust_test");