You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1681 lines
66KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. #![cfg_attr(feature = "unstable", feature(test))]
  4. #[cfg(all(feature = "unstable", test))]
  5. extern crate test;
  6. #[macro_use]
  7. extern crate slog;
  8. use std::io::Read;
  9. use std::sync::Arc;
  10. use std::{thread, mem};
  11. use std::time::*;
  12. use std::collections::VecDeque;
  13. use std::convert::TryInto;
  14. use crossbeam_channel::{Sender, Receiver, bounded, SendError};
  15. use hyper::status::StatusCode;
  16. use hyper::client::response::Response;
  17. use hyper::Url;
  18. use hyper::client::Client;
  19. use slog::Drain;
  20. use chrono::prelude::*;
  21. use decimal::d128;
  22. use uuid::Uuid;
  23. use smallvec::SmallVec;
  24. use slog::Logger;
  25. use pretty_toa::ThousandsSep;
  26. /// whether non-finite `f64` and `d128` values should be skipped
  27. /// during serialization to influxdb line format. influx does not
  28. /// handle `NaN` values at all. the other option is a marker value,
  29. /// previously `-999.0` had been used.
  30. pub const SKIP_NAN_VALUES: bool = true;
  31. pub const DROP_DEADLINE: Duration = Duration::from_secs(30);
  32. pub type Credentials = hyper::header::Authorization<hyper::header::Basic>;
  33. /// Created this so I know what types can be passed through the
  34. /// `measure!` macro, which used to convert with `as i64` and
  35. /// `as f64` until I accidentally passed a function name, and it
  36. /// still compiled, but with garbage numbers.
  37. pub trait AsI64 {
  38. fn as_i64(x: Self) -> i64;
  39. }
  40. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  41. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  42. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  43. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x.try_into().unwrap_or(-999) } }
  44. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x.try_into().unwrap_or(-999) } }
  45. impl AsI64 for i128 { fn as_i64(x: Self) -> i64 { x.try_into().unwrap_or(-999) } }
  46. impl AsI64 for u128 { fn as_i64(x: Self) -> i64 { x.try_into().unwrap_or(-999) } }
  47. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  48. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  49. impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  50. impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  51. impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  52. impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  53. /// Created this so I know what types can be passed through the
  54. /// `measure!` macro, which used to convert with `as i64` and
  55. /// `as f64` until I accidentally passed a function name, and it
  56. /// still compiled, but with garbage numbers.
  57. pub trait AsF64 {
  58. fn as_f64(x: Self) -> f64;
  59. }
  60. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  61. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  62. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  63. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  64. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  65. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  66. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  67. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  68. ///
  69. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  70. /// values, as well as sends it with the `Sender`.
  71. ///
  72. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  73. /// measurement (see `tests` mod).
  74. ///
  75. /// # Examples
  76. ///
  77. /// ```
  78. /// #[macro_use]
  79. /// extern crate influx_writer;
  80. ///
  81. /// use influx_writer::{OwnedValue, OwnedMeasurement, AsI64};
  82. ///
  83. /// use decimal::d128;
  84. ///
  85. /// fn main() {
  86. /// let (tx, rx) = crossbeam_channel::bounded(1024);
  87. ///
  88. /// // "shorthand" syntax
  89. ///
  90. /// measure!(tx, test, t(color, "red"), i(n, 1));
  91. ///
  92. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  93. ///
  94. /// assert_eq!(meas.key, "test");
  95. /// assert_eq!(meas.get_tag("color"), Some("red"));
  96. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  97. ///
  98. /// measure!(tx, test,
  99. /// t(one, "a"), t(two, "b"), i(three, 2),
  100. /// f(four, 1.2345), s(five, String::from("d")),
  101. /// b(six, true), i(seven, 1 + 2),
  102. /// tm(1)
  103. /// );
  104. ///
  105. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  106. ///
  107. /// assert_eq!(meas.key, "test");
  108. /// assert_eq!(meas.get_tag("one"), Some("a"));
  109. /// assert_eq!(meas.get_tag("two"), Some("b"));
  110. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  111. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  112. /// assert_eq!(meas.timestamp, Some(1));
  113. ///
  114. /// // use the @make_meas flag to skip sending a measurement, instead merely
  115. /// // creating it.
  116. ///
  117. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, t(color, "red"), i(n, 1));
  118. ///
  119. /// // each variant also has shorthand aliases
  120. ///
  121. /// let meas: OwnedMeasurement = measure!(@make_meas abcd, t(color, "red"), i(n, 1), d(price, d128::zero()));
  122. /// }
  123. /// ```
  124. ///
  125. #[macro_export]
  126. macro_rules! measure {
  127. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  128. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  129. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  130. //(@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  131. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  132. (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
  133. (@kv v, $meas:ident, $k:expr) => { measure!(@ea t, $meas, "version", $k) };
  134. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  135. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  136. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Integer(AsI64::as_i64($v))) };
  137. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Float(AsF64::as_f64($v))) };
  138. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::String($v)) };
  139. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::D128($v)) };
  140. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Uuid($v)) };
  141. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Boolean(bool::from($v))) };
  142. (@ea D, $meas:ident, $k:expr, $v:expr) => {
  143. match $v {
  144. Some(v) => { $meas = $meas.add_field($k, $crate::OwnedValue::D128(v)) }
  145. None => {}
  146. }
  147. };
  148. (@ea I, $meas:ident, $k:expr, $v:expr) => {
  149. if $v.is_some() {
  150. $meas = $meas.add_field($k, $crate::OwnedValue::Integer(AsI64::as_i64($v.unwrap())));
  151. }
  152. };
  153. (@ea F, $meas:ident, $k:expr, $v:expr) => {
  154. if $v.is_some() {
  155. $meas = $meas.add_field($k, $crate::OwnedValue::Float(AsF64::as_f64($v.unwrap())));
  156. }
  157. };
  158. // (@ea T, $meas:ident, $k:expr, $v:expr) => {
  159. // let maybe_v: Option<
  160. // match $v {
  161. // Some(v) => { $meas = $meas.add_tag($k, v) }
  162. // None => {}
  163. // }
  164. // };
  165. (@ea U, $meas:ident, $k:expr, $v:expr) => {
  166. match $v {
  167. Some(v) => { $meas = $meas.add_field($k, $crate::OwnedValue::Uuid(v)) }
  168. None => {}
  169. }
  170. };
  171. (@ea B, $meas:ident, $k:expr, $v:expr) => {
  172. match $v {
  173. Some(v) => { $meas = $meas.add_field($k, $crate::OwnedValue::Boolean(v)) }
  174. None => {}
  175. }
  176. };
  177. (@ea S, $meas:ident, $k:expr, $v:expr) => {
  178. match $v {
  179. Some(v) => { $meas = $meas.add_field($k, $crate::OwnedValue::String(v)) }
  180. None => {}
  181. }
  182. };
  183. (@as_expr $e:expr) => {$e};
  184. (@count_tags) => {0usize};
  185. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  186. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  187. (@count_fields) => {0usize};
  188. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  189. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  190. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  191. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  192. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  193. };
  194. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  195. let n_tags = measure!(@count_tags $($t)*);
  196. let n_fields = measure!(@count_fields $($t)*);
  197. let mut meas =
  198. $crate::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  199. $(
  200. measure!(@kv $t, meas, $($tail)*);
  201. )*
  202. meas
  203. }};
  204. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  205. measure!($m, $name, $($t [ $($tail)* ] ),+)
  206. };
  207. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  208. #[allow(unused_imports)]
  209. use $crate::{AsI64, AsF64};
  210. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  211. let _ = $m.send(measurement);
  212. }};
  213. }
  214. /// converts a chrono::DateTime to an integer timestamp (ns)
  215. ///
  216. #[inline]
  217. pub fn nanos(t: DateTime<Utc>) -> u64 {
  218. (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64)
  219. }
  220. #[inline]
  221. pub fn secs(d: Duration) -> f64 {
  222. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  223. }
  224. #[inline]
  225. pub fn inanos(t: DateTime<Utc>) -> i64 {
  226. t.timestamp() * 1_000_000_000i64 + t.timestamp_subsec_nanos() as i64
  227. }
  228. //#[deprecated(since="0.4.3", note="Use `nanos(DateTime<Utc>) -> u64` instead")]
  229. pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
  230. (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
  231. }
  232. #[inline]
  233. pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
  234. (d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64
  235. }
  236. #[inline]
  237. pub fn nanos_utc(t: i64) -> DateTime<Utc> {
  238. Utc.timestamp(t / 1_000_000_000, (t % 1_000_000_000) as u32)
  239. }
  240. #[derive(Clone, Debug)]
  241. struct Point<T, V> {
  242. pub time: T,
  243. pub value: V
  244. }
  245. struct DurationWindow {
  246. pub size: Duration,
  247. pub mean: Duration,
  248. pub sum: Duration,
  249. pub count: u32,
  250. pub items: VecDeque<Point<Instant, Duration>>
  251. }
  252. #[allow(dead_code)]
  253. impl DurationWindow {
  254. #[inline]
  255. pub fn update(&mut self, time: Instant, value: Duration) {
  256. self.add(time, value);
  257. self.refresh(time);
  258. }
  259. #[inline]
  260. pub fn refresh(&mut self, t: Instant) -> &Self {
  261. if !self.items.is_empty() {
  262. let (n_remove, sum, count) =
  263. self.items.iter()
  264. .take_while(|x| t - x.time > self.size)
  265. .fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
  266. (n_remove + 1, sum - x.value, count - 1)
  267. });
  268. self.sum = sum;
  269. self.count = count;
  270. for _ in 0..n_remove {
  271. self.items.pop_front();
  272. }
  273. }
  274. if self.count > 0 {
  275. self.mean = self.sum / self.count.into();
  276. }
  277. self
  278. }
  279. #[inline]
  280. pub fn add(&mut self, time: Instant, value: Duration) {
  281. let p = Point { time, value };
  282. self.sum += p.value;
  283. self.count += 1;
  284. self.items.push_back(p);
  285. }
  286. }
  287. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  288. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  289. /// measurements have accumulated.
  290. ///
  291. #[derive(Debug)]
  292. pub struct InfluxWriter {
  293. host: String,
  294. db: String,
  295. tx: Sender<Option<OwnedMeasurement>>,
  296. thread: Option<Arc<thread::JoinHandle<()>>>,
  297. }
  298. impl Default for InfluxWriter {
  299. fn default() -> Self {
  300. InfluxWriter::new("localhost", "test")
  301. }
  302. }
  303. impl Clone for InfluxWriter {
  304. fn clone(&self) -> Self {
  305. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  306. InfluxWriter {
  307. host: self.host.to_string(),
  308. db: self.db.to_string(),
  309. tx: self.tx.clone(),
  310. thread,
  311. }
  312. }
  313. }
  314. impl InfluxWriter {
  315. pub fn host(&self) -> &str { self.host.as_str() }
  316. pub fn db(&self) -> &str { self.db.as_str() }
  317. /// Sends the `OwnedMeasurement` to the serialization thread.
  318. ///
  319. #[inline]
  320. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  321. //if self.thread.is_none() {
  322. // let _ = self.tx.try_send(Some(m));
  323. // Ok(())
  324. //} else {
  325. self.tx.send(Some(m))
  326. //}
  327. }
  328. #[inline]
  329. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  330. #[inline]
  331. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  332. #[inline]
  333. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  334. #[inline]
  335. pub fn rsecs(&self, d: Duration) -> f64 {
  336. ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64))
  337. * 1000.0)
  338. .round()
  339. / 1000.0
  340. }
  341. #[inline]
  342. pub fn secs(&self, d: Duration) -> f64 {
  343. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  344. }
  345. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  346. self.tx.clone()
  347. }
  348. #[inline]
  349. pub fn is_full(&self) -> bool { self.tx.is_full() }
  350. /// provides a shell interface that immediately drops measurements sent to it
  351. pub fn placeholder() -> Self {
  352. let (tx, _) = bounded(1);
  353. Self {
  354. host: String::new(),
  355. db: String::new(),
  356. tx,
  357. thread: None,
  358. }
  359. }
  360. pub fn is_placeholder(&self) -> bool {
  361. self.thread.is_none() && self.host == ""
  362. }
  363. pub fn new(host: &str, db: &str) -> Self {
  364. let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!());
  365. Self::with_logger_and_opt_creds(host, db, None, &noop_logger)
  366. }
  367. pub fn get_credentials(username: String, password: Option<String>) -> Credentials {
  368. hyper::header::Authorization(
  369. hyper::header::Basic { username, password }
  370. )
  371. }
  372. fn http_req<'a>(client: &'a Client, url: Url, body: &'a str, creds: &Option<Credentials>) -> hyper::client::RequestBuilder<'a> {
  373. let req = client.post(url.clone())
  374. .body(body);
  375. if let Some(auth) = creds {
  376. req.header(auth.clone())
  377. } else {
  378. req
  379. }
  380. }
  381. #[allow(unused_assignments)]
  382. pub fn with_logger(host: &str, db: &str, logger: &Logger) -> Self {
  383. Self::with_logger_and_opt_creds(host, db, None, logger)
  384. }
  385. pub fn with_logger_and_opt_creds(host: &str, db: &str, creds: Option<Credentials>, logger: &Logger) -> Self {
  386. let logger = logger.new(o!(
  387. "host" => host.to_string(),
  388. "db" => db.to_string()));
  389. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(4096);
  390. let url =
  391. Url::parse_with_params(&format!("http://{}:8086/write", host),
  392. &[("db", db), ("precision", "ns")])
  393. .expect("influx writer url should parse");
  394. let thread = thread::Builder::new().name(format!("inflx:{}", db)).spawn(move || {
  395. use std::time::*;
  396. use crossbeam_channel as chan;
  397. #[cfg(feature = "no-influx-buffer")]
  398. const N_BUFFER_LINES: usize = 0;
  399. const N_BUFFER_LINES: usize = 1024;
  400. const MAX_PENDING: Duration = Duration::from_secs(3);
  401. const INITIAL_BUFFER_CAPACITY: usize = 4096;
  402. const MAX_BACKLOG: usize = 1024;
  403. const MAX_OUTSTANDING_HTTP: usize = 64;
  404. const DEBUG_HB_EVERY: usize = 1024 * 96;
  405. const INFO_HB_EVERY: usize = 1024 * 1024;
  406. const N_HTTP_ATTEMPTS: u32 = 15;
  407. const INITIAL_BACKLOG: usize = MAX_OUTSTANDING_HTTP * 2;
  408. let client = Arc::new(Client::new());
  409. let creds = Arc::new(creds);
  410. info!(logger, "initializing InfluxWriter ...";
  411. "N_BUFFER_LINES" => N_BUFFER_LINES,
  412. "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING),
  413. "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP,
  414. "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY,
  415. "INITIAL_BACKLOG" => INITIAL_BACKLOG,
  416. "MAX_BACKLOG" => MAX_BACKLOG,
  417. );
  418. // pre-allocated buffers ready for use if the active one is stasheed
  419. // during an outage
  420. let mut spares: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG);
  421. // queue failed sends here until problem resolved, then send again. in worst
  422. // case scenario, loop back around on buffers queued in `backlog`, writing
  423. // over the oldest first.
  424. //
  425. let mut backlog: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG);
  426. for _ in 0..INITIAL_BACKLOG {
  427. spares.push_back(String::with_capacity(INITIAL_BUFFER_CAPACITY));
  428. }
  429. struct Resp {
  430. pub buf: String,
  431. pub took: Duration,
  432. }
  433. let mut db_health = DurationWindow {
  434. size: Duration::from_secs(120),
  435. mean: Duration::new(10, 0),
  436. sum: Duration::new(0, 0),
  437. count: 0,
  438. items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP),
  439. };
  440. let (http_tx, http_rx) = chan::bounded(32);
  441. let mut buf = spares.pop_front().unwrap();
  442. let mut count = 0;
  443. let mut extras = 0; // any new Strings we intro to the system
  444. let mut n_rcvd = 0;
  445. let mut n_pts: u64 = 0;
  446. let mut in_flight_buffer_bytes = 0;
  447. let mut last = Instant::now();
  448. let mut active: bool;
  449. let mut last_clear = Instant::now();
  450. let mut last_memory_check = Instant::now();
  451. let mut loop_time: Instant;
  452. let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
  453. INITIAL_BACKLOG + extras - s.len() - b.len() - 1
  454. };
  455. assert_eq!(n_out(&spares, &backlog, extras), 0);
  456. let count_allocated_memory = |spares: &VecDeque<String>, backlog: &VecDeque<String>, in_flight_buffer_bytes: &usize| -> usize {
  457. spares.iter().map(|x| x.capacity()).sum::<usize>()
  458. + backlog.iter().map(|x| x.capacity()).sum::<usize>()
  459. + (*in_flight_buffer_bytes)
  460. };
  461. let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize, in_flight_buffer_bytes: &mut usize| {
  462. if n_outstanding >= MAX_OUTSTANDING_HTTP {
  463. backlog.push_back(buf);
  464. return
  465. }
  466. let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
  467. let tx = http_tx.clone();
  468. let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "in flight req at spawn time" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure
  469. let client = Arc::clone(&client);
  470. let creds = Arc::clone(&creds);
  471. *in_flight_buffer_bytes = *in_flight_buffer_bytes + buf.capacity();
  472. debug!(logger, "launching http thread");
  473. let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
  474. let logger = thread_logger;
  475. debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len());
  476. let start = Instant::now();
  477. for n_req in 0..N_HTTP_ATTEMPTS {
  478. let throttle = Duration::from_secs(2) * n_req * n_req;
  479. if n_req > 0 {
  480. warn!(logger, "InfluxWriter http thread: pausing before next request";
  481. "n_req" => n_req,
  482. "throttle" => %format_args!("{:?}", throttle),
  483. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  484. thread::sleep(throttle); // 0, 2, 8, 16, 32
  485. }
  486. let sent = Instant::now();
  487. let req = Self::http_req(&client, url.clone(), buf.as_str(), &creds);
  488. let resp = req.send();
  489. let rcvd = Instant::now();
  490. let took = rcvd - sent;
  491. let mut n_tx = 0u32;
  492. match resp {
  493. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  494. debug!(logger, "server responded ok: 204 NoContent");
  495. buf.clear();
  496. let mut resp = Some(Ok(Resp { buf, took }));
  497. loop {
  498. n_tx += 1;
  499. match tx.try_send(resp.take().unwrap()) {
  500. Ok(_) => {
  501. if n_req > 0 {
  502. info!(logger, "successfully recovered from failed request with retry";
  503. "n_req" => n_req,
  504. "n_tx" => n_tx,
  505. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  506. }
  507. return
  508. }
  509. Err(chan::TrySendError::Full(r)) => {
  510. let throttle = Duration::from_millis(1000) * n_tx;
  511. warn!(logger, "channel full: InfluxWriter http thread failed to return buf";
  512. "n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle));
  513. resp = Some(r);
  514. thread::sleep(throttle);
  515. }
  516. Err(chan::TrySendError::Disconnected(_)) => {
  517. warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return";
  518. "n_tx" => n_tx, "n_req" => n_req);
  519. return
  520. }
  521. }
  522. }
  523. }
  524. Ok(mut resp) => {
  525. let mut server_resp = String::new();
  526. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  527. error!(logger, "influx server error (request took {:?})", took;
  528. "status" => %resp.status,
  529. "body" => server_resp);
  530. }
  531. Err(e) => {
  532. error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e);
  533. }
  534. }
  535. }
  536. let took = Instant::now() - start;
  537. warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer";
  538. "took" => %format_args!("{:?}", took));
  539. let buflen = buf.len();
  540. let n_lines = buf.lines().count();
  541. if let Err(e) = tx.send(Err(Resp { buf, took })) {
  542. crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e;
  543. "err" => %e, "buf.len()" => buflen, "n_lines" => n_lines);
  544. }
  545. });
  546. if let Err(e) = thread_res {
  547. crit!(logger, "failed to spawn thread: {}", e);
  548. }
  549. };
  550. let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> {
  551. match prev {
  552. 0 if N_BUFFER_LINES > 0 => {
  553. serialize_owned(m, buf);
  554. Ok(1)
  555. }
  556. n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => {
  557. buf.push_str("\n");
  558. serialize_owned(m, buf);
  559. Ok(n + 1)
  560. }
  561. n => {
  562. buf.push_str("\n");
  563. serialize_owned(m, buf);
  564. Err(n + 1)
  565. }
  566. }
  567. };
  568. 'event: loop {
  569. loop_time = Instant::now();
  570. active = false;
  571. if loop_time - last_memory_check > Duration::from_secs(300) {
  572. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
  573. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  574. info!(logger, "InfluxWriter: allocated memory: {:.1}MB", allocated_mb;
  575. "allocated bytes" => allocated_bytes,
  576. "in flight buffer bytes" => in_flight_buffer_bytes,
  577. "spares.len()" => spares.len(),
  578. "backlog.len()" => backlog.len(),
  579. );
  580. last_memory_check = loop_time;
  581. }
  582. match rx.recv() {
  583. Ok(Some(mut meas)) => {
  584. n_rcvd += 1;
  585. n_pts += meas.fields.len() as u64;
  586. active = true;
  587. if n_rcvd % INFO_HB_EVERY == 0 {
  588. let n_outstanding = n_out(&spares, &backlog, extras);
  589. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
  590. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  591. info!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep();
  592. "total pts written" => n_pts.thousands_sep(),
  593. "n_outstanding" => n_outstanding,
  594. "spares.len()" => spares.len(),
  595. "n_rcvd" => n_rcvd,
  596. "n_active_buf" => count,
  597. "db_health" => %format_args!("{:?}", db_health.mean),
  598. "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
  599. "backlog.len()" => backlog.len());
  600. } else if n_rcvd % DEBUG_HB_EVERY == 0 {
  601. let n_outstanding = n_out(&spares, &backlog, extras);
  602. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
  603. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  604. debug!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep();
  605. "n_outstanding" => n_outstanding,
  606. "spares.len()" => spares.len(),
  607. "n_rcvd" => n_rcvd,
  608. "n_active_buf" => count,
  609. "db_health" => %format_args!("{:?}", db_health.mean),
  610. "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
  611. "backlog.len()" => backlog.len());
  612. }
  613. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  614. if meas.fields.is_empty() {
  615. meas.fields.push(("n", OwnedValue::Integer(1)));
  616. }
  617. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  618. count = match next(count, &meas, &mut buf, loop_time, last) {
  619. Ok(n) => n,
  620. Err(_n) => {
  621. let mut count = 0;
  622. let mut next: String = match spares.pop_front() {
  623. Some(x) => x,
  624. None => {
  625. let n_outstanding = n_out(&spares, &backlog, extras);
  626. if n_outstanding > MAX_BACKLOG {
  627. warn!(logger, "InfluxWriter: no available buffers in `spares`, pulling from backlog";
  628. "total pts written" => n_pts.thousands_sep(),
  629. "n_outstanding" => n_outstanding,
  630. "spares.len()" => spares.len(),
  631. "n_rcvd" => n_rcvd,
  632. "backlog.len()" => backlog.len());
  633. match backlog.pop_front() {
  634. // Note: this does not clear the backlog buffer,
  635. // instead we will just write more and more until
  636. // we are out of memory. I expect that will never
  637. // happen.
  638. //
  639. Some(x) => {
  640. count = 1; // otherwise, no '\n' added in `next(..)` - we are
  641. // sending a "full" buffer to be extended
  642. x
  643. }
  644. None => {
  645. extras += 1;
  646. crit!(logger, "InfluxWriter: failed to pull from backlog, too!! WTF #!(*#(* ... creating new String";
  647. "total pts written" => n_pts.thousands_sep(),
  648. "n_outstanding" => n_outstanding,
  649. "spares.len()" => spares.len(),
  650. "backlog.len()" => backlog.len(),
  651. "n_rcvd" => n_rcvd,
  652. "extras" => extras);
  653. String::new()
  654. }
  655. }
  656. } else {
  657. extras += 1;
  658. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY;
  659. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  660. info!(logger, "InfluxWriter: allocating new buffer: zero spares avail";
  661. "total pts written" => n_pts.thousands_sep(),
  662. "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
  663. "n_outstanding" => n_outstanding,
  664. "extras" => extras,
  665. );
  666. String::with_capacity(INITIAL_BUFFER_CAPACITY)
  667. }
  668. }
  669. };
  670. // after swap, buf in next, so want to send next
  671. //
  672. mem::swap(&mut buf, &mut next);
  673. let n_outstanding = n_out(&spares, &backlog, extras);
  674. send(next, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  675. last = loop_time;
  676. count
  677. }
  678. };
  679. }
  680. Ok(None) => {
  681. let start = Instant::now();
  682. let mut hb = Instant::now();
  683. warn!(logger, "terminate signal rcvd"; "count" => count);
  684. if buf.len() > 0 {
  685. info!(logger, "InfluxWriter: sending remaining buffer to influx on terminate"; "count" => count);
  686. let meas = OwnedMeasurement::new("influx_writer").add_field("n", OwnedValue::Integer(1));
  687. let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last);
  688. let n_outstanding = n_out(&spares, &backlog, extras);
  689. let mut placeholder = spares.pop_front().unwrap_or_else(String::new);
  690. mem::swap(&mut buf, &mut placeholder);
  691. send(placeholder, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  692. }
  693. let mut n_ok = 0;
  694. let mut n_err = 0;
  695. loop {
  696. loop_time = Instant::now();
  697. let n_outstanding = n_out(&spares, &backlog, extras);
  698. if backlog.is_empty() && n_outstanding < 1 {
  699. info!(logger, "InfluxWriter: cleared any remaining backlog";
  700. "total pts written" => n_pts.thousands_sep(),
  701. "n_outstanding" => n_outstanding,
  702. "spares.len()" => spares.len(),
  703. "backlog.len()" => backlog.len(),
  704. "n_cleared_ok" => n_ok,
  705. "n_cleared_err" => n_err,
  706. "n_rcvd" => n_rcvd,
  707. "extras" => extras,
  708. "elapsed" => %format_args!("{:?}", loop_time - start));
  709. break 'event
  710. }
  711. if loop_time.saturating_duration_since(start) > DROP_DEADLINE {
  712. crit!(logger, "drop deadline exceeded! commencing dirty exit :( ";
  713. "total pts written" => n_pts.thousands_sep(),
  714. "elapsed" => ?(loop_time.saturating_duration_since(start)),
  715. "n outstanding" => n_outstanding,
  716. "backlog.len()" => backlog.len(),
  717. );
  718. break 'event
  719. }
  720. if loop_time - hb > Duration::from_secs(5) {
  721. info!(logger, "InfluxWriter still clearing backlog ..";
  722. "total pts written" => n_pts.thousands_sep(),
  723. "n_outstanding" => n_outstanding,
  724. "spares.len()" => spares.len(),
  725. "backlog.len()" => backlog.len(),
  726. "n_cleared_ok" => n_ok,
  727. "n_cleared_err" => n_err,
  728. "extras" => extras,
  729. "n_rcvd" => n_rcvd,
  730. "elapsed" => %format_args!("{:?}", loop_time - start));
  731. hb = loop_time;
  732. }
  733. if let Some(buf) = backlog.pop_front() {
  734. let n_outstanding = n_out(&spares, &backlog, extras);
  735. debug!(logger, "InfluxWriter: resending queued buffer from backlog";
  736. "backlog.len()" => backlog.len(),
  737. "spares.len()" => spares.len(),
  738. "n_rcvd" => n_rcvd,
  739. "n_outstanding" => n_outstanding);
  740. send(buf, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  741. }
  742. 'rx: loop {
  743. match http_rx.try_recv() {
  744. Ok(Ok(Resp { buf, .. })) => {
  745. n_ok += 1;
  746. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  747. if spares.len() <= INITIAL_BACKLOG {
  748. spares.push_back(buf); // needed so `n_outstanding` count remains accurate
  749. } else {
  750. extras = extras.saturating_sub(1);
  751. }
  752. }
  753. Ok(Err(Resp { buf, .. })) => {
  754. warn!(logger, "InfluxWriter: requeueing failed request"; "buf.len()" => buf.len());
  755. n_err += 1;
  756. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  757. backlog.push_front(buf);
  758. }
  759. Err(chan::TryRecvError::Disconnected) => {
  760. crit!(logger, "InfluxWriter: trying to clear backlog, but http_rx disconnected! aborting";
  761. "total pts written" => n_pts.thousands_sep(),
  762. "n_outstanding" => n_outstanding,
  763. "backlog.len()" => backlog.len(),
  764. "n_cleared_ok" => n_ok,
  765. "n_cleared_err" => n_err,
  766. "extras" => extras,
  767. "n_rcvd" => n_rcvd,
  768. "elapsed" => %format_args!("{:?}", loop_time - start));
  769. break 'event
  770. }
  771. Err(_) => break 'rx
  772. }
  773. }
  774. thread::sleep(Duration::from_millis(1));
  775. }
  776. }
  777. _ => {}
  778. }
  779. db_health.refresh(loop_time);
  780. let n_outstanding = n_out(&spares, &backlog, extras);
  781. let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
  782. if (n_outstanding < MAX_OUTSTANDING_HTTP
  783. || loop_time.saturating_duration_since(last_clear) > Duration::from_secs(60))
  784. && healthy {
  785. if let Some(queued) = backlog.pop_front() {
  786. let n_outstanding = n_out(&spares, &backlog, extras);
  787. send(queued, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  788. active = true;
  789. }
  790. last_clear = loop_time;
  791. }
  792. loop {
  793. match http_rx.try_recv() {
  794. Ok(Ok(Resp { buf, took })) => {
  795. db_health.add(loop_time, took);
  796. let in_flight_before = in_flight_buffer_bytes.clone();
  797. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  798. if spares.len() <= INITIAL_BACKLOG {
  799. spares.push_back(buf);
  800. } else {
  801. extras = extras.saturating_sub(1);
  802. debug!(logger, "InfluxWriter: dropping buffer to reduce memory back to INITIAL_BACKLOG size";
  803. "spares.len()" => spares.len(),
  804. "extras" => extras,
  805. "in flight before" => in_flight_before,
  806. "in in_flight_buffer_bytes" => in_flight_buffer_bytes,
  807. );
  808. }
  809. //spares.push_back(buf);
  810. active = true;
  811. }
  812. Ok(Err(Resp { buf, took })) => {
  813. db_health.add(loop_time, took);
  814. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  815. backlog.push_front(buf);
  816. active = true;
  817. }
  818. Err(chan::TryRecvError::Disconnected) => {
  819. crit!(logger, "InfluxWriter: trying to recover buffers, but http_rx disconnected! aborting";
  820. "total pts written" => n_pts.thousands_sep(),
  821. "n_outstanding" => n_outstanding,
  822. "backlog.len()" => backlog.len(),
  823. "n_rcvd" => n_rcvd,
  824. "extras" => extras);
  825. break 'event
  826. }
  827. Err(_) => break
  828. }
  829. }
  830. if !active {
  831. thread::sleep(Duration::new(0, 1))
  832. }
  833. }
  834. thread::sleep(Duration::from_millis(10));
  835. }).unwrap();
  836. InfluxWriter {
  837. host: host.to_string(),
  838. db: db.to_string(),
  839. tx,
  840. thread: Some(Arc::new(thread))
  841. }
  842. }
  843. }
  844. impl Drop for InfluxWriter {
  845. fn drop(&mut self) {
  846. if let Some(arc) = self.thread.take() {
  847. if let Ok(thread) = Arc::try_unwrap(arc) {
  848. let _ = self.tx.send(None);
  849. let _ = thread.join();
  850. }
  851. }
  852. }
  853. }
  854. /// This removes offending things rather than escaping them.
  855. ///
  856. fn escape_tag(s: &str) -> String {
  857. s.replace(" ", "")
  858. .replace(",", "")
  859. .replace("\"", "")
  860. }
  861. fn escape(s: &str) -> String {
  862. s.replace(" ", "\\ ")
  863. .replace(",", "\\,")
  864. }
  865. fn as_string(s: &str) -> String {
  866. // the second replace removes double escapes
  867. //
  868. format!("\"{}\"", s.replace("\"", "\\\"")
  869. .replace(r#"\\""#, r#"\""#))
  870. }
  871. #[test]
  872. fn it_checks_as_string_does_not_double_escape() {
  873. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  874. let escaped = as_string(&raw);
  875. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  876. }
  877. fn as_boolean(b: &bool) -> &str {
  878. if *b { "t" } else { "f" }
  879. }
  880. pub fn now() -> i64 {
  881. nanos(Utc::now()) as i64
  882. }
  883. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  884. ///
  885. /// The serialized measurement is appended to the end of the string without
  886. /// any regard for what exited in it previously.
  887. ///
  888. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  889. line.push_str(&escape_tag(measurement.key));
  890. let add_tag = |line: &mut String, key: &str, value: &str| {
  891. line.push_str(",");
  892. line.push_str(&escape_tag(key));
  893. line.push_str("=");
  894. line.push_str(&escape(value));
  895. };
  896. for (key, value) in measurement.tags.iter() {
  897. #[cfg(not(feature = "string-tags"))]
  898. add_tag(line, key, value);
  899. #[cfg(feature = "string-tags")]
  900. add_tag(line, key, value.as_str());
  901. }
  902. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| -> bool {
  903. if SKIP_NAN_VALUES && ! value.is_finite() { return false }
  904. if is_first { line.push_str(" "); } else { line.push_str(","); }
  905. line.push_str(&escape_tag(key));
  906. line.push_str("=");
  907. match *value {
  908. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  909. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  910. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  911. OwnedValue::D128(ref d) => {
  912. if d.is_finite() {
  913. line.push_str(&format!("{}", d));
  914. } else {
  915. line.push_str("-999.0");
  916. }
  917. }
  918. OwnedValue::Float(ref f) => {
  919. if f.is_finite() {
  920. line.push_str(&format!("{}", f));
  921. } else {
  922. line.push_str("-999.0");
  923. }
  924. }
  925. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  926. };
  927. true
  928. };
  929. // use this counter to ensure that at least one field was
  930. // serialized. since NaN values may be skipped, the serialization
  931. // would be wrong if no fields ended up being serialized. instead,
  932. // track it, and if there are none serialized, add a n=1 to make
  933. // the measurement serialize properly
  934. //
  935. // this also controls what value is passed to the `is_first` argument
  936. // of `add_field`
  937. let mut n_fields_serialized = 0;
  938. for kv in measurement.fields.iter() {
  939. if add_field(line, kv.0, &kv.1, n_fields_serialized == 0) {
  940. n_fields_serialized += 1;
  941. }
  942. }
  943. // supply a minimum of one field (n=1)
  944. //
  945. // TODO: could potentially clobber a "n" tag? do we care?
  946. //
  947. if n_fields_serialized == 0 { add_field(line, "n", &OwnedValue::Integer(1), true); }
  948. if let Some(t) = measurement.timestamp {
  949. line.push_str(" ");
  950. line.push_str(&t.to_string());
  951. }
  952. }
  953. #[derive(Debug, Clone, PartialEq)]
  954. pub enum OwnedValue {
  955. String(String),
  956. Float(f64),
  957. Integer(i64),
  958. Boolean(bool),
  959. D128(d128),
  960. Uuid(Uuid),
  961. }
  962. impl OwnedValue {
  963. /// if `self` is a `Float` or `D128` variant, checks
  964. /// whether the contained value is finite
  965. ///
  966. /// # Examples
  967. ///
  968. /// ```
  969. /// use std::str::FromStr;
  970. /// use influx_writer::OwnedValue;
  971. ///
  972. /// let v1 = OwnedValue::Float(f64::NAN);
  973. /// assert!( ! v1.is_finite());
  974. /// let v2 = OwnedValue::Float(1.234f64);
  975. /// assert!(v2.is_finite());
  976. ///
  977. /// let v3 = OwnedValue::D128(decimal::d128::from_str("NaN").unwrap());
  978. /// assert!( ! v3.is_finite());
  979. /// let v4 = OwnedValue::D128(decimal::d128::from_str("42.42").unwrap());
  980. /// assert!(v4.is_finite());
  981. ///
  982. /// // other variants are always "finite"
  983. /// assert!(OwnedValue::String("NaN".into()).is_finite());
  984. /// ```
  985. pub fn is_finite(&self) -> bool {
  986. match self {
  987. OwnedValue::Float(x) => x.is_finite(),
  988. OwnedValue::D128(x) => x.is_finite(),
  989. _ => true,
  990. }
  991. }
  992. }
  993. /// Holds data meant for an influxdb measurement in transit to the
  994. /// writing thread.
  995. ///
  996. #[derive(Clone, Debug)]
  997. pub struct OwnedMeasurement {
  998. pub key: &'static str,
  999. pub timestamp: Option<i64>,
  1000. //pub fields: Map<&'static str, OwnedValue>,
  1001. //pub tags: Map<&'static str, &'static str>,
  1002. pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
  1003. #[cfg(not(feature = "string-tags"))]
  1004. pub tags: SmallVec<[(&'static str, &'static str); 8]>,
  1005. #[cfg(feature = "string-tags")]
  1006. pub tags: SmallVec<[(&'static str, String); 8]>,
  1007. }
  1008. impl OwnedMeasurement {
  1009. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  1010. OwnedMeasurement {
  1011. key,
  1012. timestamp: None,
  1013. tags: SmallVec::with_capacity(n_tags),
  1014. fields: SmallVec::with_capacity(n_fields),
  1015. }
  1016. }
  1017. pub fn new(key: &'static str) -> Self {
  1018. OwnedMeasurement {
  1019. key,
  1020. timestamp: None,
  1021. tags: SmallVec::new(),
  1022. fields: SmallVec::new(),
  1023. }
  1024. }
  1025. /// Unusual consuming `self` signature because primarily used by
  1026. /// the `measure!` macro.
  1027. #[cfg(not(feature = "string-tags"))]
  1028. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  1029. self.tags.push((key, value));
  1030. self
  1031. }
  1032. #[cfg(feature = "string-tags")]
  1033. pub fn add_tag<S: ToString>(mut self, key: &'static str, value: S) -> Self {
  1034. self.tags.push((key, value.to_string()));
  1035. self
  1036. }
  1037. /// Unusual consuming `self` signature because primarily used by
  1038. /// the `measure!` macro.
  1039. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  1040. self.fields.push((key, value));
  1041. self
  1042. }
  1043. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  1044. self.timestamp = Some(timestamp);
  1045. self
  1046. }
  1047. #[cfg(not(feature = "string-tags"))]
  1048. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  1049. match self.tags.iter().position(|kv| kv.0 == key) {
  1050. Some(i) => {
  1051. self.tags.get_mut(i)
  1052. .map(|x| {
  1053. x.0 = value;
  1054. });
  1055. self
  1056. }
  1057. None => {
  1058. self.add_tag(key, value)
  1059. }
  1060. }
  1061. }
  1062. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  1063. self.fields.iter()
  1064. .find(|kv| kv.0 == key)
  1065. .map(|kv| &kv.1)
  1066. }
  1067. #[cfg(feature = "string-tags")]
  1068. pub fn get_tag(&self, key: &'static str) -> Option<&str> {
  1069. self.tags.iter()
  1070. .find(|kv| kv.0 == key)
  1071. .map(|kv| kv.1.as_str())
  1072. }
  1073. #[cfg(not(feature = "string-tags"))]
  1074. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  1075. self.tags.iter()
  1076. .find(|kv| kv.0 == key)
  1077. .map(|kv| kv.1)
  1078. }
  1079. }
  1080. #[allow(unused)]
  1081. #[cfg(test)]
  1082. mod tests {
  1083. use std::str::FromStr;
  1084. use super::*;
  1085. #[cfg(feature = "unstable")]
  1086. use test::{black_box, Bencher};
  1087. #[test]
  1088. fn check_uppercase_shorthands_on_optional_field_and_tag_values() {
  1089. let e: Option<i64> = None;
  1090. let h: Option<f64> = None;
  1091. let meas = measure!(@make_meas test,
  1092. //T(a, Some("one")), T(b, None), t(c, "three"),
  1093. S(a, Some("one".to_string())), S(b, None), s(c, "three".to_string()),
  1094. I(d, Some(4)), I(e), i(f, 6),
  1095. F(g, Some(7.0)), F(h), f(i, 9.0),
  1096. );
  1097. assert_eq!(meas.get_field("a").unwrap(), &OwnedValue::String("one".to_string()));
  1098. assert!(meas.get_field("b").is_none());
  1099. assert_eq!(meas.get_field("c").unwrap(), &OwnedValue::String("three".to_string()));
  1100. assert_eq!(meas.get_field("d").unwrap(), &OwnedValue::Integer(4));
  1101. assert!(meas.get_field("e").is_none());
  1102. assert_eq!(meas.get_field("f").unwrap(), &OwnedValue::Integer(6));
  1103. assert_eq!(meas.get_field("g").unwrap(), &OwnedValue::Float(7.0));
  1104. assert!(meas.get_field("h").is_none());
  1105. assert_eq!(meas.get_field("i").unwrap(), &OwnedValue::Float(9.0));
  1106. }
  1107. #[cfg(feature = "unstable")]
  1108. #[bench]
  1109. fn send_to_disconnected_channel(b: &mut Bencher) {
  1110. let (tx, _): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(1);
  1111. let time = now();
  1112. b.iter(|| {
  1113. const VERSION: &str = "1.0.0";
  1114. let color = "red";
  1115. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  1116. tx.send(Some(m))
  1117. })
  1118. }
  1119. #[cfg(feature = "unstable")]
  1120. #[bench]
  1121. fn try_send_to_disconnected_channel(b: &mut Bencher) {
  1122. let (tx, _): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(1);
  1123. let time = now();
  1124. b.iter(|| {
  1125. const VERSION: &str = "1.0.0";
  1126. let color = "red";
  1127. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  1128. tx.try_send(Some(m))
  1129. })
  1130. }
  1131. #[cfg(feature = "unstable")]
  1132. #[bench]
  1133. fn send_to_disconnected_channel_via_placeholder(b: &mut Bencher) {
  1134. let time = now();
  1135. let influx = InfluxWriter::placeholder();
  1136. b.iter(|| {
  1137. const VERSION: &str = "1.0.0";
  1138. let color = "red";
  1139. measure!(influx, test, i(n, 1), t(color), v(VERSION), tm(time));
  1140. })
  1141. }
  1142. #[cfg(feature = "unstable")]
  1143. #[bench]
  1144. fn send_to_connected_channel_via_measure(b: &mut Bencher) {
  1145. let time = now();
  1146. let influx = InfluxWriter::new("localhost", "test");
  1147. b.iter(|| {
  1148. const VERSION: &str = "1.0.0";
  1149. let color = "red";
  1150. measure!(influx, bench, i(n, 1), t(color), v(VERSION), tm(time));
  1151. })
  1152. }
  1153. #[ignore]
  1154. #[cfg(feature = "unstable")]
  1155. #[bench]
  1156. fn measure_ten(b: &mut Bencher) {
  1157. let influx = InfluxWriter::new("localhost", "test");
  1158. let mut n = 0;
  1159. b.iter(|| {
  1160. for _ in 0..10 {
  1161. let time = influx.nanos(Utc::now());
  1162. n += 1;
  1163. measure!(influx, million, i(n), tm(time));
  1164. }
  1165. });
  1166. }
  1167. #[test]
  1168. fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
  1169. const VERSION: &str = "0.3.90";
  1170. let tag_value = "one";
  1171. let color = "red";
  1172. let time = Utc::now();
  1173. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
  1174. assert_eq!(m.get_tag("color"), Some("red"));
  1175. assert_eq!(m.get_tag("version"), Some(VERSION));
  1176. assert_eq!(m.timestamp, Some(nanos(time) as i64));
  1177. }
  1178. #[test]
  1179. fn it_uses_the_v_for_version_shortcut() {
  1180. const VERSION: &str = "0.3.90";
  1181. let tag_value = "one";
  1182. let color = "red";
  1183. let time = now();
  1184. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  1185. assert_eq!(m.get_tag("color"), Some("red"));
  1186. assert_eq!(m.get_tag("version"), Some(VERSION));
  1187. assert_eq!(m.timestamp, Some(time));
  1188. }
  1189. #[test]
  1190. fn it_uses_the_new_tag_k_only_shortcut() {
  1191. let tag_value = "one";
  1192. let color = "red";
  1193. let time = now();
  1194. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  1195. assert_eq!(m.get_tag("color"), Some("red"));
  1196. assert_eq!(m.get_tag("tag_value"), Some("one"));
  1197. assert_eq!(m.timestamp, Some(time));
  1198. }
  1199. #[test]
  1200. fn it_uses_measure_macro_parenthesis_syntax() {
  1201. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  1202. assert_eq!(m.key, "test");
  1203. assert_eq!(m.get_tag("a"), Some("b"));
  1204. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  1205. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  1206. assert_eq!(m.timestamp, Some(1));
  1207. }
  1208. #[test]
  1209. fn it_uses_measure_macro_on_a_self_attribute() {
  1210. struct A {
  1211. pub influx: InfluxWriter,
  1212. }
  1213. impl A {
  1214. fn f(&self) {
  1215. measure!(self.influx, test, t(color, "red"), i(n, 1));
  1216. }
  1217. }
  1218. let a = A { influx: InfluxWriter::default() };
  1219. a.f();
  1220. }
  1221. #[test]
  1222. fn it_clones_an_influx_writer_to_check_both_drop() {
  1223. let influx = InfluxWriter::default();
  1224. measure!(influx, drop_test, i(a, 1), i(b, 2));
  1225. {
  1226. let influx = influx.clone();
  1227. thread::spawn(move || {
  1228. measure!(influx, drop_test, i(a, 3), i(b, 4));
  1229. });
  1230. }
  1231. }
  1232. #[cfg(feature = "unstable")]
  1233. #[bench]
  1234. fn influx_writer_send_basic(b: &mut Bencher) {
  1235. let m = InfluxWriter::new("localhost", "test");
  1236. b.iter(|| {
  1237. measure!(m, test, t(color; "red"), i(n, 1)); //, float[p; 1.234]);
  1238. });
  1239. }
  1240. #[cfg(feature = "unstable")]
  1241. #[bench]
  1242. fn influx_writer_send_price(b: &mut Bencher) {
  1243. let m = InfluxWriter::new("localhost", "test");
  1244. b.iter(|| {
  1245. measure!(m, test,
  1246. t(ticker, "xmr_btc"),
  1247. t(exchange, "plnx"),
  1248. d(bid, d128::zero()),
  1249. d(ask, d128::zero()),
  1250. );
  1251. });
  1252. }
  1253. #[test]
  1254. fn it_checks_color_tag_error_in_non_doctest() {
  1255. let (tx, rx) = bounded(1024);
  1256. measure!(tx, test, t(color,"red"), i(n,1));
  1257. let meas: OwnedMeasurement = rx.recv().unwrap();
  1258. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  1259. }
  1260. #[test]
  1261. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  1262. let meas = measure!(@make_meas test_measurement,
  1263. t(one, "a"), t(two, "b"), i(three, 2),
  1264. f(four, 1.2345), s(five, String::from("d")),
  1265. b(six, true), i(seven, 1 + 2),
  1266. tm(1)
  1267. );
  1268. assert_eq!(meas.key, "test_measurement");
  1269. assert_eq!(meas.get_tag("one"), Some("a"));
  1270. assert_eq!(meas.get_tag("two"), Some("b"));
  1271. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  1272. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  1273. assert_eq!(meas.timestamp, Some(1));
  1274. }
  1275. #[test]
  1276. fn it_uses_measure_macro_for_d128_and_uuid() {
  1277. let (tx, rx) = bounded(1024);
  1278. let one = "a";
  1279. let two = d128::zero();
  1280. let three = Uuid::new_v4();
  1281. let time = now();
  1282. measure!(tx, test_measurement, t(one), d(two), u(three), tm(time));
  1283. thread::sleep(Duration::from_millis(10));
  1284. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  1285. assert_eq!(meas.key, "test_measurement");
  1286. assert_eq!(meas.get_tag("one"), Some("a"));
  1287. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  1288. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(three)));
  1289. assert_eq!(meas.timestamp, Some(time));
  1290. }
  1291. #[test]
  1292. fn it_uses_the_measure_macro_alt_syntax() {
  1293. let (tx, rx) = bounded(1024);
  1294. measure!(tx, test_measurement,
  1295. t(one, "a"), t(two, "b"), i(three, 2),
  1296. f(four, 1.2345), s(five, String::from("d")),
  1297. b(six, true), i(seven, 1 + 2),
  1298. tm(1)
  1299. );
  1300. thread::sleep(Duration::from_millis(10));
  1301. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  1302. assert_eq!(meas.key, "test_measurement");
  1303. assert_eq!(meas.get_tag("one"), Some("a"));
  1304. assert_eq!(meas.get_tag("two"), Some("b"));
  1305. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  1306. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  1307. assert_eq!(meas.timestamp, Some(1));
  1308. }
  1309. #[test]
  1310. fn it_checks_that_fields_are_separated_correctly() {
  1311. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  1312. assert_eq!(m.key, "test");
  1313. assert_eq!(m.get_tag("a"), Some("one"));
  1314. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  1315. let mut buf = String::new();
  1316. serialize_owned(&m, &mut buf);
  1317. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  1318. }
  1319. #[test]
  1320. fn try_to_break_measure_macro() {
  1321. let (tx, _) = bounded(1024);
  1322. measure!(tx, one, t(x,"y"), i(n,1));
  1323. measure!(tx, one, t(x,"y"), i(n,1),);
  1324. struct A {
  1325. pub one: i32,
  1326. pub two: i32,
  1327. }
  1328. struct B {
  1329. pub a: A
  1330. }
  1331. let b = B { a: A { one: 1, two: 2 } };
  1332. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  1333. assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
  1334. }
  1335. #[cfg(feature = "unstable")]
  1336. #[bench]
  1337. fn measure_macro_small(b: &mut Bencher) {
  1338. let (tx, rx) = bounded(1024);
  1339. let listener = thread::spawn(move || {
  1340. loop { if rx.recv().is_err() { break } }
  1341. });
  1342. b.iter(|| {
  1343. measure!(tx, test, t(color, "red"), i(n, 1), tm(now()));
  1344. });
  1345. }
  1346. #[cfg(feature = "unstable")]
  1347. #[bench]
  1348. fn measure_macro_medium(b: &mut Bencher) {
  1349. let (tx, rx) = bounded(1024);
  1350. let listener = thread::spawn(move || {
  1351. loop { if rx.recv().is_err() { break } }
  1352. });
  1353. b.iter(|| {
  1354. measure!(tx, test, t(color, "red"), t(mood, "playful"),
  1355. t(ticker, "xmr_btc"), f(price, 1.2345), f(amount, 56.322),
  1356. i(n, 1), tm(now()));
  1357. });
  1358. }
  1359. #[cfg(feature = "unstable")]
  1360. #[bench]
  1361. fn serialize_owned_longer(b: &mut Bencher) {
  1362. let mut buf = String::with_capacity(1024);
  1363. let m =
  1364. OwnedMeasurement::new("test")
  1365. .add_tag("one", "a")
  1366. .add_tag("two", "b")
  1367. .add_tag("ticker", "xmr_btc")
  1368. .add_tag("exchange", "plnx")
  1369. .add_tag("side", "bid")
  1370. .add_field("three", OwnedValue::Float(1.2345))
  1371. .add_field("four", OwnedValue::Integer(57))
  1372. .add_field("five", OwnedValue::Boolean(true))
  1373. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  1374. .set_timestamp(now());
  1375. b.iter(|| {
  1376. serialize_owned(&m, &mut buf);
  1377. buf.clear()
  1378. });
  1379. }
  1380. #[cfg(feature = "unstable")]
  1381. #[bench]
  1382. fn serialize_owned_simple(b: &mut Bencher) {
  1383. let mut buf = String::with_capacity(1024);
  1384. let m =
  1385. OwnedMeasurement::new("test")
  1386. .add_tag("one", "a")
  1387. .add_tag("two", "b")
  1388. .add_field("three", OwnedValue::Float(1.2345))
  1389. .add_field("four", OwnedValue::Integer(57))
  1390. .set_timestamp(now());
  1391. b.iter(|| {
  1392. serialize_owned(&m, &mut buf);
  1393. buf.clear()
  1394. });
  1395. }
  1396. #[cfg(feature = "unstable")]
  1397. #[bench]
  1398. fn clone_url_for_thread(b: &mut Bencher) {
  1399. let host = "ahmes";
  1400. let db = "mlp";
  1401. let url =
  1402. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1403. &[("db", db), ("precision", "ns")]).unwrap();
  1404. b.iter(|| {
  1405. url.clone()
  1406. })
  1407. }
  1408. #[cfg(feature = "unstable")]
  1409. #[bench]
  1410. fn clone_arc_url_for_thread(b: &mut Bencher) {
  1411. let host = "ahmes";
  1412. let db = "mlp";
  1413. let url =
  1414. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1415. &[("db", db), ("precision", "ns")]).unwrap();
  1416. let url = Arc::new(url);
  1417. b.iter(|| {
  1418. Arc::clone(&url)
  1419. })
  1420. }
  1421. #[test]
  1422. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  1423. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  1424. let mut buf = String::new();
  1425. let mut server_resp = String::new();
  1426. let m = OwnedMeasurement::new("rust_test")
  1427. .add_field("s", OwnedValue::String(raw.to_string()))
  1428. .set_timestamp(now());
  1429. serialize_owned(&m, &mut buf);
  1430. println!("{}", buf);
  1431. buf.push_str("\n");
  1432. let buf_copy = buf.clone();
  1433. buf.push_str(&buf_copy);
  1434. println!("{}", buf);
  1435. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  1436. let client = Client::new();
  1437. let req = InfluxWriter::http_req(&client, url.clone(), &buf, &None);
  1438. match req.send() {
  1439. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  1440. Ok(mut resp) => {
  1441. resp.read_to_string(&mut server_resp).unwrap();
  1442. panic!("{}", server_resp);
  1443. }
  1444. Err(why) => {
  1445. panic!("{}", why)
  1446. }
  1447. }
  1448. }
  1449. #[cfg(feature = "auth-tests")]
  1450. #[test]
  1451. fn it_sends_authenticated_measurements() {
  1452. let creds = InfluxWriter::get_credentials("auth_test_user".into(), Some("hot dog".into()));
  1453. let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!());
  1454. //let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
  1455. //let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
  1456. //let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
  1457. //let root = slog::Logger::root(drain, o!("version" => "0.1"));
  1458. //let influx = InfluxWriter::with_logger_and_opt_creds("localhost", "auth_test", Some(creds), &root);
  1459. measure!(influx, auth_test_meas, i(n, 1));
  1460. drop(influx);
  1461. }
  1462. #[test]
  1463. fn it_skips_nan_values() {
  1464. assert!(SKIP_NAN_VALUES, "otherwise this test is worthless");
  1465. let m = OwnedMeasurement::new("rust_test")
  1466. .add_field("hello", OwnedValue::Integer(1234))
  1467. .add_field("finite_float", OwnedValue::Float(1.234))
  1468. .add_field("nan_float", OwnedValue::Float(f64::NAN))
  1469. .add_field("inf_float", OwnedValue::Float(f64::INFINITY))
  1470. .add_field("neg_inf_float", OwnedValue::Float(f64::NEG_INFINITY))
  1471. .add_field("finite_d128", OwnedValue::D128(d128::from_str("3.456").unwrap()))
  1472. .add_field("nan_d128", OwnedValue::D128(d128::from_str("NaN").unwrap()))
  1473. .set_timestamp(now());
  1474. let mut buf = String::new();
  1475. serialize_owned(&m, &mut buf);
  1476. dbg!(&buf);
  1477. assert!(buf.contains("hello=1234"));
  1478. assert!(buf.contains("finite_float=1.234"));
  1479. assert!( ! buf.contains("nan_float="));
  1480. assert!( ! buf.contains("inf_float="));
  1481. assert!( ! buf.contains("neg_inf_float="));
  1482. assert!(buf.contains("finite_d128=3.456"));
  1483. assert!( ! buf.contains("nan_d128="));
  1484. }
  1485. #[test]
  1486. fn it_supplies_a_field_if_every_field_is_skipped_because_nan() {
  1487. assert!(SKIP_NAN_VALUES, "otherwise this test is worthless");
  1488. let m = OwnedMeasurement::new("rust_test")
  1489. .add_field("nan_float", OwnedValue::Float(f64::NAN));
  1490. let mut buf = String::new();
  1491. serialize_owned(&m, &mut buf);
  1492. dbg!(&buf);
  1493. assert!(buf.contains("n=1i"));
  1494. }
  1495. }