Browse Source

adds d128, Uuid variants to OwnedValue + assorted improvements

master
Jonathan Strong 7 years ago
parent
commit
c84e7b46a6
5 changed files with 184 additions and 36 deletions
  1. +7
    -2
      Cargo.toml
  2. +168
    -29
      src/influx.rs
  3. +1
    -1
      src/latency.rs
  4. +4
    -0
      src/lib.rs
  5. +4
    -4
      src/warnings.rs

+ 7
- 2
Cargo.toml View File

@@ -14,9 +14,14 @@ slog = "2.0.6"
sloggers = "0.2" sloggers = "0.2"
slog-term = "2" slog-term = "2"
# chashmap = "2" # chashmap = "2"
ordermap = "0.2"
fnv = "1"
uuid = { version = "0.5", features = ["serde", "v4"] }


windows = { path = "../windows" }
money = { path = "../money" }
decimal = { path = "../decimal", version = "2" }

windows = { path = "../windows", version = "0.1" }
money = { path = "../money", version = "0.1" }
pubsub = { path = "../pubsub" } pubsub = { path = "../pubsub" }


[features] [features]


+ 168
- 29
src/influx.rs View File

@@ -5,9 +5,9 @@ use std::iter::FromIterator;
use std::io::{Write, Read}; use std::io::{Write, Read};
use std::sync::mpsc::{Sender, Receiver, channel, SendError}; use std::sync::mpsc::{Sender, Receiver, channel, SendError};
use std::thread; use std::thread;
use std::collections::HashMap;
use std::fs::{self, OpenOptions}; use std::fs::{self, OpenOptions};
use std::time::Duration; use std::time::Duration;
use std::hash::{Hash, BuildHasherDefault};


use hyper::status::StatusCode; use hyper::status::StatusCode;
use hyper::client::response::Response; use hyper::client::response::Response;
@@ -17,6 +17,12 @@ use influent::measurement::{Measurement, Value};
use zmq; use zmq;
use chrono::{DateTime, Utc, TimeZone}; use chrono::{DateTime, Utc, TimeZone};
use sloggers::types::Severity; use sloggers::types::Severity;
use ordermap::OrderMap;
use fnv::FnvHasher;
use decimal::d128;
use uuid::Uuid;

use money::Ticker;


use super::{nanos, file_logger}; use super::{nanos, file_logger};
use warnings::Warning; use warnings::Warning;
@@ -29,7 +35,13 @@ const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write";
const ZMQ_RCV_HWM: i32 = 0; const ZMQ_RCV_HWM: i32 = 0;
const ZMQ_SND_HWM: i32 = 0; const ZMQ_SND_HWM: i32 = 0;


const N_BUFFER: u8 = 80;
const BUFFER_SIZE: u8 = 80;

pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;

pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
Map::with_capacity_and_hasher(capacity, Default::default())
}


/// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
/// ///
@@ -93,10 +105,24 @@ macro_rules! measure {
(@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) }; (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) };
(@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) }; (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) };
(@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) }; (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
(@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
(@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
(@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) }; (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) };
(@count_tags) => {0usize};
(@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
(@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};

(@count_fields) => {0usize};
(@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
(@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
(@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};


($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
let mut meas = $crate::influx::OwnedMeasurement::new(stringify!($name));
let n_tags = measure!(@count_tags $($t)*);
let n_fields = measure!(@count_fields $($t)*);
let mut meas =
$crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
$( $(
measure!(@kv $t, meas, $($tail)*); measure!(@kv $t, meas, $($tail)*);
)* )*
@@ -105,7 +131,7 @@ macro_rules! measure {
} }


/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
/// it receives (over a SPSC channel) and inserts to influxdb via http when `N_BUFFER`
/// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
/// measurements have accumulated. /// measurements have accumulated.
/// ///
pub struct InfluxWriter { pub struct InfluxWriter {
@@ -118,7 +144,7 @@ pub struct InfluxWriter {


impl Default for InfluxWriter { impl Default for InfluxWriter {
fn default() -> Self { fn default() -> Self {
InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log")
InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log", BUFFER_SIZE)
} }
} }


@@ -129,7 +155,7 @@ impl InfluxWriter {
self.tx.send(m) self.tx.send(m)
} }


pub fn new(host: &'static str, db: &'static str, log_path: &str) -> Self {
pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u8) -> Self {
let (kill_switch, terminate) = channel(); let (kill_switch, terminate) = channel();
let (tx, rx) = channel(); let (tx, rx) = channel();
let logger = file_logger(log_path, Severity::Info); let logger = file_logger(log_path, Severity::Info);
@@ -155,7 +181,7 @@ impl InfluxWriter {
1 1
} }


n @ 1 ... N_BUFFER => {
n if n < buffer_size => {
buf.push_str("\n"); buf.push_str("\n");
buf.push_str(s); buf.push_str(s);
n + 1 n + 1
@@ -204,7 +230,16 @@ impl InfluxWriter {
loop { loop {
rcvd_msg = false; rcvd_msg = false;
rx.recv_timeout(Duration::from_millis(10)) rx.recv_timeout(Duration::from_millis(10))
.map(|meas| {
.map(|mut meas: OwnedMeasurement| {
// if we didn't set the timestamp, it would end up
// being whenever we accumulated `BUFFER_SIZE` messages,
// which might be some period of time after we received
// the message.
//
if meas.timestamp.is_none() {
meas.timestamp = Some(now());
}

trace!(logger, "rcvd new OwnedMeasurement"; "count" => count); trace!(logger, "rcvd new OwnedMeasurement"; "count" => count);
serialize_owned(&meas, &mut meas_buf); serialize_owned(&meas, &mut meas_buf);
count = next(count, &meas_buf, &mut buf); count = next(count, &meas_buf, &mut buf);
@@ -382,9 +417,9 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
add_tag(line, key, value); add_tag(line, key, value);
} }


for (key, value) in measurement.string_tags.iter() {
add_tag(line, key, value);
}
// for (key, value) in measurement.string_tags.iter() {
// add_tag(line, key, value);
// }


let mut fields = measurement.fields.iter(); let mut fields = measurement.fields.iter();


@@ -392,11 +427,13 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
line.push_str(" "); line.push_str(" ");
line.push_str(&escape_tag(key)); line.push_str(&escape_tag(key));
line.push_str("="); line.push_str("=");
match value {
&OwnedValue::String(ref s) => line.push_str(&as_string(s)),
&OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
&OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
&OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b))
match *value {
OwnedValue::String(ref s) => line.push_str(&as_string(s)),
OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
}; };
}; };


@@ -481,38 +518,48 @@ pub enum OwnedValue {
String(String), String(String),
Float(f64), Float(f64),
Integer(i64), Integer(i64),
Boolean(bool)
Boolean(bool),
D128(d128),
Uuid(Uuid),
} }


#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OwnedMeasurement { pub struct OwnedMeasurement {
pub key: &'static str, pub key: &'static str,
pub timestamp: Option<i64>, pub timestamp: Option<i64>,
pub fields: HashMap<&'static str, OwnedValue>,
pub tags: HashMap<&'static str, &'static str>,
pub string_tags: HashMap<&'static str, String>
pub fields: Map<&'static str, OwnedValue>,
pub tags: Map<&'static str, &'static str>,
//pub n_tags: usize,
//pub n_fields: usize,
//pub string_tags: HashMap<&'static str, String>
} }


impl OwnedMeasurement { impl OwnedMeasurement {
pub fn new(key: &'static str) -> Self {
pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
OwnedMeasurement { OwnedMeasurement {
key, key,
timestamp: None, timestamp: None,
fields: HashMap::new(),
tags: HashMap::new(),
string_tags: HashMap::new()
tags: new_map(n_tags),
fields: new_map(n_fields),
//n_tags,
//n_fields,
//string_tags: HashMap::new()
} }
} }


pub fn new(key: &'static str) -> Self {
OwnedMeasurement::with_capacity(key, 4, 4)
}

pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self { pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
self.tags.insert(key, value); self.tags.insert(key, value);
self self
} }


pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self {
self.string_tags.insert(key, value);
self
}
// pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self {
// self.string_tags.insert(key, value);
// self
// }


pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self { pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
self.fields.insert(key, value); self.fields.insert(key, value);
@@ -540,13 +587,26 @@ mod tests {
use test::{black_box, Bencher}; use test::{black_box, Bencher};


#[bench] #[bench]
fn influx_writer_send(b: &mut Bencher) {
fn influx_writer_send_basic(b: &mut Bencher) {
let m = InfluxWriter::default(); let m = InfluxWriter::default();
b.iter(|| { b.iter(|| {
measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]); measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]);
}); });
} }


#[bench]
fn influx_writer_send_price(b: &mut Bencher) {
let m = InfluxWriter::default();
b.iter(|| {
measure!(m, test,
tag[ticker; t!(xmr-btc).to_str()],
tag[exchange; "plnx"],
d128[bid; d128::zero()],
d128[ask; d128::zero()],
);
});
}

#[test] #[test]
fn it_checks_color_tag_error_in_non_doctest() { fn it_checks_color_tag_error_in_non_doctest() {
let (tx, rx) = channel(); let (tx, rx) = channel();
@@ -578,6 +638,29 @@ mod tests {
assert_eq!(meas.timestamp, Some(1)); assert_eq!(meas.timestamp, Some(1));
} }


#[test]
fn it_uses_measure_macro_for_d128_and_uuid() {

let (tx, rx) = channel();
let u = Uuid::new_v4();
let d = d128::zero();
let t = now();
measure!(tx, test_measurement,
tag[one; "a"],
d128[two; d],
uuid[three; u],
time[t]
);

thread::sleep_ms(10);
let meas: OwnedMeasurement = rx.try_recv().unwrap();
assert_eq!(meas.key, "test_measurement");
assert_eq!(meas.tags.get("one"), Some(&"a"));
assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero())));
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u)));
assert_eq!(meas.timestamp, Some(t));
}

#[test] #[test]
fn it_uses_the_measure_macro_alt_syntax() { fn it_uses_the_measure_macro_alt_syntax() {


@@ -779,4 +862,60 @@ mod tests {
} }
} }
} }

// macro_rules! make_measurement {
// (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
// (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
// (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
// (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
// (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) };
// (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) };
// (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
// (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) };
//
// (@count_tags) => {0usize};
// (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
// (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
//
// (@count_fields) => {0usize};
// (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
// (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
// (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
//
// ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
// let n_tags = measure!(@count_tags $($t)*);
// let n_fields = measure!(@count_fields $($t)*);
// let mut meas =
// $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
// $(
// measure!(@kv $t, meas, $($tail)*);
// )*
// //let _ = $m.send(meas);
// meas
// }};
// }
//
// #[test]
// fn it_checks_n_tags_is_correct() {
// let (tx, _): (Sender<OwnedMeasurement>, Receiver<OwnedMeasurement>) = channel();
// assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1);
// assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2);
// assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0);
// assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0);
//
// let m4 =
// make_measurement!(tx, test,
// tag[a;"b"],
// tag[c;"d"],
// int[n; 1],
// tag[e;"f"],
// float[x; 1.234],
// tag[g;"h"],
// time[1],
// );
// assert_eq!(m4.n_tags, 4);
// assert_eq!(m4.n_fields, 2);
// }


} }

+ 1
- 1
src/latency.rs View File

@@ -360,7 +360,7 @@ impl Manager {
let nanos = DurationWindow::nanos(dur); let nanos = DurationWindow::nanos(dur);
measurements.send( measurements.send(
OwnedMeasurement::new("gdax_trade_api") OwnedMeasurement::new("gdax_trade_api")
.add_string_tag("ticker", ticker.to_string())
.add_tag("ticker", ticker.to_str())
.add_field("nanos", OwnedValue::Integer(nanos as i64)) .add_field("nanos", OwnedValue::Integer(nanos as i64))
.set_timestamp(influx::now())); .set_timestamp(influx::now()));
} }


+ 4
- 0
src/lib.rs View File

@@ -15,6 +15,10 @@ extern crate termion;
//extern crate pub_sub; //extern crate pub_sub;
extern crate sloggers; extern crate sloggers;
extern crate slog_term; extern crate slog_term;
extern crate fnv;
extern crate ordermap;
extern crate decimal;
extern crate uuid;
//extern crate shuteye; //extern crate shuteye;
//extern crate chashmap; //extern crate chashmap;




+ 4
- 4
src/warnings.rs View File

@@ -26,7 +26,7 @@ const N_WARNINGS: usize = 500;
macro_rules! confirmed { macro_rules! confirmed {
($warnings:ident, $($args:tt)*) => ( ($warnings:ident, $($args:tt)*) => (
{ {
$let _ = warnings.send(Warning::Confirmed( ( format!($($args)*) ) ) ).unwrap();
let _ = warnings.send(Warning::Confirmed( ( format!($($args)*) ) ) ).unwrap();
} }
) )
} }
@@ -36,7 +36,7 @@ macro_rules! confirmed {
macro_rules! awesome { macro_rules! awesome {
($warnings:ident, $($args:tt)*) => ( ($warnings:ident, $($args:tt)*) => (
{ {
$warnings.send(Warning::Awesome( ( format!($($args)*) ) ) ).unwrap();
let _ = $warnings.send(Warning::Awesome( ( format!($($args)*) ) ) ).unwrap();
} }
) )
} }
@@ -45,7 +45,7 @@ macro_rules! awesome {
macro_rules! critical { macro_rules! critical {
($warnings:ident, $($args:tt)*) => ( ($warnings:ident, $($args:tt)*) => (
{ {
$warnings.send(Warning::Critical( ( format!($($args)*) ) ) ).unwrap();
let _ = $warnings.send(Warning::Critical( ( format!($($args)*) ) ) ).unwrap();
} }
) )
} }
@@ -54,7 +54,7 @@ macro_rules! critical {
macro_rules! notice { macro_rules! notice {
($warnings:ident, $($args:tt)*) => ( ($warnings:ident, $($args:tt)*) => (
{ {
$warnings.send(Warning::Notice( ( format!($($args)*) ) ) ).unwrap();
let _ = $warnings.send(Warning::Notice( ( format!($($args)*) ) ) ).unwrap();
} }
) )
} }


Loading…
Cancel
Save