From 1a21ef0ed8285c6833fb9b2aff80bb5bd364f14b Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Thu, 15 Mar 2018 20:44:09 -0400 Subject: [PATCH] random warning silencing --- src/hist.rs | 28 ++---------- src/influx.rs | 113 +++++++++++-------------------------------------- src/latency.rs | 2 +- 3 files changed, 29 insertions(+), 114 deletions(-) diff --git a/src/hist.rs b/src/hist.rs index 503ee67..0b27da4 100644 --- a/src/hist.rs +++ b/src/hist.rs @@ -1,14 +1,13 @@ -use std::sync::mpsc::{Sender, Receiver, channel, SendError}; +use std::sync::mpsc::{Sender, Receiver, channel}; use std::sync::Arc; use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; use std::path::PathBuf; use std::thread::{self, JoinHandle}; -use std::io::{self, Write}; +use std::io; use std::{mem, fs, env}; -use chrono::{DateTime, Utc, TimeZone}; -use hdrhistogram::{Histogram, Counter}; -use hdrhistogram::serialization::{Serializer, V2DeflateSerializer, V2Serializer}; +use hdrhistogram::{Histogram}; +use hdrhistogram::serialization::V2DeflateSerializer; use hdrhistogram::serialization::interval_log::{IntervalLogWriterBuilder, Tag}; type C = u64; @@ -173,22 +172,3 @@ impl Drop for HistLog { } } } - - - - -// pub fn save_hist(thread: &'static str, ticker: Ticker, hist: Histogram) { -// env::home_dir().and_then(|mut path| { -// path.push(&format!("src/market-maker/var/hist/{}/", ticker.to_str())); -// let _ = fs::create_dir_all(&path); -// path.push(&format!("mm-v{}-{}-{}-1h-{}.v2", crate_version!(), thread, ticker.to_string(), Utc::now().to_rfc3339())); -// fs::File::create(&path).ok() -// }).map(|mut file| { -// let mut ser = V2DeflateSerializer::new(); -// ser.serialize(&hist, &mut file) -// .map_err(|e| { -// let _ = write!(&mut file, "\n\n{:?}", e); -// e -// }).ok(); -// }); -// } diff --git a/src/influx.rs b/src/influx.rs index b3bb19a..70ddca4 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -5,6 +5,7 @@ use std::io::Read; use std::sync::Arc; use std::sync::mpsc::{Sender, Receiver, channel, SendError}; use std::thread; +#[cfg(feature = "warnings")] use std::fs; use std::time::Duration; use std::hash::BuildHasherDefault; @@ -17,7 +18,6 @@ use influent::measurement::{Measurement, Value}; use zmq; #[allow(unused_imports)] use chrono::{DateTime, Utc}; -use sloggers::types::Severity; use ordermap::OrderMap; use fnv::FnvHasher; use decimal::d128; @@ -27,24 +27,6 @@ use super::{nanos, file_logger, LOG_LEVEL}; #[cfg(feature = "warnings")] use warnings::Warning; -const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; -const ZMQ_RCV_HWM: i32 = 0; -const ZMQ_SND_HWM: i32 = 0; - -const BUFFER_SIZE: u16 = 80; - -#[cfg(not(any(test, feature = "test")))] -const DB_NAME: &'static str = "mm2"; - -#[cfg(any(test, feature = "test"))] -const DB_NAME: &'static str = "mm2_test"; - -#[cfg(not(any(feature = "scholes")))] -const DB_HOST: &'static str = "http://127.0.0.1:8086/write"; - -#[cfg(feature = "scholes")] -const DB_HOST: &'static str = "http://159.203.81.249:8086/write"; - pub use super::{dur_nanos, dt_nanos}; pub type Map = OrderMap>; @@ -191,11 +173,11 @@ pub struct InfluxWriter { impl Default for InfluxWriter { fn default() -> Self { - if cfg!(any(test, feature = "test")) { - InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 1) - } else { - InfluxWriter::new("localhost", "mm2", "/home/jstrong/src/logging/var/log/influx-default.log", BUFFER_SIZE) - } + //if cfg!(any(test, feature = "test")) { + // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0) + //} else { + InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 4000) + //} } } @@ -341,17 +323,19 @@ impl Drop for InfluxWriter { } } +const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; + pub fn pull(ctx: &zmq::Context) -> Result { let socket = ctx.socket(zmq::PULL)?; socket.bind(WRITER_ADDR)?; - socket.set_rcvhwm(ZMQ_RCV_HWM)?; + socket.set_rcvhwm(0)?; Ok(socket) } pub fn push(ctx: &zmq::Context) -> Result { let socket = ctx.socket(zmq::PUSH)?; socket.connect(WRITER_ADDR)?; - socket.set_sndhwm(ZMQ_SND_HWM)?; + socket.set_sndhwm(0)?; Ok(socket) } @@ -515,7 +499,9 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { #[cfg(feature = "warnings")] #[deprecated(since="0.4", note="Replace with InfluxWriter")] pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { + assert!(false); thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || { + const DB_HOST: &'static str = "http://127.0.0.1:8086/write"; let _ = fs::create_dir("/tmp/mm"); let ctx = zmq::Context::new(); let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); @@ -580,6 +566,11 @@ pub enum OwnedValue { Uuid(Uuid), } +/// Holds data meant for an influxdb measurement in transit to the +/// writing thread. +/// +/// TODO: convert `Map` to `SmallVec`? +/// #[derive(Clone, Debug)] pub struct OwnedMeasurement { pub key: &'static str, @@ -689,13 +680,13 @@ mod tests { #[bench] fn influx_writer_send_price(b: &mut Bencher) { - let m = InfluxWriter::default(); + let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000); b.iter(|| { measure!(m, test, - tag[ticker; t!(xmr-btc).to_str()], - tag[exchange; "plnx"], - d128[bid; d128::zero()], - d128[ask; d128::zero()], + t(ticker, t!(xmr-btc).as_str()), + t(exchange, "plnx"), + d(bid, d128::zero()), + d(ask, d128::zero()), ); }); } @@ -911,7 +902,7 @@ mod tests { buf.push_str(&buf_copy); println!("{}", buf); - let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + let url = Url::parse_with_params("localhost", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse"); let client = Client::new(); match client.post(url.clone()) .body(&buf) @@ -984,7 +975,7 @@ mod tests { buf.push_str(&buf_copy); println!("{}", buf); - let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + let url = Url::parse_with_params("localhost", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse"); let client = Client::new(); match client.post(url.clone()) .body(&buf) @@ -1002,60 +993,4 @@ mod tests { } } } - - // macro_rules! make_measurement { - // (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; - // (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; - // (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) }; - // (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; - // (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) }; - // (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) }; - // (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) }; - // (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) }; - // - // (@count_tags) => {0usize}; - // (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)}; - // (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)}; - // - // (@count_fields) => {0usize}; - // (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; - // (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; - // (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)}; - // - // ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ - // let n_tags = measure!(@count_tags $($t)*); - // let n_fields = measure!(@count_fields $($t)*); - // let mut meas = - // $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields); - // $( - // measure!(@kv $t, meas, $($tail)*); - // )* - // //let _ = $m.send(meas); - // meas - // }}; - // } - // - // #[test] - // fn it_checks_n_tags_is_correct() { - // let (tx, _): (Sender, Receiver) = channel(); - // assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1); - // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2); - // assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0); - // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0); - // - // let m4 = - // make_measurement!(tx, test, - // tag[a;"b"], - // tag[c;"d"], - // int[n; 1], - // tag[e;"f"], - // float[x; 1.234], - // tag[g;"h"], - // time[1], - // ); - // assert_eq!(m4.n_tags, 4); - // assert_eq!(m4.n_fields, 2); - // } - - } diff --git a/src/latency.rs b/src/latency.rs index 7ce77d3..61febb7 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -242,7 +242,7 @@ impl Manager { let nanos = DurationWindow::nanos(dur); measurements.send( OwnedMeasurement::new("gdax_trade_api") - .add_tag("ticker", ticker.to_str()) + .add_tag("ticker", ticker.as_str()) .add_field("nanos", OwnedValue::Integer(nanos as i64)) .set_timestamp(influx::now())).unwrap(); }