diff --git a/src/influx.rs b/src/influx.rs index 4f7bf6b..109a405 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -2,9 +2,11 @@ //! use std::iter::FromIterator; -use std::io::Read; +use std::io::{Write, Read}; use std::sync::mpsc::{Sender, Receiver, channel}; use std::thread; +use std::collections::HashMap; +use std::fs::{self, OpenOptions}; use hyper::status::StatusCode; use hyper::client::response::Response; @@ -17,7 +19,7 @@ use chrono::{DateTime, Utc, TimeZone}; use super::nanos; use warnings::Warning; -const WRITER_ADDR: &'static str = "ipc://mm-influx"; +const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; //const WRITER_ADDR: &'static str = "tcp://127.0.0.1:17853"; const DB_NAME: &'static str = "mm"; const DB_HOST: &'static str = "http://localhost:8086/write"; @@ -39,9 +41,8 @@ pub fn push(ctx: &zmq::Context) -> Result { } fn escape(s: &str) -> String { - s - .replace(" ", "\\ ") - .replace(",", "\\,") + s.replace(" ", "\\ ") + .replace(",", "\\,") } fn as_string(s: &str) -> String { @@ -130,24 +131,53 @@ pub fn serialize(measurement: &Measurement, line: &mut String) { } } +pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { + line.push_str(&escape(measurement.key)); -pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { + for (tag, value) in measurement.tags.iter() { + line.push_str(","); + line.push_str(&escape(tag)); + line.push_str("="); + line.push_str(&escape(value)); + } - let ctx = zmq::Context::new(); + let mut was_spaced = false; - let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); + for (field, value) in measurement.fields.iter() { + line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }}); + line.push_str(&escape(field)); + line.push_str("="); - let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + match value { + &OwnedValue::String(ref s) => line.push_str(&as_string(s)), + &OwnedValue::Integer(ref i) => line.push_str(&as_integer(i)), + &OwnedValue::Float(ref f) => line.push_str(&as_float(f)), + &OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)) + }; + } - let client = Client::new(); + match measurement.timestamp { + Some(t) => { + line.push_str(" "); + line.push_str(&t.to_string()); + } + _ => {} + } +} - let mut buf = String::with_capacity(4096); - let mut server_resp = String::with_capacity(4096); - let mut count = 0; +pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { thread::spawn(move || { + let _ = fs::create_dir("/tmp/mm"); + let ctx = zmq::Context::new(); + let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); + let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + let client = Client::new(); + let mut buf = String::with_capacity(4096); + let mut server_resp = String::with_capacity(4096); + let mut count = 0; loop { if let Ok(bytes) = socket.recv_bytes(0) { if let Ok(msg) = String::from_utf8(bytes) { @@ -171,18 +201,11 @@ pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { Ok(Response { status, .. }) if status == StatusCode::NoContent => {} Ok(mut resp) => { - //let mut body = String::with_capacity(4096); - //let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); - //println!("Influx write error: Server responded {} (sent '{}' to {}):\n{}", - //warnings.send(Warning::Error(buf.clone())); - //print!("\n\n\n\n\n{}", buf); warnings.send( Warning::Error( format!("Influx server: {}", server_resp))); server_resp.clear(); - - //resp.status, String::from_utf8_lossy(&bytes), url, body); } Err(why) => { @@ -201,6 +224,143 @@ pub fn writer(warnings: Sender) -> thread::JoinHandle<()> { }) } +#[derive(Debug, Clone, PartialEq)] +pub enum OwnedValue { + String(String), + Float(f64), + Integer(i64), + Boolean(bool) +} + +pub struct OwnedMeasurement { + pub key: &'static str, + pub timestamp: Option, + pub fields: HashMap<&'static str, OwnedValue>, + pub tags: HashMap<&'static str, &'static str> +} + +impl OwnedMeasurement { + pub fn new(key: &'static str) -> Self { + OwnedMeasurement { + key, + timestamp: None, + fields: HashMap::new(), + tags: HashMap::new() + } + } + + pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self { + self.tags.insert(key, value); + self + } + + pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self { + self.fields.insert(key, value); + self + } + + pub fn set_timestamp(mut self, timestamp: i64) -> Self { + self.timestamp = Some(timestamp); + self + } +} + +pub fn dur_nanos(d: ::std::time::Duration) -> i64 { + (d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64 +} + +//pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) } + +/// exactly like `writer`, but also returns a `Sender` and accepts +/// incoming `Measurement`s that way *in addition* to the old socket/`String` +/// method +/// +pub fn writer_str_or_meas(warnings: Sender) -> (thread::JoinHandle<()>, Sender) { + let (tx, rx) = channel(); + let thread = thread::spawn(move || { + let _ = fs::create_dir("/tmp/mm"); + let ctx = zmq::Context::new(); + let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); + let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + let client = Client::new(); + let mut meas_buf = String::with_capacity(4096); + let mut buf = String::with_capacity(4096); + let mut server_resp = String::with_capacity(4096); + let mut count = 0; + + let next = |prev: u8, s: &str, buf: &mut String| -> u8 { + match prev { + 0 => { + buf.push_str(s); + 1 + } + n @ 1...40 => { + buf.push_str("\n"); + buf.push_str(s); + n + 1 + } + _ => { + buf.push_str("\n"); + buf.push_str(s); + match client.post(url.clone()) + .body(buf.as_str()) + .send() { + + Ok(Response { status, .. }) if status == StatusCode::NoContent => {} + + Ok(mut resp) => { + let mut server_resp = String::with_capacity(1024); + server_resp.push_str(&format!("sent at {}:\n", Utc::now())); + server_resp.push_str(&buf); + server_resp.push_str("\nreceived:\n"); + resp.read_to_string(&mut server_resp); //.unwrap_or(0); + OpenOptions::new() + .create(true) + .append(true) + .open("/home/jstrong/src/market-maker/influx-errors.txt") + .map_err(|e| { + warnings.send(Warning::Error(format!("failed to save influx error: {}", e))); + }).map(|mut file| { + write!(file, "{}", server_resp); + }); + server_resp.truncate(120); + warnings.send( + Warning::Error( + format!("Influx server: {}", server_resp))); + } + + Err(why) => { + warnings.send( + Warning::Error( + format!("Influx write error: {}", why))); + } + } + buf.clear(); + 0 + } + } + }; + + loop { + rx.try_recv() + .map(|meas| { + serialize_owned(&meas, &mut meas_buf); + count = next(count, &meas_buf, &mut buf); + meas_buf.clear(); + }); + + socket.recv_bytes(0).ok() + .and_then(|bytes| { + String::from_utf8(bytes).ok() + }).map(|s| { + count = next(count, &s, &mut buf); + }); + } + }); + (thread, tx) +} + + mod tests { use super::*; @@ -270,4 +430,40 @@ mod tests { } + #[test] + fn it_serializes_a_hard_to_serialize_message_from_owned() { + let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#; + let mut buf = String::new(); + let mut server_resp = String::new(); + let mut m = OwnedMeasurement::new("rust_test") + .add_field("s", OwnedValue::String(raw.to_string())) + .set_timestamp(now()); + serialize_owned(&m, &mut buf); + println!("{}", buf); + buf.push_str("\n"); + let buf_copy = buf.clone(); + buf.push_str(&buf_copy); + println!("{}", buf); + + let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); + let client = Client::new(); + match client.post(url.clone()) + .body(&buf) + .send() { + + Ok(Response { status, .. }) if status == StatusCode::NoContent => {} + + Ok(mut resp) => { + resp.read_to_string(&mut server_resp); //.unwrap_or(0); + panic!("{}", server_resp); + } + + Err(why) => { + panic!(why) + } + } + + } + + } diff --git a/src/latency.rs b/src/latency.rs index 084e578..17803ba 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -239,9 +239,9 @@ impl LatencyManager { let mut plnx_priv = DurationWindow::new(w.duration()); let mut plnx_order = DurationWindow::new(w.duration()); - /// yes I am intentionally breaking from the hard-typed duration - /// window ... that was a stupid idea - /// + // yes I am intentionally breaking from the hard-typed duration + // window ... that was a stupid idea + // let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30)); let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300)); //let mut gdax_ws_nolock = DurationWindow::new(w.duration());