|
@@ -188,8 +188,8 @@ macro_rules! measure { |
|
|
/// |
|
|
/// |
|
|
#[derive(Debug)] |
|
|
#[derive(Debug)] |
|
|
pub struct InfluxWriter { |
|
|
pub struct InfluxWriter { |
|
|
host: &'static str, |
|
|
|
|
|
db: &'static str, |
|
|
|
|
|
|
|
|
host: String, |
|
|
|
|
|
db: String, |
|
|
tx: Sender<Option<OwnedMeasurement>>, |
|
|
tx: Sender<Option<OwnedMeasurement>>, |
|
|
thread: Option<Arc<thread::JoinHandle<()>>>, |
|
|
thread: Option<Arc<thread::JoinHandle<()>>>, |
|
|
} |
|
|
} |
|
@@ -211,8 +211,8 @@ impl Clone for InfluxWriter { |
|
|
debug_assert!(self.thread.is_some()); |
|
|
debug_assert!(self.thread.is_some()); |
|
|
let thread = self.thread.as_ref().map(|x| Arc::clone(x)); |
|
|
let thread = self.thread.as_ref().map(|x| Arc::clone(x)); |
|
|
InfluxWriter { |
|
|
InfluxWriter { |
|
|
host: self.host, |
|
|
|
|
|
db: self.db, |
|
|
|
|
|
|
|
|
host: self.host.to_string(), |
|
|
|
|
|
db: self.db.to_string(), |
|
|
tx: self.tx.clone(), |
|
|
tx: self.tx.clone(), |
|
|
thread, |
|
|
thread, |
|
|
} |
|
|
} |
|
@@ -231,24 +231,27 @@ impl InfluxWriter { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#[allow(unused_assignments)] |
|
|
#[allow(unused_assignments)] |
|
|
pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u16) -> Self { |
|
|
|
|
|
|
|
|
pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self { |
|
|
let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel(); |
|
|
let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel(); |
|
|
let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread |
|
|
let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread |
|
|
|
|
|
|
|
|
#[cfg(feature = "no-influx-buffer")] |
|
|
#[cfg(feature = "no-influx-buffer")] |
|
|
let buffer_size = 0u16; |
|
|
let buffer_size = 0u16; |
|
|
|
|
|
|
|
|
|
|
|
debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size); |
|
|
|
|
|
|
|
|
|
|
|
let url = |
|
|
|
|
|
Url::parse_with_params(&format!("http://{}:8086/write", host), |
|
|
|
|
|
&[("db", db), ("precision", "ns")]) |
|
|
|
|
|
.expect("influx writer url should parse"); |
|
|
|
|
|
|
|
|
let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { |
|
|
let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || { |
|
|
debug!(logger, "initializing url"; |
|
|
|
|
|
"DB_HOST" => host, |
|
|
|
|
|
"DB_NAME" => db); |
|
|
|
|
|
let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse"); |
|
|
|
|
|
let client = Client::new(); |
|
|
let client = Client::new(); |
|
|
|
|
|
|
|
|
debug!(logger, "initializing buffers"); |
|
|
debug!(logger, "initializing buffers"); |
|
|
//let mut meas_buf = String::with_capacity(32 * 32 * 32); |
|
|
|
|
|
let mut buf = String::with_capacity(32 * 32 * 32); |
|
|
let mut buf = String::with_capacity(32 * 32 * 32); |
|
|
let mut count = 0; |
|
|
let mut count = 0; |
|
|
|
|
|
|
|
|
let send = |buf: &str| { |
|
|
let send = |buf: &str| { |
|
|
let resp = client.post(url.clone()) |
|
|
let resp = client.post(url.clone()) |
|
|
.body(buf) |
|
|
.body(buf) |
|
@@ -273,11 +276,7 @@ impl InfluxWriter { |
|
|
} |
|
|
} |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
let next2 = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 { |
|
|
|
|
|
// trace!(logger, "appending serialized measurement to buffer"; |
|
|
|
|
|
// "prev" => prev, |
|
|
|
|
|
// "buf.len()" => buf.len()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let next = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 { |
|
|
match prev { |
|
|
match prev { |
|
|
0 if buffer_size > 0 => { |
|
|
0 if buffer_size > 0 => { |
|
|
serialize_owned(m, buf); |
|
|
serialize_owned(m, buf); |
|
@@ -293,7 +292,6 @@ impl InfluxWriter { |
|
|
n => { |
|
|
n => { |
|
|
buf.push_str("\n"); |
|
|
buf.push_str("\n"); |
|
|
serialize_owned(m, buf); |
|
|
serialize_owned(m, buf); |
|
|
//if s.len() > 0 { buf.push_str(s); } |
|
|
|
|
|
debug!(logger, "sending buffer to influx"; "len" => n); |
|
|
debug!(logger, "sending buffer to influx"; "len" => n); |
|
|
send(buf); |
|
|
send(buf); |
|
|
buf.clear(); |
|
|
buf.clear(); |
|
@@ -303,15 +301,14 @@ impl InfluxWriter { |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
loop { |
|
|
loop { |
|
|
//match rx.try_recv() { |
|
|
|
|
|
match rx.recv() { |
|
|
match rx.recv() { |
|
|
Ok(Some(mut meas)) => { |
|
|
Ok(Some(mut meas)) => { |
|
|
if meas.timestamp.is_none() { |
|
|
|
|
|
meas.timestamp = Some(now()); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
trace!(logger, "rcvd new OwnedMeasurement"; "count" => count); |
|
|
|
|
|
count = next2(count, &meas, &mut buf); |
|
|
|
|
|
|
|
|
if meas.timestamp.is_none() { meas.timestamp = Some(now()) } |
|
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } } |
|
|
|
|
|
|
|
|
|
|
|
count = next(count, &meas, &mut buf); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
Ok(None) => { |
|
|
Ok(None) => { |
|
@@ -323,7 +320,6 @@ impl InfluxWriter { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_ => { |
|
|
_ => { |
|
|
//#[cfg(feature = "no-thrash")] |
|
|
|
|
|
thread::sleep(Duration::new(0, 1)) |
|
|
thread::sleep(Duration::new(0, 1)) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@@ -331,8 +327,8 @@ impl InfluxWriter { |
|
|
}).unwrap(); |
|
|
}).unwrap(); |
|
|
|
|
|
|
|
|
InfluxWriter { |
|
|
InfluxWriter { |
|
|
host, |
|
|
|
|
|
db, |
|
|
|
|
|
|
|
|
host: host.to_string(), |
|
|
|
|
|
db: db.to_string(), |
|
|
tx, |
|
|
tx, |
|
|
thread: Some(Arc::new(thread)) |
|
|
thread: Some(Arc::new(thread)) |
|
|
} |
|
|
} |
|
|