Browse Source

min changes to allow string tags in feature gate

master
Jonathan Strong 5 years ago
parent
commit
86542209b1
2 changed files with 33 additions and 14 deletions
  1. +3
    -1
      Cargo.toml
  2. +30
    -13
      src/influx.rs

+ 3
- 1
Cargo.toml View File

@@ -35,7 +35,8 @@ pretty_toa = "1.0.0"
sloggers = "0.3" sloggers = "0.3"
#sloggers = { path = "../sloggers" } #sloggers = { path = "../sloggers" }


decimal = { path = "../decimal", version = "2" }
#decimal = { path = "../decimal", version = "2" }
decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" }


#windows = { path = "../windows", version = "0.2" } #windows = { path = "../windows", version = "0.2" }
money = { path = "../money", version = "0.3" } money = { path = "../money", version = "0.3" }
@@ -56,6 +57,7 @@ disable-short-uuid = []
warnings = [] warnings = []
inlines = [] inlines = []
latency = ["pubsub"] latency = ["pubsub"]
string-tags = []


[profile.bench] [profile.bench]
lto = true lto = true

+ 30
- 13
src/influx.rs View File

@@ -3,7 +3,8 @@


use std::io::Read; use std::io::Read;
use std::sync::Arc; use std::sync::Arc;
use std::sync::mpsc::{Sender, Receiver, channel, SendError};
//use std::sync::mpsc::{Sender, Receiver, channel, SendError};
use crossbeam_channel::{Sender, Receiver, bounded, SendError};
use std::{thread, mem}; use std::{thread, mem};
use std::time::*; use std::time::*;
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;
@@ -96,7 +97,7 @@ impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
/// use logging::influx::*; /// use logging::influx::*;
/// ///
/// fn main() { /// fn main() {
/// let (tx, rx) = channel();
/// let (tx, rx) = bounded(1024);
/// ///
/// // "shorthand" syntax /// // "shorthand" syntax
/// ///
@@ -331,7 +332,7 @@ impl InfluxWriter {
} }


pub fn placeholder() -> Self { pub fn placeholder() -> Self {
let (tx, _) = channel();
let (tx, _) = bounded(1024);
Self { Self {
host: String::new(), host: String::new(),
db: String::new(), db: String::new(),
@@ -350,7 +351,7 @@ impl InfluxWriter {
let logger = logger.new(o!( let logger = logger.new(o!(
"host" => host.to_string(), "host" => host.to_string(),
"db" => db.to_string())); "db" => db.to_string()));
let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(1024);
let url = let url =
Url::parse_with_params(&format!("http://{}:8086/write", host), Url::parse_with_params(&format!("http://{}:8086/write", host),
&[("db", db), ("precision", "ns")]) &[("db", db), ("precision", "ns")])
@@ -901,8 +902,12 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
line.push_str(&escape(value)); line.push_str(&escape(value));
}; };


for &(key, value) in measurement.tags.iter() {
for (key, value) in measurement.tags.iter() {
#[cfg(not(feature = "string-tags"))]
add_tag(line, key, value); add_tag(line, key, value);

#[cfg(feature = "string-tags")]
add_tag(line, key, value.as_str());
} }


let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| { let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
@@ -1037,7 +1042,10 @@ pub struct OwnedMeasurement {
//pub fields: Map<&'static str, OwnedValue>, //pub fields: Map<&'static str, OwnedValue>,
//pub tags: Map<&'static str, &'static str>, //pub tags: Map<&'static str, &'static str>,
pub fields: SmallVec<[(&'static str, OwnedValue); 8]>, pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
#[cfg(not(feature = "string-tags"))]
pub tags: SmallVec<[(&'static str, &'static str); 8]>, pub tags: SmallVec<[(&'static str, &'static str); 8]>,
#[cfg(feature = "string-tags")]
pub tags: SmallVec<[(&'static str, String); 8]>,
} }


impl OwnedMeasurement { impl OwnedMeasurement {
@@ -1061,11 +1069,18 @@ impl OwnedMeasurement {


/// Unusual consuming `self` signature because primarily used by /// Unusual consuming `self` signature because primarily used by
/// the `measure!` macro. /// the `measure!` macro.
#[cfg(not(feature = "string-tags"))]
pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self { pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
self.tags.push((key, value)); self.tags.push((key, value));
self self
} }


#[cfg(feature = "string-tags")]
pub fn add_tag<S: ToString>(mut self, key: &'static str, value: S) -> Self {
self.tags.push((key, value.to_string()));
self
}

/// Unusual consuming `self` signature because primarily used by /// Unusual consuming `self` signature because primarily used by
/// the `measure!` macro. /// the `measure!` macro.
pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self { pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
@@ -1078,6 +1093,7 @@ impl OwnedMeasurement {
self self
} }


#[cfg(not(feature = "string-tags"))]
pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self { pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
match self.tags.iter().position(|kv| kv.0 == key) { match self.tags.iter().position(|kv| kv.0 == key) {
Some(i) => { Some(i) => {
@@ -1100,6 +1116,7 @@ impl OwnedMeasurement {
.map(|kv| &kv.1) .map(|kv| &kv.1)
} }


#[cfg(not(feature = "string-tags"))]
pub fn get_tag(&self, key: &'static str) -> Option<&'static str> { pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
self.tags.iter() self.tags.iter()
.find(|kv| kv.0 == key) .find(|kv| kv.0 == key)
@@ -1224,7 +1241,7 @@ mod tests {


#[test] #[test]
fn it_checks_color_tag_error_in_non_doctest() { fn it_checks_color_tag_error_in_non_doctest() {
let (tx, rx) = channel();
let (tx, rx) = bounded(1024);
measure!(tx, test, tag[color;"red"], int[n;1]); measure!(tx, test, tag[color;"red"], int[n;1]);
let meas: OwnedMeasurement = rx.recv().unwrap(); let meas: OwnedMeasurement = rx.recv().unwrap();
assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas); assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
@@ -1252,7 +1269,7 @@ mod tests {


#[test] #[test]
fn it_uses_the_measure_macro() { fn it_uses_the_measure_macro() {
let (tx, rx) = channel();
let (tx, rx) = bounded(1024);
measure!(tx, test_measurement, measure!(tx, test_measurement,
tag [ one => "a" ], tag [ one => "a" ],
tag [ two => "b" ], tag [ two => "b" ],
@@ -1276,7 +1293,7 @@ mod tests {
#[test] #[test]
fn it_uses_measure_macro_for_d128_and_uuid() { fn it_uses_measure_macro_for_d128_and_uuid() {


let (tx, rx) = channel();
let (tx, rx) = bounded(1024);
let u = Uuid::new_v4(); let u = Uuid::new_v4();
let d = d128::zero(); let d = d128::zero();
let t = now(); let t = now();
@@ -1299,7 +1316,7 @@ mod tests {
#[test] #[test]
fn it_uses_the_measure_macro_alt_syntax() { fn it_uses_the_measure_macro_alt_syntax() {


let (tx, rx) = channel();
let (tx, rx) = bounded(1024);
measure!(tx, test_measurement, measure!(tx, test_measurement,
tag[one; "a"], tag[one; "a"],
tag[two; "b"], tag[two; "b"],
@@ -1335,7 +1352,7 @@ mod tests {


#[test] #[test]
fn try_to_break_measure_macro() { fn try_to_break_measure_macro() {
let (tx, _) = channel();
let (tx, _) = bounded(1024);
measure!(tx, one, tag[x=>"y"], int[n;1]); measure!(tx, one, tag[x=>"y"], int[n;1]);
measure!(tx, one, tag[x;"y"], int[n;1],); measure!(tx, one, tag[x;"y"], int[n;1],);


@@ -1357,7 +1374,7 @@ mod tests {


#[bench] #[bench]
fn measure_macro_small(b: &mut Bencher) { fn measure_macro_small(b: &mut Bencher) {
let (tx, rx) = channel();
let (tx, rx) = bounded(1024);
let listener = thread::spawn(move || { let listener = thread::spawn(move || {
loop { if rx.recv().is_err() { break } } loop { if rx.recv().is_err() { break } }
}); });
@@ -1368,7 +1385,7 @@ mod tests {


#[bench] #[bench]
fn measure_macro_medium(b: &mut Bencher) { fn measure_macro_medium(b: &mut Bencher) {
let (tx, rx) = channel();
let (tx, rx) = bounded(1024);
let listener = thread::spawn(move || { let listener = thread::spawn(move || {
loop { if rx.recv().is_err() { break } } loop { if rx.recv().is_err() { break } }
}); });
@@ -1392,7 +1409,7 @@ mod tests {
fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() { fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
let ctx = zmq::Context::new(); let ctx = zmq::Context::new();
let socket = push(&ctx).unwrap(); let socket = push(&ctx).unwrap();
let (tx, rx) = channel();
let (tx, rx) = bounded(1024);
let w = writer(tx.clone()); let w = writer(tx.clone());
let mut buf = String::with_capacity(4096); let mut buf = String::with_capacity(4096);
let mut meas = Measurement::new("rust_test"); let mut meas = Measurement::new("rust_test");


Loading…
Cancel
Save