|
@@ -1,5 +1,5 @@ |
|
|
//! Utilities to efficiently send data to influx |
|
|
//! Utilities to efficiently send data to influx |
|
|
//! |
|
|
|
|
|
|
|
|
//! |
|
|
|
|
|
|
|
|
use std::io::Read; |
|
|
use std::io::Read; |
|
|
use std::sync::Arc; |
|
|
use std::sync::Arc; |
|
@@ -128,13 +128,13 @@ impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } } |
|
|
/// assert_eq!(meas.timestamp, Some(1)); |
|
|
/// assert_eq!(meas.timestamp, Some(1)); |
|
|
/// |
|
|
/// |
|
|
/// // use the @make_meas flag to skip sending a measurement, instead merely |
|
|
/// // use the @make_meas flag to skip sending a measurement, instead merely |
|
|
/// // creating it. |
|
|
|
|
|
|
|
|
/// // creating it. |
|
|
/// |
|
|
/// |
|
|
/// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]); |
|
|
/// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]); |
|
|
/// |
|
|
/// |
|
|
/// // each variant also has shorthand aliases |
|
|
/// // each variant also has shorthand aliases |
|
|
/// |
|
|
/// |
|
|
/// let meas: OwnedMeasurement = |
|
|
|
|
|
|
|
|
/// let meas: OwnedMeasurement = |
|
|
/// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]); |
|
|
/// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]); |
|
|
/// } |
|
|
/// } |
|
|
/// ``` |
|
|
/// ``` |
|
@@ -165,7 +165,7 @@ macro_rules! measure { |
|
|
(@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) }; |
|
|
(@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) }; |
|
|
|
|
|
|
|
|
(@as_expr $e:expr) => {$e}; |
|
|
(@as_expr $e:expr) => {$e}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(@count_tags) => {0usize}; |
|
|
(@count_tags) => {0usize}; |
|
|
(@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)}; |
|
|
(@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)}; |
|
|
(@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)}; |
|
|
(@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)}; |
|
@@ -182,7 +182,7 @@ macro_rules! measure { |
|
|
(@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ |
|
|
(@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ |
|
|
let n_tags = measure!(@count_tags $($t)*); |
|
|
let n_tags = measure!(@count_tags $($t)*); |
|
|
let n_fields = measure!(@count_fields $($t)*); |
|
|
let n_fields = measure!(@count_fields $($t)*); |
|
|
let mut meas = |
|
|
|
|
|
|
|
|
let mut meas = |
|
|
$crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields); |
|
|
$crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields); |
|
|
$( |
|
|
$( |
|
|
measure!(@kv $t, meas, $($tail)*); |
|
|
measure!(@kv $t, meas, $($tail)*); |
|
@@ -202,10 +202,10 @@ macro_rules! measure { |
|
|
}}; |
|
|
}}; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s |
|
|
|
|
|
/// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE` |
|
|
|
|
|
/// measurements have accumulated. |
|
|
|
|
|
/// |
|
|
|
|
|
|
|
|
/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s |
|
|
|
|
|
/// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE` |
|
|
|
|
|
/// measurements have accumulated. |
|
|
|
|
|
/// |
|
|
#[derive(Debug)] |
|
|
#[derive(Debug)] |
|
|
pub struct InfluxWriter { |
|
|
pub struct InfluxWriter { |
|
|
host: String, |
|
|
host: String, |
|
@@ -238,8 +238,8 @@ impl Clone for InfluxWriter { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
impl InfluxWriter { |
|
|
impl InfluxWriter { |
|
|
/// Sends the `OwnedMeasurement` to the serialization thread. |
|
|
|
|
|
/// |
|
|
|
|
|
|
|
|
/// Sends the `OwnedMeasurement` to the serialization thread. |
|
|
|
|
|
/// |
|
|
#[cfg_attr(feature = "inlines", inline)] |
|
|
#[cfg_attr(feature = "inlines", inline)] |
|
|
pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> { |
|
|
pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> { |
|
|
self.tx.send(Some(m)) |
|
|
self.tx.send(Some(m)) |
|
@@ -365,14 +365,14 @@ impl InfluxWriter { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
Ok(None) => { |
|
|
Ok(None) => { |
|
|
if buf.len() > 0 { |
|
|
|
|
|
|
|
|
if buf.len() > 0 { |
|
|
debug!(logger, "sending buffer to influx"; "len" => count); |
|
|
debug!(logger, "sending buffer to influx"; "len" => count); |
|
|
send(&buf) |
|
|
|
|
|
|
|
|
send(&buf) |
|
|
} |
|
|
} |
|
|
break |
|
|
break |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
_ => { |
|
|
|
|
|
|
|
|
_ => { |
|
|
thread::sleep(Duration::new(0, 1)) |
|
|
thread::sleep(Duration::new(0, 1)) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@@ -463,9 +463,9 @@ pub fn now() -> i64 { |
|
|
|
|
|
|
|
|
/// Serialize the measurement into influx line protocol |
|
|
/// Serialize the measurement into influx line protocol |
|
|
/// and append to the buffer. |
|
|
/// and append to the buffer. |
|
|
/// |
|
|
|
|
|
|
|
|
/// |
|
|
/// # Examples |
|
|
/// # Examples |
|
|
/// |
|
|
|
|
|
|
|
|
/// |
|
|
/// ``` |
|
|
/// ``` |
|
|
/// extern crate influent; |
|
|
/// extern crate influent; |
|
|
/// extern crate logging; |
|
|
/// extern crate logging; |
|
@@ -480,9 +480,9 @@ pub fn now() -> i64 { |
|
|
/// m.add_field("x", Value::Integer(1)); |
|
|
/// m.add_field("x", Value::Integer(1)); |
|
|
/// serialize(&m, &mut buf); |
|
|
/// serialize(&m, &mut buf); |
|
|
/// } |
|
|
/// } |
|
|
/// |
|
|
|
|
|
|
|
|
/// |
|
|
/// ``` |
|
|
/// ``` |
|
|
/// |
|
|
|
|
|
|
|
|
/// |
|
|
pub fn serialize(measurement: &Measurement, line: &mut String) { |
|
|
pub fn serialize(measurement: &Measurement, line: &mut String) { |
|
|
line.push_str(&escape(measurement.key)); |
|
|
line.push_str(&escape(measurement.key)); |
|
|
|
|
|
|
|
@@ -517,11 +517,11 @@ pub fn serialize(measurement: &Measurement, line: &mut String) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/// Serializes an `&OwnedMeasurement` as influx line protocol into `line`. |
|
|
|
|
|
/// |
|
|
|
|
|
|
|
|
/// Serializes an `&OwnedMeasurement` as influx line protocol into `line`. |
|
|
|
|
|
/// |
|
|
/// The serialized measurement is appended to the end of the string without |
|
|
/// The serialized measurement is appended to the end of the string without |
|
|
/// any regard for what exited in it previously. |
|
|
|
|
|
/// |
|
|
|
|
|
|
|
|
/// any regard for what exited in it previously. |
|
|
|
|
|
/// |
|
|
pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { |
|
|
pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { |
|
|
line.push_str(&escape_tag(measurement.key)); |
|
|
line.push_str(&escape_tag(measurement.key)); |
|
|
|
|
|
|
|
@@ -604,7 +604,7 @@ pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> { |
|
|
if let Ok(bytes) = socket.recv_bytes(0) { |
|
|
if let Ok(bytes) = socket.recv_bytes(0) { |
|
|
if let Ok(msg) = String::from_utf8(bytes) { |
|
|
if let Ok(msg) = String::from_utf8(bytes) { |
|
|
count = match count { |
|
|
count = match count { |
|
|
0 => { |
|
|
|
|
|
|
|
|
0 => { |
|
|
buf.push_str(&msg); |
|
|
buf.push_str(&msg); |
|
|
1 |
|
|
1 |
|
|
} |
|
|
} |
|
@@ -736,7 +736,6 @@ impl OwnedMeasurement { |
|
|
.find(|kv| kv.0 == key) |
|
|
.find(|kv| kv.0 == key) |
|
|
.map(|kv| kv.1) |
|
|
.map(|kv| kv.1) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#[allow(unused_imports, unused_variables)] |
|
|
#[allow(unused_imports, unused_variables)] |
|
@@ -831,8 +830,8 @@ mod tests { |
|
|
fn influx_writer_send_price(b: &mut Bencher) { |
|
|
fn influx_writer_send_price(b: &mut Bencher) { |
|
|
let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000); |
|
|
let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000); |
|
|
b.iter(|| { |
|
|
b.iter(|| { |
|
|
measure!(m, test, |
|
|
|
|
|
t(ticker, t!(xmr-btc).as_str()), |
|
|
|
|
|
|
|
|
measure!(m, test, |
|
|
|
|
|
t(ticker, t!(xmr-btc).as_str()), |
|
|
t(exchange, "plnx"), |
|
|
t(exchange, "plnx"), |
|
|
d(bid, d128::zero()), |
|
|
d(bid, d128::zero()), |
|
|
d(ask, d128::zero()), |
|
|
d(ask, d128::zero()), |
|
@@ -991,13 +990,13 @@ mod tests { |
|
|
loop { if rx.recv().is_err() { break } } |
|
|
loop { if rx.recv().is_err() { break } } |
|
|
}); |
|
|
}); |
|
|
b.iter(|| { |
|
|
b.iter(|| { |
|
|
measure!(tx, test, |
|
|
|
|
|
tag[color; "red"], |
|
|
|
|
|
|
|
|
measure!(tx, test, |
|
|
|
|
|
tag[color; "red"], |
|
|
tag[mood => "playful"], |
|
|
tag[mood => "playful"], |
|
|
tag [ ticker => "xmr_btc" ], |
|
|
tag [ ticker => "xmr_btc" ], |
|
|
float[ price => 1.2345 ], |
|
|
float[ price => 1.2345 ], |
|
|
float[ amount => 56.323], |
|
|
float[ amount => 56.323], |
|
|
int[n; 1], |
|
|
|
|
|
|
|
|
int[n; 1], |
|
|
time[now()] |
|
|
time[now()] |
|
|
); |
|
|
); |
|
|
}); |
|
|
}); |
|
@@ -1069,13 +1068,12 @@ mod tests { |
|
|
panic!(why) |
|
|
panic!(why) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#[bench] |
|
|
#[bench] |
|
|
fn serialize_owned_longer(b: &mut Bencher) { |
|
|
fn serialize_owned_longer(b: &mut Bencher) { |
|
|
let mut buf = String::with_capacity(1024); |
|
|
let mut buf = String::with_capacity(1024); |
|
|
let m = |
|
|
|
|
|
|
|
|
let m = |
|
|
OwnedMeasurement::new("test") |
|
|
OwnedMeasurement::new("test") |
|
|
.add_tag("one", "a") |
|
|
.add_tag("one", "a") |
|
|
.add_tag("two", "b") |
|
|
.add_tag("two", "b") |
|
@@ -1096,7 +1094,7 @@ mod tests { |
|
|
#[bench] |
|
|
#[bench] |
|
|
fn serialize_owned_simple(b: &mut Bencher) { |
|
|
fn serialize_owned_simple(b: &mut Bencher) { |
|
|
let mut buf = String::with_capacity(1024); |
|
|
let mut buf = String::with_capacity(1024); |
|
|
let m = |
|
|
|
|
|
|
|
|
let m = |
|
|
OwnedMeasurement::new("test") |
|
|
OwnedMeasurement::new("test") |
|
|
.add_tag("one", "a") |
|
|
.add_tag("one", "a") |
|
|
.add_tag("two", "b") |
|
|
.add_tag("two", "b") |
|
@@ -1109,7 +1107,6 @@ mod tests { |
|
|
}); |
|
|
}); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
#[test] |
|
|
fn it_serializes_a_hard_to_serialize_message_from_owned() { |
|
|
fn it_serializes_a_hard_to_serialize_message_from_owned() { |
|
|
let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#; |
|
|
let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#; |
|
|