|
@@ -5,6 +5,7 @@ use std::io::Read; |
|
|
use std::sync::Arc; |
|
|
use std::sync::Arc; |
|
|
use std::sync::mpsc::{Sender, Receiver, channel, SendError}; |
|
|
use std::sync::mpsc::{Sender, Receiver, channel, SendError}; |
|
|
use std::thread; |
|
|
use std::thread; |
|
|
|
|
|
#[cfg(feature = "warnings")] |
|
|
use std::fs; |
|
|
use std::fs; |
|
|
use std::time::Duration; |
|
|
use std::time::Duration; |
|
|
use std::hash::BuildHasherDefault; |
|
|
use std::hash::BuildHasherDefault; |
|
@@ -17,7 +18,6 @@ use influent::measurement::{Measurement, Value}; |
|
|
use zmq; |
|
|
use zmq; |
|
|
#[allow(unused_imports)] |
|
|
#[allow(unused_imports)] |
|
|
use chrono::{DateTime, Utc}; |
|
|
use chrono::{DateTime, Utc}; |
|
|
use sloggers::types::Severity; |
|
|
|
|
|
use ordermap::OrderMap; |
|
|
use ordermap::OrderMap; |
|
|
use fnv::FnvHasher; |
|
|
use fnv::FnvHasher; |
|
|
use decimal::d128; |
|
|
use decimal::d128; |
|
@@ -27,24 +27,6 @@ use super::{nanos, file_logger, LOG_LEVEL}; |
|
|
#[cfg(feature = "warnings")] |
|
|
#[cfg(feature = "warnings")] |
|
|
use warnings::Warning; |
|
|
use warnings::Warning; |
|
|
|
|
|
|
|
|
const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; |
|
|
|
|
|
const ZMQ_RCV_HWM: i32 = 0; |
|
|
|
|
|
const ZMQ_SND_HWM: i32 = 0; |
|
|
|
|
|
|
|
|
|
|
|
const BUFFER_SIZE: u16 = 80; |
|
|
|
|
|
|
|
|
|
|
|
#[cfg(not(any(test, feature = "test")))] |
|
|
|
|
|
const DB_NAME: &'static str = "mm2"; |
|
|
|
|
|
|
|
|
|
|
|
#[cfg(any(test, feature = "test"))] |
|
|
|
|
|
const DB_NAME: &'static str = "mm2_test"; |
|
|
|
|
|
|
|
|
|
|
|
#[cfg(not(any(feature = "scholes")))] |
|
|
|
|
|
const DB_HOST: &'static str = "http://127.0.0.1:8086/write"; |
|
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "scholes")] |
|
|
|
|
|
const DB_HOST: &'static str = "http://159.203.81.249:8086/write"; |
|
|
|
|
|
|
|
|
|
|
|
pub use super::{dur_nanos, dt_nanos}; |
|
|
pub use super::{dur_nanos, dt_nanos}; |
|
|
|
|
|
|
|
|
pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>; |
|
|
pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>; |
|
@@ -191,11 +173,11 @@ pub struct InfluxWriter { |
|
|
|
|
|
|
|
|
impl Default for InfluxWriter { |
|
|
impl Default for InfluxWriter { |
|
|
fn default() -> Self { |
|
|
fn default() -> Self { |
|
|
if cfg!(any(test, feature = "test")) { |
|
|
|
|
|
InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 1) |
|
|
|
|
|
} else { |
|
|
|
|
|
InfluxWriter::new("localhost", "mm2", "/home/jstrong/src/logging/var/log/influx-default.log", BUFFER_SIZE) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
//if cfg!(any(test, feature = "test")) { |
|
|
|
|
|
// InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0) |
|
|
|
|
|
//} else { |
|
|
|
|
|
InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 4000) |
|
|
|
|
|
//} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@@ -341,17 +323,19 @@ impl Drop for InfluxWriter { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; |
|
|
|
|
|
|
|
|
pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { |
|
|
pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { |
|
|
let socket = ctx.socket(zmq::PULL)?; |
|
|
let socket = ctx.socket(zmq::PULL)?; |
|
|
socket.bind(WRITER_ADDR)?; |
|
|
socket.bind(WRITER_ADDR)?; |
|
|
socket.set_rcvhwm(ZMQ_RCV_HWM)?; |
|
|
|
|
|
|
|
|
socket.set_rcvhwm(0)?; |
|
|
Ok(socket) |
|
|
Ok(socket) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { |
|
|
pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { |
|
|
let socket = ctx.socket(zmq::PUSH)?; |
|
|
let socket = ctx.socket(zmq::PUSH)?; |
|
|
socket.connect(WRITER_ADDR)?; |
|
|
socket.connect(WRITER_ADDR)?; |
|
|
socket.set_sndhwm(ZMQ_SND_HWM)?; |
|
|
|
|
|
|
|
|
socket.set_sndhwm(0)?; |
|
|
Ok(socket) |
|
|
Ok(socket) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@@ -515,7 +499,9 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { |
|
|
#[cfg(feature = "warnings")] |
|
|
#[cfg(feature = "warnings")] |
|
|
#[deprecated(since="0.4", note="Replace with InfluxWriter")] |
|
|
#[deprecated(since="0.4", note="Replace with InfluxWriter")] |
|
|
pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> { |
|
|
pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> { |
|
|
|
|
|
assert!(false); |
|
|
thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || { |
|
|
thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || { |
|
|
|
|
|
const DB_HOST: &'static str = "http://127.0.0.1:8086/write"; |
|
|
let _ = fs::create_dir("/tmp/mm"); |
|
|
let _ = fs::create_dir("/tmp/mm"); |
|
|
let ctx = zmq::Context::new(); |
|
|
let ctx = zmq::Context::new(); |
|
|
let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); |
|
|
let socket = pull(&ctx).expect("influx::writer failed to create pull socket"); |
|
@@ -580,6 +566,11 @@ pub enum OwnedValue { |
|
|
Uuid(Uuid), |
|
|
Uuid(Uuid), |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Holds data meant for an influxdb measurement in transit to the |
|
|
|
|
|
/// writing thread. |
|
|
|
|
|
/// |
|
|
|
|
|
/// TODO: convert `Map` to `SmallVec`? |
|
|
|
|
|
/// |
|
|
#[derive(Clone, Debug)] |
|
|
#[derive(Clone, Debug)] |
|
|
pub struct OwnedMeasurement { |
|
|
pub struct OwnedMeasurement { |
|
|
pub key: &'static str, |
|
|
pub key: &'static str, |
|
@@ -689,13 +680,13 @@ mod tests { |
|
|
|
|
|
|
|
|
#[bench] |
|
|
#[bench] |
|
|
fn influx_writer_send_price(b: &mut Bencher) { |
|
|
fn influx_writer_send_price(b: &mut Bencher) { |
|
|
let m = InfluxWriter::default(); |
|
|
|
|
|
|
|
|
let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000); |
|
|
b.iter(|| { |
|
|
b.iter(|| { |
|
|
measure!(m, test, |
|
|
measure!(m, test, |
|
|
tag[ticker; t!(xmr-btc).to_str()], |
|
|
|
|
|
tag[exchange; "plnx"], |
|
|
|
|
|
d128[bid; d128::zero()], |
|
|
|
|
|
d128[ask; d128::zero()], |
|
|
|
|
|
|
|
|
t(ticker, t!(xmr-btc).as_str()), |
|
|
|
|
|
t(exchange, "plnx"), |
|
|
|
|
|
d(bid, d128::zero()), |
|
|
|
|
|
d(ask, d128::zero()), |
|
|
); |
|
|
); |
|
|
}); |
|
|
}); |
|
|
} |
|
|
} |
|
@@ -911,7 +902,7 @@ mod tests { |
|
|
buf.push_str(&buf_copy); |
|
|
buf.push_str(&buf_copy); |
|
|
println!("{}", buf); |
|
|
println!("{}", buf); |
|
|
|
|
|
|
|
|
let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); |
|
|
|
|
|
|
|
|
let url = Url::parse_with_params("localhost", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse"); |
|
|
let client = Client::new(); |
|
|
let client = Client::new(); |
|
|
match client.post(url.clone()) |
|
|
match client.post(url.clone()) |
|
|
.body(&buf) |
|
|
.body(&buf) |
|
@@ -984,7 +975,7 @@ mod tests { |
|
|
buf.push_str(&buf_copy); |
|
|
buf.push_str(&buf_copy); |
|
|
println!("{}", buf); |
|
|
println!("{}", buf); |
|
|
|
|
|
|
|
|
let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse"); |
|
|
|
|
|
|
|
|
let url = Url::parse_with_params("localhost", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse"); |
|
|
let client = Client::new(); |
|
|
let client = Client::new(); |
|
|
match client.post(url.clone()) |
|
|
match client.post(url.clone()) |
|
|
.body(&buf) |
|
|
.body(&buf) |
|
@@ -1002,60 +993,4 @@ mod tests { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// macro_rules! make_measurement { |
|
|
|
|
|
// (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; |
|
|
|
|
|
// (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; |
|
|
|
|
|
// (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) }; |
|
|
|
|
|
// (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; |
|
|
|
|
|
// (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) }; |
|
|
|
|
|
// (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) }; |
|
|
|
|
|
// (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) }; |
|
|
|
|
|
// (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) }; |
|
|
|
|
|
// |
|
|
|
|
|
// (@count_tags) => {0usize}; |
|
|
|
|
|
// (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)}; |
|
|
|
|
|
// (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)}; |
|
|
|
|
|
// |
|
|
|
|
|
// (@count_fields) => {0usize}; |
|
|
|
|
|
// (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; |
|
|
|
|
|
// (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)}; |
|
|
|
|
|
// (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)}; |
|
|
|
|
|
// |
|
|
|
|
|
// ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{ |
|
|
|
|
|
// let n_tags = measure!(@count_tags $($t)*); |
|
|
|
|
|
// let n_fields = measure!(@count_fields $($t)*); |
|
|
|
|
|
// let mut meas = |
|
|
|
|
|
// $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields); |
|
|
|
|
|
// $( |
|
|
|
|
|
// measure!(@kv $t, meas, $($tail)*); |
|
|
|
|
|
// )* |
|
|
|
|
|
// //let _ = $m.send(meas); |
|
|
|
|
|
// meas |
|
|
|
|
|
// }}; |
|
|
|
|
|
// } |
|
|
|
|
|
// |
|
|
|
|
|
// #[test] |
|
|
|
|
|
// fn it_checks_n_tags_is_correct() { |
|
|
|
|
|
// let (tx, _): (Sender<OwnedMeasurement>, Receiver<OwnedMeasurement>) = channel(); |
|
|
|
|
|
// assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1); |
|
|
|
|
|
// assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2); |
|
|
|
|
|
// assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0); |
|
|
|
|
|
// assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0); |
|
|
|
|
|
// |
|
|
|
|
|
// let m4 = |
|
|
|
|
|
// make_measurement!(tx, test, |
|
|
|
|
|
// tag[a;"b"], |
|
|
|
|
|
// tag[c;"d"], |
|
|
|
|
|
// int[n; 1], |
|
|
|
|
|
// tag[e;"f"], |
|
|
|
|
|
// float[x; 1.234], |
|
|
|
|
|
// tag[g;"h"], |
|
|
|
|
|
// time[1], |
|
|
|
|
|
// ); |
|
|
|
|
|
// assert_eq!(m4.n_tags, 4); |
|
|
|
|
|
// assert_eq!(m4.n_fields, 2); |
|
|
|
|
|
// } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
} |