From 48427c65d328335c0b1d288350a58aca8fde66f0 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Wed, 30 Aug 2017 21:18:28 -0400 Subject: [PATCH] adds file logging to influx writer --- src/influx.rs | 81 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/src/influx.rs b/src/influx.rs index 3d88f9e..b142178 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -7,6 +7,7 @@ use std::sync::mpsc::{Sender, Receiver, channel}; use std::thread; use std::collections::HashMap; use std::fs::{self, OpenOptions}; +use std::time::Duration; use hyper::status::StatusCode; use hyper::client::response::Response; @@ -16,7 +17,7 @@ use influent::measurement::{Measurement, Value}; use zmq; use chrono::{DateTime, Utc, TimeZone}; -use super::nanos; +use super::{nanos, file_logger}; use warnings::Warning; const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; @@ -294,17 +295,26 @@ pub fn dur_nanos(d: ::std::time::Duration) -> i64 { pub fn writer_str_or_meas(warnings: Sender) -> (thread::JoinHandle<()>, Sender) { let (tx, rx) = channel(); let thread = thread::spawn(move || { + let logger = file_logger("var/log/influx.log"); + info!(logger, "initializing zmq"); let _ = fs::create_dir("/tmp/mm"); let ctx = zmq::Context::new(); let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); + info!(logger, "initializing url"; + "DB_HOST" => DB_HOST, + "DB_NAME" => DB_NAME); let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); let client = Client::new(); + info!(logger, "initializing buffers"); let mut meas_buf = String::with_capacity(4096); let mut buf = String::with_capacity(4096); let mut server_resp = String::with_capacity(4096); let mut count = 0; let next = |prev: u8, s: &str, buf: &mut String| -> u8 { + debug!(logger, "appending serialized measurement to buffer"; + "prev" => prev, + "buf.len()" => buf.len()); match prev { 0 => { buf.push_str(s); @@ -318,37 +328,46 @@ pub fn writer_str_or_meas(warnings: Sender) -> (thread::JoinHandle<()>, _ => { buf.push_str("\n"); buf.push_str(s); - match client.post(url.clone()) + debug!(logger, "sending buffer to influx"; + "buf.len()" => buf.len()); + let resp = client.post(url.clone()) .body(buf.as_str()) - .send() { + .send(); + match resp { - Ok(Response { status, .. }) if status == StatusCode::NoContent => {} + Ok(Response { status, .. }) if status == StatusCode::NoContent => { + debug!(logger, "server responded ok: 204 NoContent"); + } Ok(mut resp) => { let mut server_resp = String::with_capacity(1024); - server_resp.push_str(&format!("sent at {}:\n", Utc::now())); - server_resp.push_str(&buf); - server_resp.push_str("\nreceived:\n"); + //server_resp.push_str(&format!("sent at {}:\n", Utc::now())); + //server_resp.push_str(&buf); + //server_resp.push_str("\nreceived:\n"); resp.read_to_string(&mut server_resp); //.unwrap_or(0); - OpenOptions::new() - .create(true) - .append(true) - .open("/home/jstrong/src/market-maker/influx-errors.txt") - .map_err(|e| { - warnings.send(Warning::Error(format!("failed to save influx error: {}", e))); - }).map(|mut file| { - write!(file, "{}", server_resp); - }); - server_resp.truncate(120); - warnings.send( - Warning::Error( - format!("Influx server: {}", server_resp))); + error!(logger, "influx server error"; + "status" => resp.status.to_string(), + "body" => server_resp); + // OpenOptions::new() + // .create(true) + // .append(true) + // .open("/home/jstrong/src/market-maker/influx-errors.txt") + // .map_err(|e| { + // warnings.send(Warning::Error(format!("failed to save influx error: {}", e))); + // }).map(|mut file| { + // write!(file, "{}", server_resp); + // }); + // server_resp.truncate(120); + // warnings.send( + // Warning::Error( + // format!("Influx server: {}", server_resp))); } Err(why) => { - warnings.send( - Warning::Error( - format!("Influx write error: {}", why))); + error!(logger, "http request failed: {:?}", why); + // warnings.send( + // Warning::Error( + // format!("Influx write error: {}", why))); } } buf.clear(); @@ -357,21 +376,35 @@ pub fn writer_str_or_meas(warnings: Sender) -> (thread::JoinHandle<()>, } }; + let mut rcvd_msg = false; + loop { + rcvd_msg = false; rx.try_recv() .map(|meas| { + debug!(logger, "rcvd new OwnedMeasurement"; + "count" => count); serialize_owned(&meas, &mut meas_buf); count = next(count, &meas_buf, &mut buf); meas_buf.clear(); + rcvd_msg = true; }); - socket.recv_bytes(0).ok() + socket.recv_bytes(zmq::DONTWAIT).ok() .and_then(|bytes| { String::from_utf8(bytes).ok() }).map(|s| { + debug!(logger, "rcvd new serialized"; + "count" => count); count = next(count, &s, &mut buf); + rcvd_msg = true; }); + + if !rcvd_msg { + thread::sleep(Duration::from_millis(1) / 10); + } } + crit!(logger, "goodbye"); }); (thread, tx) }