
InfluxWriter refactor (mostly)

master
Jonathan Strong 6 years ago
parent commit: e9e6e01ebe
4 changed files with 109 additions and 104 deletions
  1. Cargo.toml      +1   -0
  2. src/hist.rs     +1   -1
  3. src/influx.rs   +94  -97
  4. src/lib.rs      +13  -6

Cargo.toml  +1 -0

@@ -29,4 +29,5 @@ pubsub = { path = "../pubsub" }
 [features]
 no-thrash = []
 trace = []
+debug = []
 test = []

src/hist.rs  +1 -1

@@ -102,7 +102,7 @@ impl HistLog {
         let seconds = start_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
         let path = dir.join(&format!("{}-interval-log-{}.v2z", series, seconds));
         let file = fs::File::create(&path).unwrap();
-        thread::Builder::new().name(format!("HistLog::scribe::{}", series)).spawn(move || {
+        thread::Builder::new().name(format!("mm:hist:{}", series)).spawn(move || {
             let mut buf = io::LineWriter::new(file);
             let mut wtr =
                 IntervalLogWriterBuilder::new()

src/influx.rs  +94 -97

@@ -2,7 +2,8 @@
 //!
 
 use std::io::Read;
-use std::sync::mpsc::{Sender, channel, SendError};
+use std::sync::Arc;
+use std::sync::mpsc::{Sender, Receiver, channel, SendError};
 use std::thread;
 use std::fs;
 use std::time::Duration;
@@ -22,7 +23,7 @@ use fnv::FnvHasher;
 use decimal::d128;
 use uuid::Uuid;
 
-use super::{nanos, file_logger};
+use super::{nanos, file_logger, LOG_LEVEL};
 use warnings::Warning;
 
 const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
@@ -33,7 +34,7 @@ const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write";
 const ZMQ_RCV_HWM: i32 = 0;
 const ZMQ_SND_HWM: i32 = 0;
 
-const BUFFER_SIZE: u8 = 80;
+const BUFFER_SIZE: u16 = 80;
 
 pub use super::{dur_nanos, dt_nanos};


@@ -171,29 +172,35 @@ macro_rules! measure {
 /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
 /// measurements have accumulated.
 ///
+#[derive(Debug)]
 pub struct InfluxWriter {
     host: &'static str,
     db: &'static str,
-    tx: Sender<OwnedMeasurement>,
-    kill_switch: Sender<()>,
-    thread: Option<thread::JoinHandle<()>>,
+    tx: Sender<Option<OwnedMeasurement>>,
+    thread: Option<Arc<thread::JoinHandle<()>>>,
 }
 
 impl Default for InfluxWriter {
     fn default() -> Self {
-        InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log", BUFFER_SIZE)
+        if cfg!(any(test, feature = "test")) {
+            InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 1)
+        } else if cfg!(feature = "localhost") {
+            InfluxWriter::new("localhost", "mm2", "/home/jstrong/src/logging/var/log/influx-default.log", BUFFER_SIZE)
+        } else {
+            InfluxWriter::new("washington.0ptimus.internal", "mm2", "var/influx-default.log", BUFFER_SIZE)
+        }
     }
 }
 
 impl Clone for InfluxWriter {
     fn clone(&self) -> Self {
-        let (tx, _) = channel();
+        debug_assert!(self.thread.is_some());
+        let thread = self.thread.as_ref().map(|x| Arc::clone(x));
         InfluxWriter {
             host: self.host,
             db: self.db,
             tx: self.tx.clone(),
-            kill_switch: tx,
-            thread: None,
+            thread,
         }
     }
 }
@@ -201,145 +208,123 @@ impl Clone for InfluxWriter {
 impl InfluxWriter {
     /// Sends the `OwnedMeasurement` to the serialization thread.
     ///
-    pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<OwnedMeasurement>> {
-        self.tx.send(m)
+    pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
+        self.tx.send(Some(m))
     }
 
-    pub fn tx(&self) -> Sender<OwnedMeasurement> {
+    pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
         self.tx.clone()
     }
 
     #[allow(unused_assignments)]
-    pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u8) -> Self {
-        let (kill_switch, terminate) = channel();
-        let (tx, rx) = channel();
-        let logger = file_logger(log_path, Severity::Info);
-        let thread = thread::spawn(move || {
+    pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u16) -> Self {
+        let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
+        let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
+        let buffer_size = if cfg!(feature = "trace") { 0u16 } else { buffer_size };
+        let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
             debug!(logger, "initializing url";
                    "DB_HOST" => host,
                    "DB_NAME" => db);
 
-            #[cfg(not(test))]
             let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse");
-            #[cfg(not(test))]
             let client = Client::new();
 
             debug!(logger, "initializing buffers");
-            let mut meas_buf = String::with_capacity(32 * 32 * 32);
+            //let mut meas_buf = String::with_capacity(32 * 32 * 32);
             let mut buf = String::with_capacity(32 * 32 * 32);
             let mut count = 0;
+            let send = |buf: &str| {
+                let resp = client.post(url.clone())
+                    .body(buf)
+                    .send();
+                match resp {
+
+                    Ok(Response { status, .. }) if status == StatusCode::NoContent => {
+                        debug!(logger, "server responded ok: 204 NoContent");
+                    }
+
+                    Ok(mut resp) => {
+                        let mut server_resp = String::with_capacity(1024);
+                        let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
+                        error!(logger, "influx server error";
+                               "status" => resp.status.to_string(),
+                               "body" => server_resp);
+                    }
+
+                    Err(why) => {
+                        error!(logger, "http request failed: {:?}", why);
+                    }
+                }
+            };
 
-            let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
-                trace!(logger, "appending serialized measurement to buffer";
-                       "prev" => prev,
-                       "buf.len()" => buf.len());
+            let next2 = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 {
+                // trace!(logger, "appending serialized measurement to buffer";
+                //        "prev" => prev,
+                //        "buf.len()" => buf.len());
 
                 match prev {
                     0 if buffer_size > 0 => {
-                        buf.push_str(s);
+                        serialize_owned(m, buf);
                         1
                     }
 
                     n if n < buffer_size => {
                         buf.push_str("\n");
-                        buf.push_str(s);
+                        serialize_owned(m, buf);
                         n + 1
                     }
 
-                    _ => {
+                    n => {
                         buf.push_str("\n");
-                        if s.len() > 0 {
-                            buf.push_str(s);
-                        }
-                        trace!(logger, "sending buffer to influx";
-                               "buf.len()" => buf.len());
-
-                        #[cfg(not(test))]
-                        {
-                            let resp = client.post(url.clone())
-                                .body(buf.as_str())
-                                .send();
-                            match resp {
-
-                                Ok(Response { status, .. }) if status == StatusCode::NoContent => {
-                                    trace!(logger, "server responded ok: 204 NoContent");
-                                }
-
-                                Ok(mut resp) => {
-                                    let mut server_resp = String::with_capacity(1024);
-                                    let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
-                                    error!(logger, "influx server error";
-                                           "status" => resp.status.to_string(),
-                                           "body" => server_resp);
-                                }
-
-                                Err(why) => {
-                                    error!(logger, "http request failed: {:?}", why);
-                                }
-                            }
-                        }
+                        serialize_owned(m, buf);
+                        //if s.len() > 0 { buf.push_str(s); }
+                        debug!(logger, "sending buffer to influx"; "len" => n);
+                        send(buf);
                         buf.clear();
                         0
                     }
                 }
             };
 
-            let mut rcvd_msg = false;
-
             loop {
-                rcvd_msg = false;
-                let _ = rx.recv_timeout(Duration::from_millis(10))
-                    .map(|mut meas: OwnedMeasurement| {
-                        // if we didn't set the timestamp, it would end up
-                        // being whenever we accumulated `BUFFER_SIZE` messages,
-                        // which might be some period of time after we received
-                        // the message.
-                        //
+                match rx.try_recv() {
+                    Ok(Some(mut meas)) => {
                         if meas.timestamp.is_none() {
                             meas.timestamp = Some(now());
                         }
 
                         trace!(logger, "rcvd new OwnedMeasurement"; "count" => count);
-                        serialize_owned(&meas, &mut meas_buf);
-                        count = next(count, &meas_buf, &mut buf);
-                        meas_buf.clear();
-                        rcvd_msg = true;
-                    });
-
-                let end = terminate.try_recv()
-                    .map(|_| {
-                        let _ = next(::std::u8::MAX, "", &mut buf);
-                        true
-                    }).unwrap_or(false);
-
-                if end { break }
-
-                #[cfg(feature = "no-thrash")]
-                {
-                    if !rcvd_msg {
-                        thread::sleep(Duration::new(0, 5000));
+                        count = next2(count, &meas, &mut buf);
+                    }
+
+                    Ok(None) => {
+                        if buf.len() > 0 { send(&buf) }
+                        break
+                    }
+
+                    _ => {
+                        #[cfg(feature = "no-thrash")]
+                        thread::sleep(Duration::new(0, 0))
                     }
                 }
             }
-
-            debug!(logger, "goodbye");
-        });
+        }).unwrap();
 
         InfluxWriter {
             host,
             db,
             tx,
-            kill_switch,
-            thread: Some(thread)
+            thread: Some(Arc::new(thread))
         }
     }
 }


 impl Drop for InfluxWriter {
     fn drop(&mut self) {
-        let _ = self.kill_switch.send(()).unwrap();
-        if let Some(thread) = self.thread.take() {
-            let _ = thread.join();
+        if let Some(arc) = self.thread.take() {
+            if let Ok(thread) = Arc::try_unwrap(arc) {
+                let _ = self.tx.send(None);
+                let _ = thread.join();
+            }
         }
     }
 }
@@ -664,11 +649,23 @@ mod tests {
         a.f();
     }
 
+    #[test]
+    fn it_clones_an_influx_writer_to_check_both_drop() {
+        let influx = InfluxWriter::default();
+        measure!(influx, drop_test, i(a, 1), i(b, 2));
+        {
+            let influx = influx.clone();
+            thread::spawn(move || {
+                measure!(influx, drop_test, i(a, 3), i(b, 4));
+            });
+        }
+    }
+
     #[bench]
     fn influx_writer_send_basic(b: &mut Bencher) {
-        let m = InfluxWriter::default();
+        let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
         b.iter(|| {
-            measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]);
+            measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
         });
     }
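
The shutdown path in this refactor has two parts visible above: a `None` sent on the measurement channel tells the writer thread to flush whatever is buffered and exit, and the worker's `JoinHandle` lives behind an `Arc` so that only the last clone to be dropped actually signals and joins the thread. The sketch below isolates that pattern with a toy `Writer` type; the type, its fields, and the string payload are illustrative stand-ins, not this crate's API.

    use std::sync::Arc;
    use std::sync::mpsc::{channel, Sender};
    use std::thread;

    // Toy stand-in for the InfluxWriter pattern: `None` is the shutdown
    // sentinel, and the worker's JoinHandle is shared by every clone.
    struct Writer {
        tx: Sender<Option<String>>,
        thread: Option<Arc<thread::JoinHandle<()>>>,
    }

    impl Clone for Writer {
        fn clone(&self) -> Self {
            Writer {
                tx: self.tx.clone(),
                thread: self.thread.as_ref().map(Arc::clone),
            }
        }
    }

    impl Drop for Writer {
        fn drop(&mut self) {
            if let Some(arc) = self.thread.take() {
                // Arc::try_unwrap only succeeds for the last clone alive, so
                // only that drop sends the sentinel and joins the worker.
                if let Ok(handle) = Arc::try_unwrap(arc) {
                    let _ = self.tx.send(None);
                    let _ = handle.join();
                }
            }
        }
    }

    fn main() {
        let (tx, rx) = channel::<Option<String>>();
        let handle = thread::spawn(move || {
            // Loop ends when the sentinel `None` arrives (or the channel closes).
            while let Ok(Some(msg)) = rx.recv() {
                println!("buffered: {}", msg);
            }
        });
        let writer = Writer { tx, thread: Some(Arc::new(handle)) };
        let clone = writer.clone();
        let _ = clone.tx.send(Some("cpu,host=a val=1i".to_string()));
        drop(clone);  // another clone is still alive: no join here
        drop(writer); // last clone: sends None, worker drains and is joined
    }

The new `it_clones_an_influx_writer_to_check_both_drop` test exercises exactly this: both the original and the clone are dropped, but the worker is only signalled and joined once.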




src/lib.rs  +13 -6

@@ -41,13 +41,20 @@ pub mod warnings;
 pub mod latency;
 pub mod hist;
 
+#[cfg(feature = "trace")]
+pub const LOG_LEVEL : Severity = Severity::Trace;
+#[cfg(all(feature = "debug", not(feature = "trace")))]
+pub const LOG_LEVEL : Severity = Severity::Debug;
+#[cfg(not(any(feature = "debug", feature = "trace")))]
+pub const LOG_LEVEL : Severity = Severity::Info;
+
 /// converts a chrono::DateTime to an integer timestamp (ns)
 ///
 pub fn nanos(t: DateTime<Utc>) -> u64 {
     (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64)
 }
 
-#[cfg(not(any(test, feature = "test")))]
+//#[cfg(not(any(test, feature = "test")))]
 pub fn file_logger(path: &str, level: Severity) -> slog::Logger {
     let mut builder = FileLoggerBuilder::new(path);
     builder.level(level);
@@ -55,11 +62,11 @@ pub fn file_logger(path: &str, level: Severity) -> slog::Logger {
     builder.build().unwrap()
 }
 
-#[cfg(any(test, feature = "test"))]
-pub fn file_logger(_: &str, _: Severity) -> slog::Logger {
-    use slog::*;
-    Logger::root(Discard, o!())
-}
+// #[cfg(any(test, feature = "test"))]
+// pub fn file_logger(_: &str, _: Severity) -> slog::Logger {
+//     use slog::*;
+//     Logger::root(Discard, o!())
+// }
 
 pub fn async_file_logger(path: &str, level: Severity) -> slog::Logger {
     let drain = file_logger(path, level);
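
With the feature-gated `LOG_LEVEL` constant in place (selected at build time via `cargo build --features trace` or `--features debug`), call sites can pass it straight to `file_logger` instead of hard-coding a severity, which is what `InfluxWriter::new` now does. A minimal sketch of a downstream consumer, assuming the crate name `logging` implied by the paths in this diff and an illustrative log path:

    #[macro_use]
    extern crate slog;

    use logging::{file_logger, LOG_LEVEL};

    fn main() {
        // LOG_LEVEL is fixed at compile time: Severity::Trace with the `trace`
        // feature, Severity::Debug with `debug`, Severity::Info otherwise.
        let logger = file_logger("var/log/example.log", LOG_LEVEL);
        info!(logger, "logger ready"; "path" => "var/log/example.log");
    }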

