From 9932cec369362a9ee1ab1010d53925e95f6f734c Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Wed, 21 Feb 2018 16:22:59 -0500 Subject: [PATCH] misc --- src/influx.rs | 46 +++++++++++++++++++++------------------------- src/lib.rs | 4 ++++ src/warnings.rs | 2 +- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/influx.rs b/src/influx.rs index f1062f9..84caa00 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -188,8 +188,8 @@ macro_rules! measure { /// #[derive(Debug)] pub struct InfluxWriter { - host: &'static str, - db: &'static str, + host: String, + db: String, tx: Sender>, thread: Option>>, } @@ -211,8 +211,8 @@ impl Clone for InfluxWriter { debug_assert!(self.thread.is_some()); let thread = self.thread.as_ref().map(|x| Arc::clone(x)); InfluxWriter { - host: self.host, - db: self.db, + host: self.host.to_string(), + db: self.db.to_string(), tx: self.tx.clone(), thread, } @@ -231,24 +231,27 @@ impl InfluxWriter { } #[allow(unused_assignments)] - pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u16) -> Self { + pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self { let (tx, rx): (Sender>, Receiver>) = channel(); let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread #[cfg(feature = "no-influx-buffer")] let buffer_size = 0u16; + debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size); + + let url = + Url::parse_with_params(&format!("http://{}:8086/write", host), + &[("db", db), ("precision", "ns")]) + .expect("influx writer url should parse"); + let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { - debug!(logger, "initializing url"; - "DB_HOST" => host, - "DB_NAME" => db); - let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse"); let client = Client::new(); debug!(logger, "initializing buffers"); - //let mut meas_buf = String::with_capacity(32 * 32 * 32); let mut buf = String::with_capacity(32 * 32 * 32); let mut count = 0; + let send = |buf: &str| { let resp = client.post(url.clone()) .body(buf) @@ -273,11 +276,7 @@ impl InfluxWriter { } }; - let next2 = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 { - // trace!(logger, "appending serialized measurement to buffer"; - // "prev" => prev, - // "buf.len()" => buf.len()); - + let next = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 { match prev { 0 if buffer_size > 0 => { serialize_owned(m, buf); @@ -293,7 +292,6 @@ impl InfluxWriter { n => { buf.push_str("\n"); serialize_owned(m, buf); - //if s.len() > 0 { buf.push_str(s); } debug!(logger, "sending buffer to influx"; "len" => n); send(buf); buf.clear(); @@ -303,15 +301,14 @@ impl InfluxWriter { }; loop { - //match rx.try_recv() { match rx.recv() { Ok(Some(mut meas)) => { - if meas.timestamp.is_none() { - meas.timestamp = Some(now()); - } - trace!(logger, "rcvd new OwnedMeasurement"; "count" => count); - count = next2(count, &meas, &mut buf); + if meas.timestamp.is_none() { meas.timestamp = Some(now()) } + + #[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } } + + count = next(count, &meas, &mut buf); } Ok(None) => { @@ -323,7 +320,6 @@ impl InfluxWriter { } _ => { - //#[cfg(feature = "no-thrash")] thread::sleep(Duration::new(0, 1)) } } @@ -331,8 +327,8 @@ impl InfluxWriter { }).unwrap(); InfluxWriter { - host, - db, + host: host.to_string(), + db: db.to_string(), tx, thread: Some(Arc::new(thread)) } diff --git a/src/lib.rs b/src/lib.rs index a315e9b..c026570 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,8 @@ pub const LOG_LEVEL : Severity = Severity::Debug; #[cfg(not(any(feature = "debug", feature = "trace")))] pub const LOG_LEVEL : Severity = Severity::Info; +const CHANNEL_SIZE: usize = 40_000; + /// converts a chrono::DateTime to an integer timestamp (ns) /// pub fn nanos(t: DateTime) -> u64 { @@ -59,6 +61,7 @@ pub fn file_logger(path: &str, level: Severity) -> slog::Logger { let mut builder = FileLoggerBuilder::new(path); builder.level(level); builder.timezone(TimeZone::Utc); + builder.channel_size(CHANNEL_SIZE); builder.build().unwrap() } @@ -67,6 +70,7 @@ pub fn truncating_file_logger(path: &str, level: Severity) -> slog::Logger { builder.level(level); builder.timezone(TimeZone::Utc); builder.truncate(); + builder.channel_size(CHANNEL_SIZE); builder.build().unwrap() } diff --git a/src/warnings.rs b/src/warnings.rs index 2772b6e..2eb0be3 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -60,7 +60,7 @@ macro_rules! notice { } #[macro_export] -macro_rules! error { +macro_rules! error_w { ($warnings:ident, $($args:tt)*) => ( { $warnings.send(Warning::Error( ( format!($($args)*) ) ) ).unwrap();