Browse Source

replaces Vec with SmallVec for backing storage of `OwnedMeasurement`

master
Jonathan Strong 6 years ago
parent
commit
a87cb002ac
3 changed files with 77 additions and 43 deletions
  1. +2
    -1
      Cargo.toml
  2. +74
    -42
      src/influx.rs
  3. +1
    -0
      src/lib.rs

+ 2
- 1
Cargo.toml View File

@@ -1,6 +1,6 @@
[package] [package]
name = "logging" name = "logging"
version = "0.4.1"
version = "0.4.2"
authors = ["Jonathan Strong <jstrong@legis.io>"] authors = ["Jonathan Strong <jstrong@legis.io>"]


[[example]] [[example]]
@@ -25,6 +25,7 @@ fnv = "1"
uuid = { version = "0.5", features = ["serde", "v4"] } uuid = { version = "0.5", features = ["serde", "v4"] }
hdrhistogram = "6" hdrhistogram = "6"
slog-async = "2" slog-async = "2"
smallvec = "0.6"


sloggers = { path = "../sloggers" } sloggers = { path = "../sloggers" }




+ 74
- 42
src/influx.rs View File

@@ -22,6 +22,7 @@ use ordermap::OrderMap;
use fnv::FnvHasher; use fnv::FnvHasher;
use decimal::d128; use decimal::d128;
use uuid::Uuid; use uuid::Uuid;
use smallvec::SmallVec;


use super::{nanos, file_logger, LOG_LEVEL}; use super::{nanos, file_logger, LOG_LEVEL};
#[cfg(feature = "warnings")] #[cfg(feature = "warnings")]
@@ -63,8 +64,8 @@ pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
/// let meas: OwnedMeasurement = rx.recv().unwrap(); /// let meas: OwnedMeasurement = rx.recv().unwrap();
/// ///
/// assert_eq!(meas.key, "test"); /// assert_eq!(meas.key, "test");
/// assert_eq!(meas.tags.get("color"), Some(&"red"));
/// assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1)));
/// assert_eq!(meas.get_tag("color"), Some("red"));
/// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
/// ///
/// // alternate syntax ... /// // alternate syntax ...
/// ///
@@ -82,10 +83,10 @@ pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
/// let meas: OwnedMeasurement = rx.recv().unwrap(); /// let meas: OwnedMeasurement = rx.recv().unwrap();
/// ///
/// assert_eq!(meas.key, "test"); /// assert_eq!(meas.key, "test");
/// assert_eq!(meas.tags.get("one"), Some(&"a"));
/// assert_eq!(meas.tags.get("two"), Some(&"b"));
/// assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
/// assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
/// assert_eq!(meas.get_tag("one"), Some("a"));
/// assert_eq!(meas.get_tag("two"), Some("b"));
/// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
/// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
/// assert_eq!(meas.timestamp, Some(1)); /// assert_eq!(meas.timestamp, Some(1));
/// ///
/// // use the @make_meas flag to skip sending a measurement, instead merely /// // use the @make_meas flag to skip sending a measurement, instead merely
@@ -453,7 +454,7 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
line.push_str(&escape(value)); line.push_str(&escape(value));
}; };


for (key, value) in measurement.tags.iter() {
for &(key, value) in measurement.tags.iter() {
add_tag(line, key, value); add_tag(line, key, value);
} }


@@ -481,13 +482,13 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
// first time separate from tags with space // first time separate from tags with space
// //
fields.next().map(|kv| { fields.next().map(|kv| {
add_field(line, kv.0, kv.1, true);
add_field(line, &kv.0, &kv.1, true);
}); });


// then seperate the rest w/ comma // then seperate the rest w/ comma
// //
for kv in fields { for kv in fields {
add_field(line, kv.0, kv.1, false);
add_field(line, kv.0, &kv.1, false);
} }


if let Some(t) = measurement.timestamp { if let Some(t) = measurement.timestamp {
@@ -575,8 +576,10 @@ pub enum OwnedValue {
pub struct OwnedMeasurement { pub struct OwnedMeasurement {
pub key: &'static str, pub key: &'static str,
pub timestamp: Option<i64>, pub timestamp: Option<i64>,
pub fields: Map<&'static str, OwnedValue>,
pub tags: Map<&'static str, &'static str>,
//pub fields: Map<&'static str, OwnedValue>,
//pub tags: Map<&'static str, &'static str>,
pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
pub tags: SmallVec<[(&'static str, &'static str); 8]>,
} }


impl OwnedMeasurement { impl OwnedMeasurement {
@@ -584,22 +587,27 @@ impl OwnedMeasurement {
OwnedMeasurement { OwnedMeasurement {
key, key,
timestamp: None, timestamp: None,
tags: new_map(n_tags),
fields: new_map(n_fields),
tags: SmallVec::with_capacity(n_tags),
fields: SmallVec::with_capacity(n_fields),
} }
} }


pub fn new(key: &'static str) -> Self { pub fn new(key: &'static str) -> Self {
OwnedMeasurement::with_capacity(key, 4, 4)
OwnedMeasurement {
key,
timestamp: None,
tags: SmallVec::new(),
fields: SmallVec::new(),
}
} }


pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self { pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
self.tags.insert(key, value);
self.tags.push((key, value));
self self
} }


pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self { pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
self.fields.insert(key, value);
self.fields.push((key, value));
self self
} }


@@ -609,9 +617,33 @@ impl OwnedMeasurement {
} }


pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self { pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
*self.tags.entry(key).or_insert(value) = value;
self
match self.tags.iter().position(|kv| kv.0 == key) {
Some(i) => {
self.tags.get_mut(i)
.map(|x| {
x.0 = value;
});
self
}

None => {
self.add_tag(key, value)
}
}
}

pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
self.fields.iter()
.find(|kv| kv.0 == key)
.map(|kv| &kv.1)
} }

pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
self.tags.iter()
.find(|kv| kv.0 == key)
.map(|kv| kv.1)
}

} }


#[allow(unused_imports, unused_variables)] #[allow(unused_imports, unused_variables)]
@@ -626,8 +658,8 @@ mod tests {
let color = "red"; let color = "red";
let time = now(); let time = now();
let m = measure!(@make_meas test, t(color), t(tag_value), tm(time)); let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
assert_eq!(m.tags.get("color"), Some(&"red"));
assert_eq!(m.tags.get("tag_value"), Some(&"one"));
assert_eq!(m.get_tag("color"), Some("red"));
assert_eq!(m.get_tag("tag_value"), Some("one"));
assert_eq!(m.timestamp, Some(time)); assert_eq!(m.timestamp, Some(time));
} }


@@ -635,9 +667,9 @@ mod tests {
fn it_uses_measure_macro_parenthesis_syntax() { fn it_uses_measure_macro_parenthesis_syntax() {
let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1)); let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
assert_eq!(m.key, "test"); assert_eq!(m.key, "test");
assert_eq!(m.tags.get("a"), Some(&"b"));
assert_eq!(m.fields.get("n"), Some(&OwnedValue::Integer(1)));
assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
assert_eq!(m.get_tag("a"), Some("b"));
assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
assert_eq!(m.timestamp, Some(1)); assert_eq!(m.timestamp, Some(1));
} }


@@ -696,7 +728,7 @@ mod tests {
let (tx, rx) = channel(); let (tx, rx) = channel();
measure!(tx, test, tag[color;"red"], int[n;1]); measure!(tx, test, tag[color;"red"], int[n;1]);
let meas: OwnedMeasurement = rx.recv().unwrap(); let meas: OwnedMeasurement = rx.recv().unwrap();
assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
} }


#[test] #[test]
@@ -712,10 +744,10 @@ mod tests {
time [ 1 ] time [ 1 ]
); );
assert_eq!(meas.key, "test_measurement"); assert_eq!(meas.key, "test_measurement");
assert_eq!(meas.tags.get("one"), Some(&"a"));
assert_eq!(meas.tags.get("two"), Some(&"b"));
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.get_tag("one"), Some("a"));
assert_eq!(meas.get_tag("two"), Some("b"));
assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.timestamp, Some(1)); assert_eq!(meas.timestamp, Some(1));
} }


@@ -735,10 +767,10 @@ mod tests {
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(10));
let meas: OwnedMeasurement = rx.try_recv().unwrap(); let meas: OwnedMeasurement = rx.try_recv().unwrap();
assert_eq!(meas.key, "test_measurement"); assert_eq!(meas.key, "test_measurement");
assert_eq!(meas.tags.get("one"), Some(&"a"));
assert_eq!(meas.tags.get("two"), Some(&"b"));
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.get_tag("one"), Some("a"));
assert_eq!(meas.get_tag("two"), Some("b"));
assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.timestamp, Some(1)); assert_eq!(meas.timestamp, Some(1));
} }


@@ -759,9 +791,9 @@ mod tests {
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(10));
let meas: OwnedMeasurement = rx.try_recv().unwrap(); let meas: OwnedMeasurement = rx.try_recv().unwrap();
assert_eq!(meas.key, "test_measurement"); assert_eq!(meas.key, "test_measurement");
assert_eq!(meas.tags.get("one"), Some(&"a"));
assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero())));
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u)));
assert_eq!(meas.get_tag("one"), Some("a"));
assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
assert_eq!(meas.timestamp, Some(t)); assert_eq!(meas.timestamp, Some(t));
} }


@@ -783,10 +815,10 @@ mod tests {
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(10));
let meas: OwnedMeasurement = rx.try_recv().unwrap(); let meas: OwnedMeasurement = rx.try_recv().unwrap();
assert_eq!(meas.key, "test_measurement"); assert_eq!(meas.key, "test_measurement");
assert_eq!(meas.tags.get("one"), Some(&"a"));
assert_eq!(meas.tags.get("two"), Some(&"b"));
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.get_tag("one"), Some("a"));
assert_eq!(meas.get_tag("two"), Some("b"));
assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.timestamp, Some(1)); assert_eq!(meas.timestamp, Some(1));
} }


@@ -794,8 +826,8 @@ mod tests {
fn it_checks_that_fields_are_separated_correctly() { fn it_checks_that_fields_are_separated_correctly() {
let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]); let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
assert_eq!(m.key, "test"); assert_eq!(m.key, "test");
assert_eq!(m.tags.get("a"), Some(&"one"));
assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
assert_eq!(m.get_tag("a"), Some("one"));
assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));


let mut buf = String::new(); let mut buf = String::new();
serialize_owned(&m, &mut buf); serialize_owned(&m, &mut buf);
@@ -821,7 +853,7 @@ mod tests {


let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one)); let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));


assert_eq!(m.fields.get("a"), Some(&OwnedValue::Integer(1)));
assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
} }


#[bench] #[bench]


+ 1
- 0
src/lib.rs View File

@@ -22,6 +22,7 @@ extern crate ordermap;
extern crate decimal; extern crate decimal;
extern crate uuid; extern crate uuid;
extern crate hdrhistogram; extern crate hdrhistogram;
extern crate smallvec;


extern crate windows; extern crate windows;
extern crate pubsub as pub_sub; extern crate pubsub as pub_sub;


Loading…
Cancel
Save