|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680 |
- //! Utilities to efficiently send data to influx
- //!
-
- #![cfg_attr(feature = "unstable", feature(test))]
-
- #[cfg(all(feature = "unstable", test))]
- extern crate test;
- #[macro_use]
- extern crate slog;
-
- use std::io::Read;
- use std::sync::Arc;
- use std::{thread, mem};
- use std::time::*;
- use std::collections::VecDeque;
- use std::convert::TryInto;
- use crossbeam_channel::{Sender, Receiver, bounded, SendError};
- use hyper::status::StatusCode;
- use hyper::client::response::Response;
- use hyper::Url;
- use hyper::client::Client;
- use slog::Drain;
- use chrono::prelude::*;
- use decimal::d128;
- use uuid::Uuid;
- use smallvec::SmallVec;
- use slog::Logger;
- use pretty_toa::ThousandsSep;
-
- /// whether non-finite `f64` and `d128` values should be skipped
- /// during serialization to influxdb line format. influx does not
- /// handle `NaN` values at all. the other option is a marker value,
- /// previously `-999.0` had been used.
- pub const SKIP_NAN_VALUES: bool = true;
-
- pub const DROP_DEADLINE: Duration = Duration::from_secs(30);
-
- pub type Credentials = hyper::header::Authorization<hyper::header::Basic>;
-
- /// Created this so I know what types can be passed through the
- /// `measure!` macro, which used to convert with `as i64` and
- /// `as f64` until I accidentally passed a function name, and it
- /// still compiled, but with garbage numbers.
- pub trait AsI64 {
- fn as_i64(x: Self) -> i64;
- }
-
- impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
- impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
- impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
- impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x.try_into().unwrap_or(-999) } }
- impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x.try_into().unwrap_or(-999) } }
- impl AsI64 for i128 { fn as_i64(x: Self) -> i64 { x.try_into().unwrap_or(-999) } }
- impl AsI64 for u128 { fn as_i64(x: Self) -> i64 { x.try_into().unwrap_or(-999) } }
- impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
- impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
- impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
- impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } }
- impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } }
- impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } }
-
- /// Created this so I know what types can be passed through the
- /// `measure!` macro, which used to convert with `as i64` and
- /// `as f64` until I accidentally passed a function name, and it
- /// still compiled, but with garbage numbers.
- pub trait AsF64 {
- fn as_f64(x: Self) -> f64;
- }
-
- impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
- impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
- impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
- impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
- impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
- impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
- impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
-
- /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
- ///
- /// The macro both creates an `OwnedMeasurement` from the supplied tags and
- /// values, as well as sends it with the `Sender`.
- ///
- /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
- /// measurement (see `tests` mod).
- ///
- /// # Examples
- ///
- /// ```
- /// #[macro_use]
- /// extern crate influx_writer;
- ///
- /// use influx_writer::{OwnedValue, OwnedMeasurement, AsI64};
- ///
- /// use decimal::d128;
- ///
- /// fn main() {
- /// let (tx, rx) = crossbeam_channel::bounded(1024);
- ///
- /// // "shorthand" syntax
- ///
- /// measure!(tx, test, t(color, "red"), i(n, 1));
- ///
- /// let meas: OwnedMeasurement = rx.recv().unwrap();
- ///
- /// assert_eq!(meas.key, "test");
- /// assert_eq!(meas.get_tag("color"), Some("red"));
- /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
- ///
- /// measure!(tx, test,
- /// t(one, "a"), t(two, "b"), i(three, 2),
- /// f(four, 1.2345), s(five, String::from("d")),
- /// b(six, true), i(seven, 1 + 2),
- /// tm(1)
- /// );
- ///
- /// let meas: OwnedMeasurement = rx.recv().unwrap();
- ///
- /// assert_eq!(meas.key, "test");
- /// assert_eq!(meas.get_tag("one"), Some("a"));
- /// assert_eq!(meas.get_tag("two"), Some("b"));
- /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
- /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
- /// assert_eq!(meas.timestamp, Some(1));
- ///
- /// // use the @make_meas flag to skip sending a measurement, instead merely
- /// // creating it.
- ///
- /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, t(color, "red"), i(n, 1));
- ///
- /// // each variant also has shorthand aliases
- ///
- /// let meas: OwnedMeasurement = measure!(@make_meas abcd, t(color, "red"), i(n, 1), d(price, d128::zero()));
- /// }
- /// ```
- ///
- #[macro_export]
- macro_rules! measure {
- (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
- (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
- (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
- //(@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
- (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
- (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
- (@kv v, $meas:ident, $k:expr) => { measure!(@ea t, $meas, "version", $k) };
- (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
- (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
- (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Integer(AsI64::as_i64($v))) };
- (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Float(AsF64::as_f64($v))) };
- (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::String($v)) };
- (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::D128($v)) };
- (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Uuid($v)) };
- (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Boolean(bool::from($v))) };
-
- (@ea D, $meas:ident, $k:expr, $v:expr) => {
- match $v {
- Some(v) => { $meas = $meas.add_field($k, $crate::OwnedValue::D128(v)) }
- None => {}
- }
- };
- (@ea I, $meas:ident, $k:expr, $v:expr) => {
- if $v.is_some() {
- $meas = $meas.add_field($k, $crate::OwnedValue::Integer(AsI64::as_i64($v.unwrap())));
- }
- };
- (@ea F, $meas:ident, $k:expr, $v:expr) => {
- if $v.is_some() {
- $meas = $meas.add_field($k, $crate::OwnedValue::Float(AsF64::as_f64($v.unwrap())));
- }
- };
- // (@ea T, $meas:ident, $k:expr, $v:expr) => {
- // let maybe_v: Option<
- // match $v {
- // Some(v) => { $meas = $meas.add_tag($k, v) }
- // None => {}
- // }
- // };
- (@ea U, $meas:ident, $k:expr, $v:expr) => {
- match $v {
- Some(v) => { $meas = $meas.add_field($k, $crate::OwnedValue::Uuid(v)) }
- None => {}
- }
- };
- (@ea B, $meas:ident, $k:expr, $v:expr) => {
- match $v {
- Some(v) => { $meas = $meas.add_field($k, $crate::OwnedValue::Boolean(v)) }
- None => {}
- }
- };
- (@ea S, $meas:ident, $k:expr, $v:expr) => {
- match $v {
- Some(v) => { $meas = $meas.add_field($k, $crate::OwnedValue::String(v)) }
- None => {}
- }
- };
-
- (@as_expr $e:expr) => {$e};
-
- (@count_tags) => {0usize};
- (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
- (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
-
- (@count_fields) => {0usize};
- (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
- (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
- (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
-
- (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
- measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
- };
-
- (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
- let n_tags = measure!(@count_tags $($t)*);
- let n_fields = measure!(@count_fields $($t)*);
- let mut meas =
- $crate::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
- $(
- measure!(@kv $t, meas, $($tail)*);
- )*
- meas
- }};
-
- ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
- measure!($m, $name, $($t [ $($tail)* ] ),+)
- };
-
- ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
- #[allow(unused_imports)]
- use $crate::{AsI64, AsF64};
- let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
- let _ = $m.send(measurement);
- }};
- }
-
- /// converts a chrono::DateTime to an integer timestamp (ns)
- ///
- #[inline]
- pub fn nanos(t: DateTime<Utc>) -> u64 {
- (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64)
- }
-
- #[inline]
- pub fn secs(d: Duration) -> f64 {
- d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
- }
-
- #[inline]
- pub fn inanos(t: DateTime<Utc>) -> i64 {
- t.timestamp() * 1_000_000_000i64 + t.timestamp_subsec_nanos() as i64
- }
-
- //#[deprecated(since="0.4.3", note="Use `nanos(DateTime<Utc>) -> u64` instead")]
- pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
- (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
- }
-
- #[inline]
- pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
- (d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64
- }
-
- #[inline]
- pub fn nanos_utc(t: i64) -> DateTime<Utc> {
- Utc.timestamp(t / 1_000_000_000, (t % 1_000_000_000) as u32)
- }
-
- #[derive(Clone, Debug)]
- struct Point<T, V> {
- pub time: T,
- pub value: V
- }
-
- struct DurationWindow {
- pub size: Duration,
- pub mean: Duration,
- pub sum: Duration,
- pub count: u32,
- pub items: VecDeque<Point<Instant, Duration>>
- }
-
- #[allow(dead_code)]
- impl DurationWindow {
- #[inline]
- pub fn update(&mut self, time: Instant, value: Duration) {
- self.add(time, value);
- self.refresh(time);
- }
-
- #[inline]
- pub fn refresh(&mut self, t: Instant) -> &Self {
- if !self.items.is_empty() {
- let (n_remove, sum, count) =
- self.items.iter()
- .take_while(|x| t - x.time > self.size)
- .fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
- (n_remove + 1, sum - x.value, count - 1)
- });
- self.sum = sum;
- self.count = count;
- for _ in 0..n_remove {
- self.items.pop_front();
- }
- }
-
- if self.count > 0 {
- self.mean = self.sum / self.count.into();
- }
-
- self
- }
-
- #[inline]
- pub fn add(&mut self, time: Instant, value: Duration) {
- let p = Point { time, value };
- self.sum += p.value;
- self.count += 1;
- self.items.push_back(p);
- }
- }
-
- /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
- /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
- /// measurements have accumulated.
- ///
- #[derive(Debug)]
- pub struct InfluxWriter {
- host: String,
- db: String,
- tx: Sender<Option<OwnedMeasurement>>,
- thread: Option<Arc<thread::JoinHandle<()>>>,
- }
-
- impl Default for InfluxWriter {
- fn default() -> Self {
- InfluxWriter::new("localhost", "test")
- }
- }
-
- impl Clone for InfluxWriter {
- fn clone(&self) -> Self {
- let thread = self.thread.as_ref().map(|x| Arc::clone(x));
- InfluxWriter {
- host: self.host.to_string(),
- db: self.db.to_string(),
- tx: self.tx.clone(),
- thread,
- }
- }
- }
-
- impl InfluxWriter {
- pub fn host(&self) -> &str { self.host.as_str() }
-
- pub fn db(&self) -> &str { self.db.as_str() }
-
- /// Sends the `OwnedMeasurement` to the serialization thread.
- ///
- #[inline]
- pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
- //if self.thread.is_none() {
- // let _ = self.tx.try_send(Some(m));
- // Ok(())
- //} else {
- self.tx.send(Some(m))
- //}
- }
-
- #[inline]
- pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
-
- #[inline]
- pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
-
- #[inline]
- pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
-
- #[inline]
- pub fn rsecs(&self, d: Duration) -> f64 {
- ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64))
- * 1000.0)
- .round()
- / 1000.0
- }
-
- #[inline]
- pub fn secs(&self, d: Duration) -> f64 {
- d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
- }
-
- pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
- self.tx.clone()
- }
-
- #[inline]
- pub fn is_full(&self) -> bool { self.tx.is_full() }
-
- /// provides a shell interface that immediately drops measurements sent to it
- pub fn placeholder() -> Self {
- let (tx, _) = bounded(1);
- Self {
- host: String::new(),
- db: String::new(),
- tx,
- thread: None,
- }
- }
-
- pub fn is_placeholder(&self) -> bool {
- self.thread.is_none() && self.host == ""
- }
-
- pub fn new(host: &str, db: &str) -> Self {
- let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!());
- Self::with_logger_and_opt_creds(host, db, None, &noop_logger)
- }
-
- pub fn get_credentials(username: String, password: Option<String>) -> Credentials {
- hyper::header::Authorization(
- hyper::header::Basic { username, password }
- )
- }
-
- fn http_req<'a>(client: &'a Client, url: Url, body: &'a str, creds: &Option<Credentials>) -> hyper::client::RequestBuilder<'a> {
- let req = client.post(url.clone())
- .body(body);
- if let Some(auth) = creds {
- req.header(auth.clone())
- } else {
- req
- }
- }
-
- #[allow(unused_assignments)]
- pub fn with_logger(host: &str, db: &str, logger: &Logger) -> Self {
- Self::with_logger_and_opt_creds(host, db, None, logger)
- }
-
- pub fn with_logger_and_opt_creds(host: &str, db: &str, creds: Option<Credentials>, logger: &Logger) -> Self {
- let logger = logger.new(o!(
- "host" => host.to_string(),
- "db" => db.to_string()));
- let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(4096);
- let url =
- Url::parse_with_params(&format!("http://{}:8086/write", host),
- &[("db", db), ("precision", "ns")])
- .expect("influx writer url should parse");
- let thread = thread::Builder::new().name(format!("inflx:{}", db)).spawn(move || {
- use std::time::*;
- use crossbeam_channel as chan;
-
- #[cfg(feature = "no-influx-buffer")]
- const N_BUFFER_LINES: usize = 0;
-
- const N_BUFFER_LINES: usize = 1024;
- const MAX_PENDING: Duration = Duration::from_secs(3);
- const INITIAL_BUFFER_CAPACITY: usize = 4096;
- const MAX_BACKLOG: usize = 1024;
- const MAX_OUTSTANDING_HTTP: usize = 64;
- const DEBUG_HB_EVERY: usize = 1024 * 96;
- const INFO_HB_EVERY: usize = 1024 * 1024;
- const N_HTTP_ATTEMPTS: u32 = 15;
- const INITIAL_BACKLOG: usize = MAX_OUTSTANDING_HTTP * 2;
-
- let client = Arc::new(Client::new());
- let creds = Arc::new(creds);
-
- info!(logger, "initializing InfluxWriter ...";
- "N_BUFFER_LINES" => N_BUFFER_LINES,
- "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING),
- "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP,
- "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY,
- "INITIAL_BACKLOG" => INITIAL_BACKLOG,
- "MAX_BACKLOG" => MAX_BACKLOG,
- );
-
- // pre-allocated buffers ready for use if the active one is stasheed
- // during an outage
- let mut spares: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG);
-
- // queue failed sends here until problem resolved, then send again. in worst
- // case scenario, loop back around on buffers queued in `backlog`, writing
- // over the oldest first.
- //
- let mut backlog: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG);
-
- for _ in 0..INITIAL_BACKLOG {
- spares.push_back(String::with_capacity(INITIAL_BUFFER_CAPACITY));
- }
-
- struct Resp {
- pub buf: String,
- pub took: Duration,
- }
-
- let mut db_health = DurationWindow {
- size: Duration::from_secs(120),
- mean: Duration::new(10, 0),
- sum: Duration::new(0, 0),
- count: 0,
- items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP),
- };
-
- let (http_tx, http_rx) = chan::bounded(32);
-
- let mut buf = spares.pop_front().unwrap();
- let mut count = 0;
- let mut extras = 0; // any new Strings we intro to the system
- let mut n_rcvd = 0;
- let mut n_pts: u64 = 0;
- let mut in_flight_buffer_bytes = 0;
- let mut last = Instant::now();
- let mut active: bool;
- let mut last_clear = Instant::now();
- let mut last_memory_check = Instant::now();
- let mut loop_time: Instant;
-
- let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
- INITIAL_BACKLOG + extras - s.len() - b.len() - 1
- };
-
- assert_eq!(n_out(&spares, &backlog, extras), 0);
-
- let count_allocated_memory = |spares: &VecDeque<String>, backlog: &VecDeque<String>, in_flight_buffer_bytes: &usize| -> usize {
- spares.iter().map(|x| x.capacity()).sum::<usize>()
- + backlog.iter().map(|x| x.capacity()).sum::<usize>()
- + (*in_flight_buffer_bytes)
- };
-
- let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize, in_flight_buffer_bytes: &mut usize| {
- if n_outstanding >= MAX_OUTSTANDING_HTTP {
- backlog.push_back(buf);
- return
- }
- let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
- let tx = http_tx.clone();
- let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "in flight req at spawn time" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure
- let client = Arc::clone(&client);
- let creds = Arc::clone(&creds);
- *in_flight_buffer_bytes = *in_flight_buffer_bytes + buf.capacity();
- debug!(logger, "launching http thread");
- let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
- let logger = thread_logger;
- debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len());
- let start = Instant::now();
- for n_req in 0..N_HTTP_ATTEMPTS {
- let throttle = Duration::from_secs(2) * n_req * n_req;
- if n_req > 0 {
- warn!(logger, "InfluxWriter http thread: pausing before next request";
- "n_req" => n_req,
- "throttle" => %format_args!("{:?}", throttle),
- "elapsed" => %format_args!("{:?}", Instant::now() - start));
- thread::sleep(throttle); // 0, 2, 8, 16, 32
- }
- let sent = Instant::now();
- let req = Self::http_req(&client, url.clone(), buf.as_str(), &creds);
- let resp = req.send();
- let rcvd = Instant::now();
- let took = rcvd - sent;
- let mut n_tx = 0u32;
- match resp {
- Ok(Response { status, .. }) if status == StatusCode::NoContent => {
- debug!(logger, "server responded ok: 204 NoContent");
- buf.clear();
- let mut resp = Some(Ok(Resp { buf, took }));
- loop {
- n_tx += 1;
- match tx.try_send(resp.take().unwrap()) {
- Ok(_) => {
- if n_req > 0 {
- info!(logger, "successfully recovered from failed request with retry";
- "n_req" => n_req,
- "n_tx" => n_tx,
- "elapsed" => %format_args!("{:?}", Instant::now() - start));
- }
- return
- }
-
- Err(chan::TrySendError::Full(r)) => {
- let throttle = Duration::from_millis(1000) * n_tx;
- warn!(logger, "channel full: InfluxWriter http thread failed to return buf";
- "n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle));
- resp = Some(r);
- thread::sleep(throttle);
- }
-
- Err(chan::TrySendError::Disconnected(_)) => {
- warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return";
- "n_tx" => n_tx, "n_req" => n_req);
- return
- }
- }
- }
- }
-
- Ok(mut resp) => {
- let mut server_resp = String::new();
- let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
- error!(logger, "influx server error (request took {:?})", took;
- "status" => %resp.status,
- "body" => server_resp);
- }
-
- Err(e) => {
- error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e);
- }
- }
-
- }
- let took = Instant::now() - start;
- warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer";
- "took" => %format_args!("{:?}", took));
- let buflen = buf.len();
- let n_lines = buf.lines().count();
- if let Err(e) = tx.send(Err(Resp { buf, took })) {
- crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e;
- "err" => %e, "buf.len()" => buflen, "n_lines" => n_lines);
- }
- });
-
- if let Err(e) = thread_res {
- crit!(logger, "failed to spawn thread: {}", e);
- }
- };
-
- let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> {
- match prev {
- 0 if N_BUFFER_LINES > 0 => {
- serialize_owned(m, buf);
- Ok(1)
- }
-
- n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => {
- buf.push_str("\n");
- serialize_owned(m, buf);
- Ok(n + 1)
- }
-
- n => {
- buf.push_str("\n");
- serialize_owned(m, buf);
- Err(n + 1)
- }
- }
- };
-
- 'event: loop {
- loop_time = Instant::now();
- active = false;
-
- if loop_time - last_memory_check > Duration::from_secs(300) {
- let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
- let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
- info!(logger, "InfluxWriter: allocated memory: {:.1}MB", allocated_mb;
- "allocated bytes" => allocated_bytes,
- "in flight buffer bytes" => in_flight_buffer_bytes,
- "spares.len()" => spares.len(),
- "backlog.len()" => backlog.len(),
- );
- last_memory_check = loop_time;
- }
- match rx.recv() {
- Ok(Some(mut meas)) => {
- n_rcvd += 1;
- n_pts += meas.fields.len() as u64;
- active = true;
-
- if n_rcvd % INFO_HB_EVERY == 0 {
- let n_outstanding = n_out(&spares, &backlog, extras);
- let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
- let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
- info!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep();
- "total pts written" => n_pts.thousands_sep(),
- "n_outstanding" => n_outstanding,
- "spares.len()" => spares.len(),
- "n_rcvd" => n_rcvd,
- "n_active_buf" => count,
- "db_health" => %format_args!("{:?}", db_health.mean),
- "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
- "backlog.len()" => backlog.len());
- } else if n_rcvd % DEBUG_HB_EVERY == 0 {
- let n_outstanding = n_out(&spares, &backlog, extras);
- let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
- let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
- debug!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep();
- "n_outstanding" => n_outstanding,
- "spares.len()" => spares.len(),
- "n_rcvd" => n_rcvd,
- "n_active_buf" => count,
- "db_health" => %format_args!("{:?}", db_health.mean),
- "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
- "backlog.len()" => backlog.len());
- }
-
- if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
-
- if meas.fields.is_empty() {
- meas.fields.push(("n", OwnedValue::Integer(1)));
- }
-
- //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
-
- count = match next(count, &meas, &mut buf, loop_time, last) {
- Ok(n) => n,
- Err(_n) => {
- let mut count = 0;
- let mut next: String = match spares.pop_front() {
- Some(x) => x,
-
- None => {
- let n_outstanding = n_out(&spares, &backlog, extras);
- if n_outstanding > MAX_BACKLOG {
- warn!(logger, "InfluxWriter: no available buffers in `spares`, pulling from backlog";
- "total pts written" => n_pts.thousands_sep(),
- "n_outstanding" => n_outstanding,
- "spares.len()" => spares.len(),
- "n_rcvd" => n_rcvd,
- "backlog.len()" => backlog.len());
- match backlog.pop_front() {
- // Note: this does not clear the backlog buffer,
- // instead we will just write more and more until
- // we are out of memory. I expect that will never
- // happen.
- //
- Some(x) => {
- count = 1; // otherwise, no '\n' added in `next(..)` - we are
- // sending a "full" buffer to be extended
- x
- }
-
- None => {
- extras += 1;
- crit!(logger, "InfluxWriter: failed to pull from backlog, too!! WTF #!(*#(* ... creating new String";
- "total pts written" => n_pts.thousands_sep(),
- "n_outstanding" => n_outstanding,
- "spares.len()" => spares.len(),
- "backlog.len()" => backlog.len(),
- "n_rcvd" => n_rcvd,
- "extras" => extras);
- String::new()
- }
- }
- } else {
- extras += 1;
- let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY;
- let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
- info!(logger, "InfluxWriter: allocating new buffer: zero spares avail";
- "total pts written" => n_pts.thousands_sep(),
- "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
- "n_outstanding" => n_outstanding,
- "extras" => extras,
- );
- String::with_capacity(INITIAL_BUFFER_CAPACITY)
- }
- }
- };
- // after swap, buf in next, so want to send next
- //
- mem::swap(&mut buf, &mut next);
- let n_outstanding = n_out(&spares, &backlog, extras);
- send(next, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
- last = loop_time;
- count
- }
- };
- }
-
- Ok(None) => {
- let start = Instant::now();
- let mut hb = Instant::now();
- warn!(logger, "terminate signal rcvd"; "count" => count);
- if buf.len() > 0 {
- info!(logger, "InfluxWriter: sending remaining buffer to influx on terminate"; "count" => count);
- let meas = OwnedMeasurement::new("influx_writer").add_field("n", OwnedValue::Integer(1));
- let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last);
- let n_outstanding = n_out(&spares, &backlog, extras);
- let mut placeholder = spares.pop_front().unwrap_or_else(String::new);
- mem::swap(&mut buf, &mut placeholder);
- send(placeholder, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
- }
- let mut n_ok = 0;
- let mut n_err = 0;
- loop {
- loop_time = Instant::now();
- let n_outstanding = n_out(&spares, &backlog, extras);
- if backlog.is_empty() && n_outstanding < 1 {
- info!(logger, "InfluxWriter: cleared any remaining backlog";
- "total pts written" => n_pts.thousands_sep(),
- "n_outstanding" => n_outstanding,
- "spares.len()" => spares.len(),
- "backlog.len()" => backlog.len(),
- "n_cleared_ok" => n_ok,
- "n_cleared_err" => n_err,
- "n_rcvd" => n_rcvd,
- "extras" => extras,
- "elapsed" => %format_args!("{:?}", loop_time - start));
- break 'event
- }
-
- if loop_time.saturating_duration_since(start) > DROP_DEADLINE {
- crit!(logger, "drop deadline exceeded! commencing dirty exit :( ";
- "total pts written" => n_pts.thousands_sep(),
- "elapsed" => ?(loop_time.saturating_duration_since(start)),
- "n outstanding" => n_outstanding,
- "backlog.len()" => backlog.len(),
- );
- break 'event
- }
-
- if loop_time - hb > Duration::from_secs(5) {
- info!(logger, "InfluxWriter still clearing backlog ..";
- "total pts written" => n_pts.thousands_sep(),
- "n_outstanding" => n_outstanding,
- "spares.len()" => spares.len(),
- "backlog.len()" => backlog.len(),
- "n_cleared_ok" => n_ok,
- "n_cleared_err" => n_err,
- "extras" => extras,
- "n_rcvd" => n_rcvd,
- "elapsed" => %format_args!("{:?}", loop_time - start));
- hb = loop_time;
- }
- if let Some(buf) = backlog.pop_front() {
- let n_outstanding = n_out(&spares, &backlog, extras);
- debug!(logger, "InfluxWriter: resending queued buffer from backlog";
- "backlog.len()" => backlog.len(),
- "spares.len()" => spares.len(),
- "n_rcvd" => n_rcvd,
- "n_outstanding" => n_outstanding);
- send(buf, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
- }
-
- 'rx: loop {
- match http_rx.try_recv() {
- Ok(Ok(Resp { buf, .. })) => {
- n_ok += 1;
- in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
- if spares.len() <= INITIAL_BACKLOG {
- spares.push_back(buf); // needed so `n_outstanding` count remains accurate
- } else {
- extras = extras.saturating_sub(1);
- }
- }
- Ok(Err(Resp { buf, .. })) => {
- warn!(logger, "InfluxWriter: requeueing failed request"; "buf.len()" => buf.len());
- n_err += 1;
- in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
- backlog.push_front(buf);
- }
- Err(chan::TryRecvError::Disconnected) => {
- crit!(logger, "InfluxWriter: trying to clear backlog, but http_rx disconnected! aborting";
- "total pts written" => n_pts.thousands_sep(),
- "n_outstanding" => n_outstanding,
- "backlog.len()" => backlog.len(),
- "n_cleared_ok" => n_ok,
- "n_cleared_err" => n_err,
- "extras" => extras,
- "n_rcvd" => n_rcvd,
- "elapsed" => %format_args!("{:?}", loop_time - start));
- break 'event
- }
- Err(_) => break 'rx
- }
- }
- thread::sleep(Duration::from_millis(1));
- }
- }
-
- _ => {}
- }
-
- db_health.refresh(loop_time);
- let n_outstanding = n_out(&spares, &backlog, extras);
- let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
- if (n_outstanding < MAX_OUTSTANDING_HTTP
- || loop_time.saturating_duration_since(last_clear) > Duration::from_secs(60))
- && healthy {
-
- if let Some(queued) = backlog.pop_front() {
- let n_outstanding = n_out(&spares, &backlog, extras);
- send(queued, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
- active = true;
- }
- last_clear = loop_time;
- }
-
- loop {
- match http_rx.try_recv() {
- Ok(Ok(Resp { buf, took })) => {
- db_health.add(loop_time, took);
- let in_flight_before = in_flight_buffer_bytes.clone();
- in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
- if spares.len() <= INITIAL_BACKLOG {
- spares.push_back(buf);
- } else {
- extras = extras.saturating_sub(1);
- debug!(logger, "InfluxWriter: dropping buffer to reduce memory back to INITIAL_BACKLOG size";
- "spares.len()" => spares.len(),
- "extras" => extras,
- "in flight before" => in_flight_before,
- "in in_flight_buffer_bytes" => in_flight_buffer_bytes,
- );
- }
-
- //spares.push_back(buf);
- active = true;
- }
-
- Ok(Err(Resp { buf, took })) => {
- db_health.add(loop_time, took);
- in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
- backlog.push_front(buf);
- active = true;
- }
-
- Err(chan::TryRecvError::Disconnected) => {
- crit!(logger, "InfluxWriter: trying to recover buffers, but http_rx disconnected! aborting";
- "total pts written" => n_pts.thousands_sep(),
- "n_outstanding" => n_outstanding,
- "backlog.len()" => backlog.len(),
- "n_rcvd" => n_rcvd,
- "extras" => extras);
- break 'event
- }
-
- Err(_) => break
- }
- }
-
- if !active {
- thread::sleep(Duration::new(0, 1))
- }
- }
- thread::sleep(Duration::from_millis(10));
- }).unwrap();
-
- InfluxWriter {
- host: host.to_string(),
- db: db.to_string(),
- tx,
- thread: Some(Arc::new(thread))
- }
- }
- }
-
- impl Drop for InfluxWriter {
- fn drop(&mut self) {
- if let Some(arc) = self.thread.take() {
- if let Ok(thread) = Arc::try_unwrap(arc) {
- let _ = self.tx.send(None);
- let _ = thread.join();
- }
- }
- }
- }
-
- /// This removes offending things rather than escaping them.
- ///
- fn escape_tag(s: &str) -> String {
- s.replace(" ", "")
- .replace(",", "")
- .replace("\"", "")
- }
-
- fn escape(s: &str) -> String {
- s.replace(" ", "\\ ")
- .replace(",", "\\,")
- }
-
- fn as_string(s: &str) -> String {
- // the second replace removes double escapes
- //
- format!("\"{}\"", s.replace("\"", "\\\"")
- .replace(r#"\\""#, r#"\""#))
- }
-
- #[test]
- fn it_checks_as_string_does_not_double_escape() {
- let raw = "this is \\\"an escaped string\\\" so it's problematic";
- let escaped = as_string(&raw);
- assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
- }
-
- fn as_boolean(b: &bool) -> &str {
- if *b { "t" } else { "f" }
- }
-
- pub fn now() -> i64 {
- nanos(Utc::now()) as i64
- }
-
- /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
- ///
- /// The serialized measurement is appended to the end of the string without
- /// any regard for what exited in it previously.
- ///
- pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
- line.push_str(&escape_tag(measurement.key));
-
- let add_tag = |line: &mut String, key: &str, value: &str| {
- line.push_str(",");
- line.push_str(&escape_tag(key));
- line.push_str("=");
- line.push_str(&escape(value));
- };
-
- for (key, value) in measurement.tags.iter() {
- #[cfg(not(feature = "string-tags"))]
- add_tag(line, key, value);
-
- #[cfg(feature = "string-tags")]
- add_tag(line, key, value.as_str());
- }
-
- let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| -> bool {
-
- if SKIP_NAN_VALUES && ! value.is_finite() { return false }
-
- if is_first { line.push_str(" "); } else { line.push_str(","); }
- line.push_str(&escape_tag(key));
- line.push_str("=");
- match *value {
- OwnedValue::String(ref s) => line.push_str(&as_string(s)),
- OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
- OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
-
- OwnedValue::D128(ref d) => {
- if d.is_finite() {
- line.push_str(&format!("{}", d));
- } else {
- line.push_str("-999.0");
- }
- }
-
- OwnedValue::Float(ref f) => {
- if f.is_finite() {
- line.push_str(&format!("{}", f));
- } else {
- line.push_str("-999.0");
- }
- }
-
- OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
- };
-
- true
- };
-
- // use this counter to ensure that at least one field was
- // serialized. since NaN values may be skipped, the serialization
- // would be wrong if no fields ended up being serialized. instead,
- // track it, and if there are none serialized, add a n=1 to make
- // the measurement serialize properly
- //
- // this also controls what value is passed to the `is_first` argument
- // of `add_field`
- let mut n_fields_serialized = 0;
-
- for kv in measurement.fields.iter() {
- if add_field(line, kv.0, &kv.1, n_fields_serialized == 0) {
- n_fields_serialized += 1;
- }
- }
-
- // supply a minimum of one field (n=1)
- //
- // TODO: could potentially clobber a "n" tag? do we care?
- //
- if n_fields_serialized == 0 { add_field(line, "n", &OwnedValue::Integer(1), true); }
-
- if let Some(t) = measurement.timestamp {
- line.push_str(" ");
- line.push_str(&t.to_string());
- }
- }
-
- #[derive(Debug, Clone, PartialEq)]
- pub enum OwnedValue {
- String(String),
- Float(f64),
- Integer(i64),
- Boolean(bool),
- D128(d128),
- Uuid(Uuid),
- }
-
- impl OwnedValue {
- /// if `self` is a `Float` or `D128` variant, checks
- /// whether the contained value is finite
- ///
- /// # Examples
- ///
- /// ```
- /// use std::str::FromStr;
- /// use influx_writer::OwnedValue;
- ///
- /// let v1 = OwnedValue::Float(f64::NAN);
- /// assert!( ! v1.is_finite());
- /// let v2 = OwnedValue::Float(1.234f64);
- /// assert!(v2.is_finite());
- ///
- /// let v3 = OwnedValue::D128(decimal::d128::from_str("NaN").unwrap());
- /// assert!( ! v3.is_finite());
- /// let v4 = OwnedValue::D128(decimal::d128::from_str("42.42").unwrap());
- /// assert!(v4.is_finite());
- ///
- /// // other variants are always "finite"
- /// assert!(OwnedValue::String("NaN".into()).is_finite());
- /// ```
- pub fn is_finite(&self) -> bool {
- match self {
- OwnedValue::Float(x) => x.is_finite(),
- OwnedValue::D128(x) => x.is_finite(),
- _ => true,
- }
- }
- }
-
- /// Holds data meant for an influxdb measurement in transit to the
- /// writing thread.
- ///
- #[derive(Clone, Debug)]
- pub struct OwnedMeasurement {
- pub key: &'static str,
- pub timestamp: Option<i64>,
- //pub fields: Map<&'static str, OwnedValue>,
- //pub tags: Map<&'static str, &'static str>,
- pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
- #[cfg(not(feature = "string-tags"))]
- pub tags: SmallVec<[(&'static str, &'static str); 8]>,
- #[cfg(feature = "string-tags")]
- pub tags: SmallVec<[(&'static str, String); 8]>,
- }
-
- impl OwnedMeasurement {
- pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
- OwnedMeasurement {
- key,
- timestamp: None,
- tags: SmallVec::with_capacity(n_tags),
- fields: SmallVec::with_capacity(n_fields),
- }
- }
-
- pub fn new(key: &'static str) -> Self {
- OwnedMeasurement {
- key,
- timestamp: None,
- tags: SmallVec::new(),
- fields: SmallVec::new(),
- }
- }
-
- /// Unusual consuming `self` signature because primarily used by
- /// the `measure!` macro.
- #[cfg(not(feature = "string-tags"))]
- pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
- self.tags.push((key, value));
- self
- }
-
- #[cfg(feature = "string-tags")]
- pub fn add_tag<S: ToString>(mut self, key: &'static str, value: S) -> Self {
- self.tags.push((key, value.to_string()));
- self
- }
-
- /// Unusual consuming `self` signature because primarily used by
- /// the `measure!` macro.
- pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
- self.fields.push((key, value));
- self
- }
-
- pub fn set_timestamp(mut self, timestamp: i64) -> Self {
- self.timestamp = Some(timestamp);
- self
- }
-
- #[cfg(not(feature = "string-tags"))]
- pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
- match self.tags.iter().position(|kv| kv.0 == key) {
- Some(i) => {
- self.tags.get_mut(i)
- .map(|x| {
- x.0 = value;
- });
- self
- }
-
- None => {
- self.add_tag(key, value)
- }
- }
- }
-
- pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
- self.fields.iter()
- .find(|kv| kv.0 == key)
- .map(|kv| &kv.1)
- }
-
- #[cfg(feature = "string-tags")]
- pub fn get_tag(&self, key: &'static str) -> Option<&str> {
- self.tags.iter()
- .find(|kv| kv.0 == key)
- .map(|kv| kv.1.as_str())
- }
-
- #[cfg(not(feature = "string-tags"))]
- pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
- self.tags.iter()
- .find(|kv| kv.0 == key)
- .map(|kv| kv.1)
- }
- }
-
- #[allow(unused)]
- #[cfg(test)]
- mod tests {
- use std::str::FromStr;
- use super::*;
- #[cfg(feature = "unstable")]
- use test::{black_box, Bencher};
-
- #[test]
- fn check_uppercase_shorthands_on_optional_field_and_tag_values() {
- let e: Option<i64> = None;
- let h: Option<f64> = None;
- let meas = measure!(@make_meas test,
- //T(a, Some("one")), T(b, None), t(c, "three"),
- S(a, Some("one".to_string())), S(b, None), s(c, "three".to_string()),
- I(d, Some(4)), I(e), i(f, 6),
- F(g, Some(7.0)), F(h), f(i, 9.0),
- );
- assert_eq!(meas.get_field("a").unwrap(), &OwnedValue::String("one".to_string()));
- assert!(meas.get_field("b").is_none());
- assert_eq!(meas.get_field("c").unwrap(), &OwnedValue::String("three".to_string()));
-
- assert_eq!(meas.get_field("d").unwrap(), &OwnedValue::Integer(4));
- assert!(meas.get_field("e").is_none());
- assert_eq!(meas.get_field("f").unwrap(), &OwnedValue::Integer(6));
-
- assert_eq!(meas.get_field("g").unwrap(), &OwnedValue::Float(7.0));
- assert!(meas.get_field("h").is_none());
- assert_eq!(meas.get_field("i").unwrap(), &OwnedValue::Float(9.0));
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn send_to_disconnected_channel(b: &mut Bencher) {
- let (tx, _): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(1);
- let time = now();
- b.iter(|| {
- const VERSION: &str = "1.0.0";
- let color = "red";
- let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
- tx.send(Some(m))
- })
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn try_send_to_disconnected_channel(b: &mut Bencher) {
- let (tx, _): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(1);
- let time = now();
- b.iter(|| {
- const VERSION: &str = "1.0.0";
- let color = "red";
- let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
- tx.try_send(Some(m))
- })
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn send_to_disconnected_channel_via_placeholder(b: &mut Bencher) {
- let time = now();
- let influx = InfluxWriter::placeholder();
- b.iter(|| {
- const VERSION: &str = "1.0.0";
- let color = "red";
- measure!(influx, test, i(n, 1), t(color), v(VERSION), tm(time));
- })
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn send_to_connected_channel_via_measure(b: &mut Bencher) {
- let time = now();
- let influx = InfluxWriter::new("localhost", "test");
- b.iter(|| {
- const VERSION: &str = "1.0.0";
- let color = "red";
- measure!(influx, bench, i(n, 1), t(color), v(VERSION), tm(time));
- })
- }
-
- #[ignore]
- #[cfg(feature = "unstable")]
- #[bench]
- fn measure_ten(b: &mut Bencher) {
- let influx = InfluxWriter::new("localhost", "test");
- let mut n = 0;
- b.iter(|| {
- for _ in 0..10 {
- let time = influx.nanos(Utc::now());
- n += 1;
- measure!(influx, million, i(n), tm(time));
- }
- });
- }
-
- #[test]
- fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
- const VERSION: &str = "0.3.90";
- let tag_value = "one";
- let color = "red";
- let time = Utc::now();
- let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
- assert_eq!(m.get_tag("color"), Some("red"));
- assert_eq!(m.get_tag("version"), Some(VERSION));
- assert_eq!(m.timestamp, Some(nanos(time) as i64));
- }
-
- #[test]
- fn it_uses_the_v_for_version_shortcut() {
- const VERSION: &str = "0.3.90";
- let tag_value = "one";
- let color = "red";
- let time = now();
- let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
- assert_eq!(m.get_tag("color"), Some("red"));
- assert_eq!(m.get_tag("version"), Some(VERSION));
- assert_eq!(m.timestamp, Some(time));
- }
-
- #[test]
- fn it_uses_the_new_tag_k_only_shortcut() {
- let tag_value = "one";
- let color = "red";
- let time = now();
- let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
- assert_eq!(m.get_tag("color"), Some("red"));
- assert_eq!(m.get_tag("tag_value"), Some("one"));
- assert_eq!(m.timestamp, Some(time));
- }
-
- #[test]
- fn it_uses_measure_macro_parenthesis_syntax() {
- let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
- assert_eq!(m.key, "test");
- assert_eq!(m.get_tag("a"), Some("b"));
- assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
- assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
- assert_eq!(m.timestamp, Some(1));
- }
-
- #[test]
- fn it_uses_measure_macro_on_a_self_attribute() {
- struct A {
- pub influx: InfluxWriter,
- }
-
- impl A {
- fn f(&self) {
- measure!(self.influx, test, t(color, "red"), i(n, 1));
- }
- }
-
- let a = A { influx: InfluxWriter::default() };
-
- a.f();
- }
-
- #[test]
- fn it_clones_an_influx_writer_to_check_both_drop() {
- let influx = InfluxWriter::default();
- measure!(influx, drop_test, i(a, 1), i(b, 2));
- {
- let influx = influx.clone();
- thread::spawn(move || {
- measure!(influx, drop_test, i(a, 3), i(b, 4));
- });
- }
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn influx_writer_send_basic(b: &mut Bencher) {
- let m = InfluxWriter::new("localhost", "test");
- b.iter(|| {
- measure!(m, test, t(color; "red"), i(n, 1)); //, float[p; 1.234]);
- });
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn influx_writer_send_price(b: &mut Bencher) {
- let m = InfluxWriter::new("localhost", "test");
- b.iter(|| {
- measure!(m, test,
- t(ticker, "xmr_btc"),
- t(exchange, "plnx"),
- d(bid, d128::zero()),
- d(ask, d128::zero()),
- );
- });
- }
-
- #[test]
- fn it_checks_color_tag_error_in_non_doctest() {
- let (tx, rx) = bounded(1024);
- measure!(tx, test, t(color,"red"), i(n,1));
- let meas: OwnedMeasurement = rx.recv().unwrap();
- assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
- }
-
- #[test]
- fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
- let meas = measure!(@make_meas test_measurement,
- t(one, "a"), t(two, "b"), i(three, 2),
- f(four, 1.2345), s(five, String::from("d")),
- b(six, true), i(seven, 1 + 2),
- tm(1)
- );
- assert_eq!(meas.key, "test_measurement");
- assert_eq!(meas.get_tag("one"), Some("a"));
- assert_eq!(meas.get_tag("two"), Some("b"));
- assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
- assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
- assert_eq!(meas.timestamp, Some(1));
- }
-
- #[test]
- fn it_uses_measure_macro_for_d128_and_uuid() {
-
- let (tx, rx) = bounded(1024);
- let one = "a";
- let two = d128::zero();
- let three = Uuid::new_v4();
- let time = now();
- measure!(tx, test_measurement, t(one), d(two), u(three), tm(time));
-
- thread::sleep(Duration::from_millis(10));
- let meas: OwnedMeasurement = rx.try_recv().unwrap();
- assert_eq!(meas.key, "test_measurement");
- assert_eq!(meas.get_tag("one"), Some("a"));
- assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
- assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(three)));
- assert_eq!(meas.timestamp, Some(time));
- }
-
- #[test]
- fn it_uses_the_measure_macro_alt_syntax() {
-
- let (tx, rx) = bounded(1024);
- measure!(tx, test_measurement,
- t(one, "a"), t(two, "b"), i(three, 2),
- f(four, 1.2345), s(five, String::from("d")),
- b(six, true), i(seven, 1 + 2),
- tm(1)
- );
-
- thread::sleep(Duration::from_millis(10));
- let meas: OwnedMeasurement = rx.try_recv().unwrap();
- assert_eq!(meas.key, "test_measurement");
- assert_eq!(meas.get_tag("one"), Some("a"));
- assert_eq!(meas.get_tag("two"), Some("b"));
- assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
- assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
- assert_eq!(meas.timestamp, Some(1));
- }
-
- #[test]
- fn it_checks_that_fields_are_separated_correctly() {
- let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
- assert_eq!(m.key, "test");
- assert_eq!(m.get_tag("a"), Some("one"));
- assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
-
- let mut buf = String::new();
- serialize_owned(&m, &mut buf);
- assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
- }
-
- #[test]
- fn try_to_break_measure_macro() {
- let (tx, _) = bounded(1024);
- measure!(tx, one, t(x,"y"), i(n,1));
- measure!(tx, one, t(x,"y"), i(n,1),);
-
- struct A {
- pub one: i32,
- pub two: i32,
- }
-
- struct B {
- pub a: A
- }
-
- let b = B { a: A { one: 1, two: 2 } };
-
- let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
-
- assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn measure_macro_small(b: &mut Bencher) {
- let (tx, rx) = bounded(1024);
- let listener = thread::spawn(move || {
- loop { if rx.recv().is_err() { break } }
- });
- b.iter(|| {
- measure!(tx, test, t(color, "red"), i(n, 1), tm(now()));
- });
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn measure_macro_medium(b: &mut Bencher) {
- let (tx, rx) = bounded(1024);
- let listener = thread::spawn(move || {
- loop { if rx.recv().is_err() { break } }
- });
- b.iter(|| {
- measure!(tx, test, t(color, "red"), t(mood, "playful"),
- t(ticker, "xmr_btc"), f(price, 1.2345), f(amount, 56.322),
- i(n, 1), tm(now()));
- });
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn serialize_owned_longer(b: &mut Bencher) {
- let mut buf = String::with_capacity(1024);
- let m =
- OwnedMeasurement::new("test")
- .add_tag("one", "a")
- .add_tag("two", "b")
- .add_tag("ticker", "xmr_btc")
- .add_tag("exchange", "plnx")
- .add_tag("side", "bid")
- .add_field("three", OwnedValue::Float(1.2345))
- .add_field("four", OwnedValue::Integer(57))
- .add_field("five", OwnedValue::Boolean(true))
- .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
- .set_timestamp(now());
- b.iter(|| {
- serialize_owned(&m, &mut buf);
- buf.clear()
- });
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn serialize_owned_simple(b: &mut Bencher) {
- let mut buf = String::with_capacity(1024);
- let m =
- OwnedMeasurement::new("test")
- .add_tag("one", "a")
- .add_tag("two", "b")
- .add_field("three", OwnedValue::Float(1.2345))
- .add_field("four", OwnedValue::Integer(57))
- .set_timestamp(now());
- b.iter(|| {
- serialize_owned(&m, &mut buf);
- buf.clear()
- });
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn clone_url_for_thread(b: &mut Bencher) {
- let host = "ahmes";
- let db = "mlp";
- let url =
- Url::parse_with_params(&format!("http://{}:8086/write", host),
- &[("db", db), ("precision", "ns")]).unwrap();
- b.iter(|| {
- url.clone()
- })
- }
-
- #[cfg(feature = "unstable")]
- #[bench]
- fn clone_arc_url_for_thread(b: &mut Bencher) {
- let host = "ahmes";
- let db = "mlp";
- let url =
- Url::parse_with_params(&format!("http://{}:8086/write", host),
- &[("db", db), ("precision", "ns")]).unwrap();
- let url = Arc::new(url);
- b.iter(|| {
- Arc::clone(&url)
- })
- }
-
- #[test]
- fn it_serializes_a_hard_to_serialize_message_from_owned() {
- let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
- let mut buf = String::new();
- let mut server_resp = String::new();
- let m = OwnedMeasurement::new("rust_test")
- .add_field("s", OwnedValue::String(raw.to_string()))
- .set_timestamp(now());
- serialize_owned(&m, &mut buf);
- println!("{}", buf);
- buf.push_str("\n");
- let buf_copy = buf.clone();
- buf.push_str(&buf_copy);
- println!("{}", buf);
-
- let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
- let client = Client::new();
- let req = InfluxWriter::http_req(&client, url.clone(), &buf, &None);
- match req.send() {
-
- Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
-
- Ok(mut resp) => {
- resp.read_to_string(&mut server_resp).unwrap();
- panic!("{}", server_resp);
- }
-
- Err(why) => {
- panic!("{}", why)
- }
- }
- }
-
- #[cfg(feature = "auth-tests")]
- #[test]
- fn it_sends_authenticated_measurements() {
- let creds = InfluxWriter::get_credentials("auth_test_user".into(), Some("hot dog".into()));
- let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!());
- //let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
- //let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
- //let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
- //let root = slog::Logger::root(drain, o!("version" => "0.1"));
- //let influx = InfluxWriter::with_logger_and_opt_creds("localhost", "auth_test", Some(creds), &root);
- measure!(influx, auth_test_meas, i(n, 1));
- drop(influx);
- }
-
- #[test]
- fn it_skips_nan_values() {
- assert!(SKIP_NAN_VALUES, "otherwise this test is worthless");
- let m = OwnedMeasurement::new("rust_test")
- .add_field("hello", OwnedValue::Integer(1234))
- .add_field("finite_float", OwnedValue::Float(1.234))
- .add_field("nan_float", OwnedValue::Float(f64::NAN))
- .add_field("inf_float", OwnedValue::Float(f64::INFINITY))
- .add_field("neg_inf_float", OwnedValue::Float(f64::NEG_INFINITY))
- .add_field("finite_d128", OwnedValue::D128(d128::from_str("3.456").unwrap()))
- .add_field("nan_d128", OwnedValue::D128(d128::from_str("NaN").unwrap()))
- .set_timestamp(now());
- let mut buf = String::new();
- serialize_owned(&m, &mut buf);
- dbg!(&buf);
- assert!(buf.contains("hello=1234"));
- assert!(buf.contains("finite_float=1.234"));
- assert!( ! buf.contains("nan_float="));
- assert!( ! buf.contains("inf_float="));
- assert!( ! buf.contains("neg_inf_float="));
- assert!(buf.contains("finite_d128=3.456"));
- assert!( ! buf.contains("nan_d128="));
- }
-
- #[test]
- fn it_supplies_a_field_if_every_field_is_skipped_because_nan() {
- assert!(SKIP_NAN_VALUES, "otherwise this test is worthless");
- let m = OwnedMeasurement::new("rust_test")
- .add_field("nan_float", OwnedValue::Float(f64::NAN));
- let mut buf = String::new();
- serialize_owned(&m, &mut buf);
- dbg!(&buf);
- assert!(buf.contains("n=1i"));
- }
- }
|