diff --git a/Cargo.toml b/Cargo.toml index 3d475ed..aa42611 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,9 +14,14 @@ slog = "2.0.6" sloggers = "0.2" slog-term = "2" # chashmap = "2" +ordermap = "0.2" +fnv = "1" +uuid = { version = "0.5", features = ["serde", "v4"] } -windows = { path = "../windows" } -money = { path = "../money" } +decimal = { path = "../decimal", version = "2" } + +windows = { path = "../windows", version = "0.1" } +money = { path = "../money", version = "0.1" } pubsub = { path = "../pubsub" } [features] diff --git a/src/influx.rs b/src/influx.rs index e667917..6e6ab90 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -5,9 +5,9 @@ use std::iter::FromIterator; use std::io::{Write, Read}; use std::sync::mpsc::{Sender, Receiver, channel, SendError}; use std::thread; -use std::collections::HashMap; use std::fs::{self, OpenOptions}; use std::time::Duration; +use std::hash::{Hash, BuildHasherDefault}; use hyper::status::StatusCode; use hyper::client::response::Response; @@ -17,6 +17,12 @@ use influent::measurement::{Measurement, Value}; use zmq; use chrono::{DateTime, Utc, TimeZone}; use sloggers::types::Severity; +use ordermap::OrderMap; +use fnv::FnvHasher; +use decimal::d128; +use uuid::Uuid; + +use money::Ticker; use super::{nanos, file_logger}; use warnings::Warning; @@ -29,7 +35,13 @@ const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write"; const ZMQ_RCV_HWM: i32 = 0; const ZMQ_SND_HWM: i32 = 0; -const N_BUFFER: u8 = 80; +const BUFFER_SIZE: u8 = 80; + +pub type Map = OrderMap>; + +pub fn new_map(capacity: usize) -> Map { + Map::with_capacity_and_hasher(capacity, Default::default()) +} /// Provides flexible and ergonomic use of `Sender`. /// @@ -93,10 +105,24 @@ macro_rules! measure { (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) }; (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) }; (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) }; + (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) }; + (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) }; (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) }; + + (@count_tags) => {0usize}; + (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)}; + (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)}; + + (@count_fields) => {0usize}; + (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; + (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; + (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)}; ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ - let mut meas = $crate::influx::OwnedMeasurement::new(stringify!($name)); + let n_tags = measure!(@count_tags $($t)*); + let n_fields = measure!(@count_fields $($t)*); + let mut meas = + $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields); $( measure!(@kv $t, meas, $($tail)*); )* @@ -105,7 +131,7 @@ macro_rules! measure { } /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s -/// it receives (over a SPSC channel) and inserts to influxdb via http when `N_BUFFER` +/// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE` /// measurements have accumulated. /// pub struct InfluxWriter { @@ -118,7 +144,7 @@ pub struct InfluxWriter { impl Default for InfluxWriter { fn default() -> Self { - InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log") + InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log", BUFFER_SIZE) } } @@ -129,7 +155,7 @@ impl InfluxWriter { self.tx.send(m) } - pub fn new(host: &'static str, db: &'static str, log_path: &str) -> Self { + pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u8) -> Self { let (kill_switch, terminate) = channel(); let (tx, rx) = channel(); let logger = file_logger(log_path, Severity::Info); @@ -155,7 +181,7 @@ impl InfluxWriter { 1 } - n @ 1 ... N_BUFFER => { + n if n < buffer_size => { buf.push_str("\n"); buf.push_str(s); n + 1 @@ -204,7 +230,16 @@ impl InfluxWriter { loop { rcvd_msg = false; rx.recv_timeout(Duration::from_millis(10)) - .map(|meas| { + .map(|mut meas: OwnedMeasurement| { + // if we didn't set the timestamp, it would end up + // being whenever we accumulated `BUFFER_SIZE` messages, + // which might be some period of time after we received + // the message. + // + if meas.timestamp.is_none() { + meas.timestamp = Some(now()); + } + trace!(logger, "rcvd new OwnedMeasurement"; "count" => count); serialize_owned(&meas, &mut meas_buf); count = next(count, &meas_buf, &mut buf); @@ -382,9 +417,9 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { add_tag(line, key, value); } - for (key, value) in measurement.string_tags.iter() { - add_tag(line, key, value); - } + // for (key, value) in measurement.string_tags.iter() { + // add_tag(line, key, value); + // } let mut fields = measurement.fields.iter(); @@ -392,11 +427,13 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { line.push_str(" "); line.push_str(&escape_tag(key)); line.push_str("="); - match value { - &OwnedValue::String(ref s) => line.push_str(&as_string(s)), - &OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)), - &OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)), - &OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)) + match *value { + OwnedValue::String(ref s) => line.push_str(&as_string(s)), + OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)), + OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)), + OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)), + OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)), + OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])), }; }; @@ -481,38 +518,48 @@ pub enum OwnedValue { String(String), Float(f64), Integer(i64), - Boolean(bool) + Boolean(bool), + D128(d128), + Uuid(Uuid), } #[derive(Clone, Debug)] pub struct OwnedMeasurement { pub key: &'static str, pub timestamp: Option, - pub fields: HashMap<&'static str, OwnedValue>, - pub tags: HashMap<&'static str, &'static str>, - pub string_tags: HashMap<&'static str, String> + pub fields: Map<&'static str, OwnedValue>, + pub tags: Map<&'static str, &'static str>, + //pub n_tags: usize, + //pub n_fields: usize, + //pub string_tags: HashMap<&'static str, String> } impl OwnedMeasurement { - pub fn new(key: &'static str) -> Self { + pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self { OwnedMeasurement { key, timestamp: None, - fields: HashMap::new(), - tags: HashMap::new(), - string_tags: HashMap::new() + tags: new_map(n_tags), + fields: new_map(n_fields), + //n_tags, + //n_fields, + //string_tags: HashMap::new() } } + pub fn new(key: &'static str) -> Self { + OwnedMeasurement::with_capacity(key, 4, 4) + } + pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self { self.tags.insert(key, value); self } - pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self { - self.string_tags.insert(key, value); - self - } + // pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self { + // self.string_tags.insert(key, value); + // self + // } pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self { self.fields.insert(key, value); @@ -540,13 +587,26 @@ mod tests { use test::{black_box, Bencher}; #[bench] - fn influx_writer_send(b: &mut Bencher) { + fn influx_writer_send_basic(b: &mut Bencher) { let m = InfluxWriter::default(); b.iter(|| { measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]); }); } + #[bench] + fn influx_writer_send_price(b: &mut Bencher) { + let m = InfluxWriter::default(); + b.iter(|| { + measure!(m, test, + tag[ticker; t!(xmr-btc).to_str()], + tag[exchange; "plnx"], + d128[bid; d128::zero()], + d128[ask; d128::zero()], + ); + }); + } + #[test] fn it_checks_color_tag_error_in_non_doctest() { let (tx, rx) = channel(); @@ -578,6 +638,29 @@ mod tests { assert_eq!(meas.timestamp, Some(1)); } + #[test] + fn it_uses_measure_macro_for_d128_and_uuid() { + + let (tx, rx) = channel(); + let u = Uuid::new_v4(); + let d = d128::zero(); + let t = now(); + measure!(tx, test_measurement, + tag[one; "a"], + d128[two; d], + uuid[three; u], + time[t] + ); + + thread::sleep_ms(10); + let meas: OwnedMeasurement = rx.try_recv().unwrap(); + assert_eq!(meas.key, "test_measurement"); + assert_eq!(meas.tags.get("one"), Some(&"a")); + assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero()))); + assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u))); + assert_eq!(meas.timestamp, Some(t)); + } + #[test] fn it_uses_the_measure_macro_alt_syntax() { @@ -779,4 +862,60 @@ mod tests { } } } + + // macro_rules! make_measurement { + // (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; + // (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; + // (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) }; + // (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; + // (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) }; + // (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) }; + // (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) }; + // (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) }; + // + // (@count_tags) => {0usize}; + // (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)}; + // (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)}; + // + // (@count_fields) => {0usize}; + // (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; + // (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; + // (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)}; + // + // ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ + // let n_tags = measure!(@count_tags $($t)*); + // let n_fields = measure!(@count_fields $($t)*); + // let mut meas = + // $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields); + // $( + // measure!(@kv $t, meas, $($tail)*); + // )* + // //let _ = $m.send(meas); + // meas + // }}; + // } + // + // #[test] + // fn it_checks_n_tags_is_correct() { + // let (tx, _): (Sender, Receiver) = channel(); + // assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1); + // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2); + // assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0); + // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0); + // + // let m4 = + // make_measurement!(tx, test, + // tag[a;"b"], + // tag[c;"d"], + // int[n; 1], + // tag[e;"f"], + // float[x; 1.234], + // tag[g;"h"], + // time[1], + // ); + // assert_eq!(m4.n_tags, 4); + // assert_eq!(m4.n_fields, 2); + // } + + } diff --git a/src/latency.rs b/src/latency.rs index 60e3cfe..239cad2 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -360,7 +360,7 @@ impl Manager { let nanos = DurationWindow::nanos(dur); measurements.send( OwnedMeasurement::new("gdax_trade_api") - .add_string_tag("ticker", ticker.to_string()) + .add_tag("ticker", ticker.to_str()) .add_field("nanos", OwnedValue::Integer(nanos as i64)) .set_timestamp(influx::now())); } diff --git a/src/lib.rs b/src/lib.rs index 406d963..5533fb4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,10 @@ extern crate termion; //extern crate pub_sub; extern crate sloggers; extern crate slog_term; +extern crate fnv; +extern crate ordermap; +extern crate decimal; +extern crate uuid; //extern crate shuteye; //extern crate chashmap; diff --git a/src/warnings.rs b/src/warnings.rs index a937b4a..da3231d 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -26,7 +26,7 @@ const N_WARNINGS: usize = 500; macro_rules! confirmed { ($warnings:ident, $($args:tt)*) => ( { - $let _ = warnings.send(Warning::Confirmed( ( format!($($args)*) ) ) ).unwrap(); + let _ = warnings.send(Warning::Confirmed( ( format!($($args)*) ) ) ).unwrap(); } ) } @@ -36,7 +36,7 @@ macro_rules! confirmed { macro_rules! awesome { ($warnings:ident, $($args:tt)*) => ( { - $warnings.send(Warning::Awesome( ( format!($($args)*) ) ) ).unwrap(); + let _ = $warnings.send(Warning::Awesome( ( format!($($args)*) ) ) ).unwrap(); } ) } @@ -45,7 +45,7 @@ macro_rules! awesome { macro_rules! critical { ($warnings:ident, $($args:tt)*) => ( { - $warnings.send(Warning::Critical( ( format!($($args)*) ) ) ).unwrap(); + let _ = $warnings.send(Warning::Critical( ( format!($($args)*) ) ) ).unwrap(); } ) } @@ -54,7 +54,7 @@ macro_rules! critical { macro_rules! notice { ($warnings:ident, $($args:tt)*) => ( { - $warnings.send(Warning::Notice( ( format!($($args)*) ) ) ).unwrap(); + let _ = $warnings.send(Warning::Notice( ( format!($($args)*) ) ) ).unwrap(); } ) }