You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1538 lines
61KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. #![feature(test)]
  4. #[cfg(test)]
  5. extern crate test;
  6. #[macro_use]
  7. extern crate slog;
  8. use std::io::Read;
  9. use std::sync::Arc;
  10. use crossbeam_channel::{Sender, Receiver, bounded, SendError};
  11. use std::{thread, mem};
  12. use std::time::*;
  13. use std::collections::VecDeque;
  14. use hyper::status::StatusCode;
  15. use hyper::client::response::Response;
  16. use hyper::Url;
  17. use hyper::client::Client;
  18. use slog::Drain;
  19. use chrono::prelude::*;
  20. use decimal::d128;
  21. use uuid::Uuid;
  22. use smallvec::SmallVec;
  23. use slog::Logger;
  24. use pretty_toa::ThousandsSep;
/// Whether non-finite `f64` and `d128` values should be skipped
/// during serialization to influxdb line format. Influx does not
/// handle `NaN` values at all. The other option is a marker value;
/// previously `-999.0` had been used.
pub const SKIP_NAN_VALUES: bool = true;
/// HTTP basic-auth `Authorization` header, as produced by
/// `InfluxWriter::get_credentials`.
pub type Credentials = hyper::header::Authorization<hyper::header::Basic>;
  31. /// Created this so I know what types can be passed through the
  32. /// `measure!` macro, which used to convert with `as i64` and
  33. /// `as f64` until I accidentally passed a function name, and it
  34. /// still compiled, but with garbage numbers.
  35. pub trait AsI64 {
  36. fn as_i64(x: Self) -> i64;
  37. }
  38. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  39. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  40. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  41. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  42. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } }
  43. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  44. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  45. impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  46. impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  47. impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  48. impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  49. /// Created this so I know what types can be passed through the
  50. /// `measure!` macro, which used to convert with `as i64` and
  51. /// `as f64` until I accidentally passed a function name, and it
  52. /// still compiled, but with garbage numbers.
  53. pub trait AsF64 {
  54. fn as_f64(x: Self) -> f64;
  55. }
  56. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  57. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  58. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  59. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  60. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  61. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  62. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  63. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  64. ///
  65. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  66. /// values, as well as sends it with the `Sender`.
  67. ///
  68. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  69. /// measurement (see `tests` mod).
  70. ///
  71. /// # Examples
  72. ///
  73. /// ```
  74. /// #[macro_use]
  75. /// extern crate influx_writer;
  76. ///
  77. /// use influx_writer::{OwnedValue, OwnedMeasurement, AsI64};
  78. ///
  79. /// use decimal::d128;
  80. ///
  81. /// fn main() {
  82. /// let (tx, rx) = crossbeam_channel::bounded(1024);
  83. ///
  84. /// // "shorthand" syntax
  85. ///
  86. /// measure!(tx, test, t(color, "red"), i(n, 1));
  87. ///
  88. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  89. ///
  90. /// assert_eq!(meas.key, "test");
  91. /// assert_eq!(meas.get_tag("color"), Some("red"));
  92. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  93. ///
  94. /// measure!(tx, test,
  95. /// t(one, "a"), t(two, "b"), i(three, 2),
  96. /// f(four, 1.2345), s(five, String::from("d")),
  97. /// b(six, true), i(seven, 1 + 2),
  98. /// tm(1)
  99. /// );
  100. ///
  101. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  102. ///
  103. /// assert_eq!(meas.key, "test");
  104. /// assert_eq!(meas.get_tag("one"), Some("a"));
  105. /// assert_eq!(meas.get_tag("two"), Some("b"));
  106. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  107. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  108. /// assert_eq!(meas.timestamp, Some(1));
  109. ///
  110. /// // use the @make_meas flag to skip sending a measurement, instead merely
  111. /// // creating it.
  112. ///
  113. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, t(color, "red"), i(n, 1));
  114. ///
  115. /// // each variant also has shorthand aliases
  116. ///
  117. /// let meas: OwnedMeasurement = measure!(@make_meas abcd, t(color, "red"), i(n, 1), d(price, d128::zero()));
  118. /// }
  119. /// ```
  120. ///
  121. #[macro_export]
  122. macro_rules! measure {
  123. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  124. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  125. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  126. //(@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  127. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  128. (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
  129. (@kv v, $meas:ident, $k:expr) => { measure!(@ea t, $meas, "version", $k) };
  130. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  131. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  132. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Integer(AsI64::as_i64($v))) };
  133. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Float(AsF64::as_f64($v))) };
  134. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::String($v)) };
  135. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::D128($v)) };
  136. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Uuid($v)) };
  137. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Boolean(bool::from($v))) };
  138. (@ea D, $meas:ident, $k:expr, $v:expr) => {
  139. match $v {
  140. Some(v) => { $meas = $meas.add_field($k, $crate::OwnedValue::D128(v)) }
  141. None => {}
  142. }
  143. };
  144. (@as_expr $e:expr) => {$e};
  145. (@count_tags) => {0usize};
  146. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  147. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  148. (@count_fields) => {0usize};
  149. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  150. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  151. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  152. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  153. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  154. };
  155. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  156. let n_tags = measure!(@count_tags $($t)*);
  157. let n_fields = measure!(@count_fields $($t)*);
  158. let mut meas =
  159. $crate::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  160. $(
  161. measure!(@kv $t, meas, $($tail)*);
  162. )*
  163. meas
  164. }};
  165. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  166. measure!($m, $name, $($t [ $($tail)* ] ),+)
  167. };
  168. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  169. #[allow(unused_imports)]
  170. use $crate::{AsI64, AsF64};
  171. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  172. let _ = $m.send(measurement);
  173. }};
  174. }
  175. /// converts a chrono::DateTime to an integer timestamp (ns)
  176. ///
  177. #[inline]
  178. pub fn nanos(t: DateTime<Utc>) -> u64 {
  179. (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64)
  180. }
  181. #[inline]
  182. pub fn secs(d: Duration) -> f64 {
  183. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  184. }
  185. #[inline]
  186. pub fn inanos(t: DateTime<Utc>) -> i64 {
  187. t.timestamp() * 1_000_000_000i64 + t.timestamp_subsec_nanos() as i64
  188. }
  189. //#[deprecated(since="0.4.3", note="Use `nanos(DateTime<Utc>) -> u64` instead")]
  190. pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
  191. (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
  192. }
  193. #[inline]
  194. pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
  195. (d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64
  196. }
  197. #[inline]
  198. pub fn nanos_utc(t: i64) -> DateTime<Utc> {
  199. Utc.timestamp(t / 1_000_000_000, (t % 1_000_000_000) as u32)
  200. }
  201. #[derive(Clone, Debug)]
  202. struct Point<T, V> {
  203. pub time: T,
  204. pub value: V
  205. }
  206. struct DurationWindow {
  207. pub size: Duration,
  208. pub mean: Duration,
  209. pub sum: Duration,
  210. pub count: u32,
  211. pub items: VecDeque<Point<Instant, Duration>>
  212. }
  213. #[allow(dead_code)]
  214. impl DurationWindow {
  215. #[inline]
  216. pub fn update(&mut self, time: Instant, value: Duration) {
  217. self.add(time, value);
  218. self.refresh(time);
  219. }
  220. #[inline]
  221. pub fn refresh(&mut self, t: Instant) -> &Self {
  222. if !self.items.is_empty() {
  223. let (n_remove, sum, count) =
  224. self.items.iter()
  225. .take_while(|x| t - x.time > self.size)
  226. .fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
  227. (n_remove + 1, sum - x.value, count - 1)
  228. });
  229. self.sum = sum;
  230. self.count = count;
  231. for _ in 0..n_remove {
  232. self.items.pop_front();
  233. }
  234. }
  235. if self.count > 0 {
  236. self.mean = self.sum / self.count.into();
  237. }
  238. self
  239. }
  240. #[inline]
  241. pub fn add(&mut self, time: Instant, value: Duration) {
  242. let p = Point { time, value };
  243. self.sum += p.value;
  244. self.count += 1;
  245. self.items.push_back(p);
  246. }
  247. }
/// Holds a thread (and provides an interface to it) that serializes the
/// `OwnedMeasurement`s it receives over a bounded channel and posts them to
/// influxdb via http once `N_BUFFER_LINES` measurements have accumulated or
/// `MAX_PENDING` has elapsed since the last flush.
///
/// Clones share the same worker thread; the last clone to be dropped sends the
/// terminate signal and joins it.
#[derive(Debug)]
pub struct InfluxWriter {
    // host name used to build the write url
    host: String,
    // influx database name
    db: String,
    // `Some(measurement)` is queued for writing; `None` is the terminate signal
    tx: Sender<Option<OwnedMeasurement>>,
    // worker thread handle; `None` for a `placeholder()` writer
    thread: Option<Arc<thread::JoinHandle<()>>>,
}
  259. impl Default for InfluxWriter {
  260. fn default() -> Self {
  261. InfluxWriter::new("localhost", "test")
  262. }
  263. }
  264. impl Clone for InfluxWriter {
  265. fn clone(&self) -> Self {
  266. debug_assert!(self.thread.is_some());
  267. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  268. InfluxWriter {
  269. host: self.host.to_string(),
  270. db: self.db.to_string(),
  271. tx: self.tx.clone(),
  272. thread,
  273. }
  274. }
  275. }
  276. impl InfluxWriter {
  277. pub fn host(&self) -> &str { self.host.as_str() }
  278. pub fn db(&self) -> &str { self.db.as_str() }
  279. /// Sends the `OwnedMeasurement` to the serialization thread.
  280. ///
  281. #[inline]
  282. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  283. self.tx.send(Some(m))
  284. }
  285. #[inline]
  286. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  287. #[inline]
  288. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  289. #[inline]
  290. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  291. #[inline]
  292. pub fn rsecs(&self, d: Duration) -> f64 {
  293. ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64))
  294. * 1000.0)
  295. .round()
  296. / 1000.0
  297. }
  298. #[inline]
  299. pub fn secs(&self, d: Duration) -> f64 {
  300. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  301. }
  302. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  303. self.tx.clone()
  304. }
  305. #[inline]
  306. pub fn is_full(&self) -> bool { self.tx.is_full() }
  307. pub fn placeholder() -> Self {
  308. let (tx, _) = bounded(1024);
  309. Self {
  310. host: String::new(),
  311. db: String::new(),
  312. tx,
  313. thread: None,
  314. }
  315. }
  316. pub fn new(host: &str, db: &str) -> Self {
  317. let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!());
  318. Self::with_logger_and_opt_creds(host, db, None, &noop_logger)
  319. }
  320. pub fn get_credentials(username: String, password: Option<String>) -> Credentials {
  321. hyper::header::Authorization(
  322. hyper::header::Basic { username, password }
  323. )
  324. }
  325. fn http_req<'a>(client: &'a Client, url: Url, body: &'a str, creds: &Option<Credentials>) -> hyper::client::RequestBuilder<'a> {
  326. let req = client.post(url.clone())
  327. .body(body);
  328. if let Some(auth) = creds {
  329. req.header(auth.clone())
  330. } else {
  331. req
  332. }
  333. }
  334. #[allow(unused_assignments)]
  335. pub fn with_logger(host: &str, db: &str, logger: &Logger) -> Self {
  336. Self::with_logger_and_opt_creds(host, db, None, logger)
  337. }
  338. pub fn with_logger_and_opt_creds(host: &str, db: &str, creds: Option<Credentials>, logger: &Logger) -> Self {
  339. let logger = logger.new(o!(
  340. "host" => host.to_string(),
  341. "db" => db.to_string()));
  342. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(4096);
  343. let url =
  344. Url::parse_with_params(&format!("http://{}:8086/write", host),
  345. &[("db", db), ("precision", "ns")])
  346. .expect("influx writer url should parse");
  347. let thread = thread::Builder::new().name(format!("inflx:{}", db)).spawn(move || {
  348. use std::time::*;
  349. use crossbeam_channel as chan;
  350. #[cfg(feature = "no-influx-buffer")]
  351. const N_BUFFER_LINES: usize = 0;
  352. const N_BUFFER_LINES: usize = 1024;
  353. const MAX_PENDING: Duration = Duration::from_secs(3);
  354. const INITIAL_BUFFER_CAPACITY: usize = 4096;
  355. const MAX_BACKLOG: usize = 1024;
  356. const MAX_OUTSTANDING_HTTP: usize = 64;
  357. const DEBUG_HB_EVERY: usize = 1024 * 96;
  358. const INFO_HB_EVERY: usize = 1024 * 1024;
  359. const N_HTTP_ATTEMPTS: u32 = 15;
  360. const INITIAL_BACKLOG: usize = MAX_OUTSTANDING_HTTP * 2;
  361. let client = Arc::new(Client::new());
  362. let creds = Arc::new(creds);
  363. info!(logger, "initializing InfluxWriter ...";
  364. "N_BUFFER_LINES" => N_BUFFER_LINES,
  365. "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING),
  366. "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP,
  367. "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY,
  368. "INITIAL_BACKLOG" => INITIAL_BACKLOG,
  369. "MAX_BACKLOG" => MAX_BACKLOG,
  370. );
  371. // pre-allocated buffers ready for use if the active one is stasheed
  372. // during an outage
  373. let mut spares: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG);
  374. // queue failed sends here until problem resolved, then send again. in worst
  375. // case scenario, loop back around on buffers queued in `backlog`, writing
  376. // over the oldest first.
  377. //
  378. let mut backlog: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG);
  379. for _ in 0..INITIAL_BACKLOG {
  380. spares.push_back(String::with_capacity(INITIAL_BUFFER_CAPACITY));
  381. }
  382. struct Resp {
  383. pub buf: String,
  384. pub took: Duration,
  385. }
  386. let mut db_health = DurationWindow {
  387. size: Duration::from_secs(120),
  388. mean: Duration::new(10, 0),
  389. sum: Duration::new(0, 0),
  390. count: 0,
  391. items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP),
  392. };
  393. let (http_tx, http_rx) = chan::bounded(32);
  394. let mut buf = spares.pop_front().unwrap();
  395. let mut count = 0;
  396. let mut extras = 0; // any new Strings we intro to the system
  397. let mut n_rcvd = 0;
  398. let mut in_flight_buffer_bytes = 0;
  399. let mut last = Instant::now();
  400. let mut active: bool;
  401. let mut last_clear = Instant::now();
  402. let mut last_memory_check = Instant::now();
  403. let mut loop_time: Instant;
  404. let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
  405. INITIAL_BACKLOG + extras - s.len() - b.len() - 1
  406. };
  407. assert_eq!(n_out(&spares, &backlog, extras), 0);
  408. let count_allocated_memory = |spares: &VecDeque<String>, backlog: &VecDeque<String>, in_flight_buffer_bytes: &usize| -> usize {
  409. spares.iter().map(|x| x.capacity()).sum::<usize>()
  410. + backlog.iter().map(|x| x.capacity()).sum::<usize>()
  411. + (*in_flight_buffer_bytes)
  412. };
  413. let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize, in_flight_buffer_bytes: &mut usize| {
  414. if n_outstanding >= MAX_OUTSTANDING_HTTP {
  415. backlog.push_back(buf);
  416. return
  417. }
  418. let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
  419. let tx = http_tx.clone();
  420. let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "in flight req at spawn time" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure
  421. let client = Arc::clone(&client);
  422. let creds = Arc::clone(&creds);
  423. *in_flight_buffer_bytes = *in_flight_buffer_bytes + buf.capacity();
  424. debug!(logger, "launching http thread");
  425. let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
  426. let logger = thread_logger;
  427. debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len());
  428. let start = Instant::now();
  429. for n_req in 0..N_HTTP_ATTEMPTS {
  430. let throttle = Duration::from_secs(2) * n_req * n_req;
  431. if n_req > 0 {
  432. warn!(logger, "InfluxWriter http thread: pausing before next request";
  433. "n_req" => n_req,
  434. "throttle" => %format_args!("{:?}", throttle),
  435. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  436. thread::sleep(throttle); // 0, 2, 8, 16, 32
  437. }
  438. let sent = Instant::now();
  439. let req = Self::http_req(&client, url.clone(), buf.as_str(), &creds);
  440. let resp = req.send();
  441. let rcvd = Instant::now();
  442. let took = rcvd - sent;
  443. let mut n_tx = 0u32;
  444. match resp {
  445. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  446. debug!(logger, "server responded ok: 204 NoContent");
  447. buf.clear();
  448. let mut resp = Some(Ok(Resp { buf, took }));
  449. loop {
  450. n_tx += 1;
  451. match tx.try_send(resp.take().unwrap()) {
  452. Ok(_) => {
  453. if n_req > 0 {
  454. info!(logger, "successfully recovered from failed request with retry";
  455. "n_req" => n_req,
  456. "n_tx" => n_tx,
  457. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  458. }
  459. return
  460. }
  461. Err(chan::TrySendError::Full(r)) => {
  462. let throttle = Duration::from_millis(1000) * n_tx;
  463. warn!(logger, "channel full: InfluxWriter http thread failed to return buf";
  464. "n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle));
  465. resp = Some(r);
  466. thread::sleep(throttle);
  467. }
  468. Err(chan::TrySendError::Disconnected(_)) => {
  469. warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return";
  470. "n_tx" => n_tx, "n_req" => n_req);
  471. return
  472. }
  473. }
  474. }
  475. }
  476. Ok(mut resp) => {
  477. let mut server_resp = String::new();
  478. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  479. error!(logger, "influx server error (request took {:?})", took;
  480. "status" => %resp.status,
  481. "body" => server_resp);
  482. }
  483. Err(e) => {
  484. error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e);
  485. }
  486. }
  487. }
  488. let took = Instant::now() - start;
  489. warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer";
  490. "took" => %format_args!("{:?}", took));
  491. let buflen = buf.len();
  492. let n_lines = buf.lines().count();
  493. if let Err(e) = tx.send(Err(Resp { buf, took })) {
  494. crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e;
  495. "err" => %e, "buf.len()" => buflen, "n_lines" => n_lines);
  496. }
  497. });
  498. if let Err(e) = thread_res {
  499. crit!(logger, "failed to spawn thread: {}", e);
  500. }
  501. };
  502. let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> {
  503. match prev {
  504. 0 if N_BUFFER_LINES > 0 => {
  505. serialize_owned(m, buf);
  506. Ok(1)
  507. }
  508. n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => {
  509. buf.push_str("\n");
  510. serialize_owned(m, buf);
  511. Ok(n + 1)
  512. }
  513. n => {
  514. buf.push_str("\n");
  515. serialize_owned(m, buf);
  516. Err(n + 1)
  517. }
  518. }
  519. };
  520. 'event: loop {
  521. loop_time = Instant::now();
  522. active = false;
  523. if loop_time - last_memory_check > Duration::from_secs(300) {
  524. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
  525. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  526. info!(logger, "InfluxWriter: allocated memory: {:.1}MB", allocated_mb;
  527. "allocated bytes" => allocated_bytes,
  528. "in flight buffer bytes" => in_flight_buffer_bytes,
  529. "spares.len()" => spares.len(),
  530. "backlog.len()" => backlog.len(),
  531. );
  532. last_memory_check = loop_time;
  533. }
  534. match rx.recv() {
  535. Ok(Some(mut meas)) => {
  536. n_rcvd += 1;
  537. active = true;
  538. if n_rcvd % INFO_HB_EVERY == 0 {
  539. let n_outstanding = n_out(&spares, &backlog, extras);
  540. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
  541. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  542. info!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep();
  543. "n_outstanding" => n_outstanding,
  544. "spares.len()" => spares.len(),
  545. "n_rcvd" => n_rcvd,
  546. "n_active_buf" => count,
  547. "db_health" => %format_args!("{:?}", db_health.mean),
  548. "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
  549. "backlog.len()" => backlog.len());
  550. } else if n_rcvd % DEBUG_HB_EVERY == 0 {
  551. let n_outstanding = n_out(&spares, &backlog, extras);
  552. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
  553. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  554. debug!(logger, "InfluxWriter: rcvd {} measurements", n_rcvd.thousands_sep();
  555. "n_outstanding" => n_outstanding,
  556. "spares.len()" => spares.len(),
  557. "n_rcvd" => n_rcvd,
  558. "n_active_buf" => count,
  559. "db_health" => %format_args!("{:?}", db_health.mean),
  560. "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
  561. "backlog.len()" => backlog.len());
  562. }
  563. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  564. if meas.fields.is_empty() {
  565. meas.fields.push(("n", OwnedValue::Integer(1)));
  566. }
  567. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  568. count = match next(count, &meas, &mut buf, loop_time, last) {
  569. Ok(n) => n,
  570. Err(_n) => {
  571. let mut count = 0;
  572. let mut next: String = match spares.pop_front() {
  573. Some(x) => x,
  574. None => {
  575. let n_outstanding = n_out(&spares, &backlog, extras);
  576. if n_outstanding > MAX_BACKLOG {
  577. warn!(logger, "InfluxWriter: no available buffers in `spares`, pulling from backlog";
  578. "n_outstanding" => n_outstanding,
  579. "spares.len()" => spares.len(),
  580. "n_rcvd" => n_rcvd,
  581. "backlog.len()" => backlog.len());
  582. match backlog.pop_front() {
  583. // Note: this does not clear the backlog buffer,
  584. // instead we will just write more and more until
  585. // we are out of memory. I expect that will never
  586. // happen.
  587. //
  588. Some(x) => {
  589. count = 1; // otherwise, no '\n' added in `next(..)` - we are
  590. // sending a "full" buffer to be extended
  591. x
  592. }
  593. None => {
  594. extras += 1;
  595. crit!(logger, "InfluxWriter: failed to pull from backlog, too!! WTF #!(*#(* ... creating new String";
  596. "n_outstanding" => n_outstanding,
  597. "spares.len()" => spares.len(),
  598. "backlog.len()" => backlog.len(),
  599. "n_rcvd" => n_rcvd,
  600. "extras" => extras);
  601. String::new()
  602. }
  603. }
  604. } else {
  605. extras += 1;
  606. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY;
  607. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  608. info!(logger, "InfluxWriter: allocating new buffer: zero spares avail";
  609. "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
  610. "n_outstanding" => n_outstanding,
  611. "extras" => extras,
  612. );
  613. String::with_capacity(INITIAL_BUFFER_CAPACITY)
  614. }
  615. }
  616. };
  617. // after swap, buf in next, so want to send next
  618. //
  619. mem::swap(&mut buf, &mut next);
  620. let n_outstanding = n_out(&spares, &backlog, extras);
  621. send(next, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  622. last = loop_time;
  623. count
  624. }
  625. };
  626. }
  627. Ok(None) => {
  628. let start = Instant::now();
  629. let mut hb = Instant::now();
  630. warn!(logger, "terminate signal rcvd"; "count" => count);
  631. if buf.len() > 0 {
  632. info!(logger, "InfluxWriter: sending remaining buffer to influx on terminate"; "count" => count);
  633. let meas = OwnedMeasurement::new("influx_writer").add_field("n", OwnedValue::Integer(1));
  634. let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last);
  635. let n_outstanding = n_out(&spares, &backlog, extras);
  636. let mut placeholder = spares.pop_front().unwrap_or_else(String::new);
  637. mem::swap(&mut buf, &mut placeholder);
  638. send(placeholder, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  639. }
  640. let mut n_ok = 0;
  641. let mut n_err = 0;
  642. loop {
  643. loop_time = Instant::now();
  644. let n_outstanding = n_out(&spares, &backlog, extras);
  645. if backlog.is_empty() && n_outstanding < 1 {
  646. info!(logger, "InfluxWriter: cleared any remaining backlog";
  647. "n_outstanding" => n_outstanding,
  648. "spares.len()" => spares.len(),
  649. "backlog.len()" => backlog.len(),
  650. "n_cleared_ok" => n_ok,
  651. "n_cleared_err" => n_err,
  652. "n_rcvd" => n_rcvd,
  653. "extras" => extras,
  654. "elapsed" => %format_args!("{:?}", loop_time - start));
  655. break 'event
  656. }
  657. if loop_time - hb > Duration::from_secs(5) {
  658. info!(logger, "InfluxWriter still clearing backlog ..";
  659. "n_outstanding" => n_outstanding,
  660. "spares.len()" => spares.len(),
  661. "backlog.len()" => backlog.len(),
  662. "n_cleared_ok" => n_ok,
  663. "n_cleared_err" => n_err,
  664. "extras" => extras,
  665. "n_rcvd" => n_rcvd,
  666. "elapsed" => %format_args!("{:?}", loop_time - start));
  667. hb = loop_time;
  668. }
  669. if let Some(buf) = backlog.pop_front() {
  670. let n_outstanding = n_out(&spares, &backlog, extras);
  671. debug!(logger, "InfluxWriter: resending queued buffer from backlog";
  672. "backlog.len()" => backlog.len(),
  673. "spares.len()" => spares.len(),
  674. "n_rcvd" => n_rcvd,
  675. "n_outstanding" => n_outstanding);
  676. send(buf, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  677. }
  678. 'rx: loop {
  679. match http_rx.try_recv() {
  680. Ok(Ok(Resp { buf, .. })) => {
  681. n_ok += 1;
  682. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  683. if spares.len() <= INITIAL_BACKLOG {
  684. spares.push_back(buf); // needed so `n_outstanding` count remains accurate
  685. } else {
  686. extras = extras.saturating_sub(1);
  687. }
  688. }
  689. Ok(Err(Resp { buf, .. })) => {
  690. warn!(logger, "InfluxWriter: requeueing failed request"; "buf.len()" => buf.len());
  691. n_err += 1;
  692. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  693. backlog.push_front(buf);
  694. }
  695. Err(chan::TryRecvError::Disconnected) => {
  696. crit!(logger, "InfluxWriter: trying to clear backlog, but http_rx disconnected! aborting";
  697. "n_outstanding" => n_outstanding,
  698. "backlog.len()" => backlog.len(),
  699. "n_cleared_ok" => n_ok,
  700. "n_cleared_err" => n_err,
  701. "extras" => extras,
  702. "n_rcvd" => n_rcvd,
  703. "elapsed" => %format_args!("{:?}", loop_time - start));
  704. break 'event
  705. }
  706. Err(_) => break 'rx
  707. }
  708. }
  709. thread::sleep(Duration::from_millis(1));
  710. }
  711. }
  712. _ => {}
  713. }
  714. db_health.refresh(loop_time);
  715. let n_outstanding = n_out(&spares, &backlog, extras);
  716. let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
  717. if (n_outstanding < MAX_OUTSTANDING_HTTP
  718. || loop_time.saturating_duration_since(last_clear) > Duration::from_secs(60))
  719. && healthy {
  720. if let Some(queued) = backlog.pop_front() {
  721. let n_outstanding = n_out(&spares, &backlog, extras);
  722. send(queued, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  723. active = true;
  724. }
  725. last_clear = loop_time;
  726. }
  727. loop {
  728. match http_rx.try_recv() {
  729. Ok(Ok(Resp { buf, took })) => {
  730. db_health.add(loop_time, took);
  731. let in_flight_before = in_flight_buffer_bytes.clone();
  732. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  733. if spares.len() <= INITIAL_BACKLOG {
  734. spares.push_back(buf);
  735. } else {
  736. extras = extras.saturating_sub(1);
  737. debug!(logger, "InfluxWriter: dropping buffer to reduce memory back to INITIAL_BACKLOG size";
  738. "spares.len()" => spares.len(),
  739. "extras" => extras,
  740. "in flight before" => in_flight_before,
  741. "in in_flight_buffer_bytes" => in_flight_buffer_bytes,
  742. );
  743. }
  744. //spares.push_back(buf);
  745. active = true;
  746. }
  747. Ok(Err(Resp { buf, took })) => {
  748. db_health.add(loop_time, took);
  749. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  750. backlog.push_front(buf);
  751. active = true;
  752. }
  753. Err(chan::TryRecvError::Disconnected) => {
  754. crit!(logger, "InfluxWriter: trying to recover buffers, but http_rx disconnected! aborting";
  755. "n_outstanding" => n_outstanding,
  756. "backlog.len()" => backlog.len(),
  757. "n_rcvd" => n_rcvd,
  758. "extras" => extras);
  759. break 'event
  760. }
  761. Err(_) => break
  762. }
  763. }
  764. if !active {
  765. thread::sleep(Duration::new(0, 1))
  766. }
  767. }
  768. thread::sleep(Duration::from_millis(10));
  769. }).unwrap();
  770. InfluxWriter {
  771. host: host.to_string(),
  772. db: db.to_string(),
  773. tx,
  774. thread: Some(Arc::new(thread))
  775. }
  776. }
  777. }
  778. impl Drop for InfluxWriter {
  779. fn drop(&mut self) {
  780. if let Some(arc) = self.thread.take() {
  781. if let Ok(thread) = Arc::try_unwrap(arc) {
  782. let _ = self.tx.send(None);
  783. let _ = thread.join();
  784. }
  785. }
  786. }
  787. }
  788. /// This removes offending things rather than escaping them.
  789. ///
  790. fn escape_tag(s: &str) -> String {
  791. s.replace(" ", "")
  792. .replace(",", "")
  793. .replace("\"", "")
  794. }
  795. fn escape(s: &str) -> String {
  796. s.replace(" ", "\\ ")
  797. .replace(",", "\\,")
  798. }
  799. fn as_string(s: &str) -> String {
  800. // the second replace removes double escapes
  801. //
  802. format!("\"{}\"", s.replace("\"", "\\\"")
  803. .replace(r#"\\""#, r#"\""#))
  804. }
  805. #[test]
  806. fn it_checks_as_string_does_not_double_escape() {
  807. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  808. let escaped = as_string(&raw);
  809. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  810. }
  811. fn as_boolean(b: &bool) -> &str {
  812. if *b { "t" } else { "f" }
  813. }
  814. pub fn now() -> i64 {
  815. nanos(Utc::now()) as i64
  816. }
  817. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  818. ///
  819. /// The serialized measurement is appended to the end of the string without
  820. /// any regard for what exited in it previously.
  821. ///
  822. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  823. line.push_str(&escape_tag(measurement.key));
  824. let add_tag = |line: &mut String, key: &str, value: &str| {
  825. line.push_str(",");
  826. line.push_str(&escape_tag(key));
  827. line.push_str("=");
  828. line.push_str(&escape(value));
  829. };
  830. for (key, value) in measurement.tags.iter() {
  831. #[cfg(not(feature = "string-tags"))]
  832. add_tag(line, key, value);
  833. #[cfg(feature = "string-tags")]
  834. add_tag(line, key, value.as_str());
  835. }
  836. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| -> bool {
  837. if SKIP_NAN_VALUES && ! value.is_finite() { return false }
  838. if is_first { line.push_str(" "); } else { line.push_str(","); }
  839. line.push_str(&escape_tag(key));
  840. line.push_str("=");
  841. match *value {
  842. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  843. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  844. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  845. OwnedValue::D128(ref d) => {
  846. if d.is_finite() {
  847. line.push_str(&format!("{}", d));
  848. } else {
  849. line.push_str("-999.0");
  850. }
  851. }
  852. OwnedValue::Float(ref f) => {
  853. if f.is_finite() {
  854. line.push_str(&format!("{}", f));
  855. } else {
  856. line.push_str("-999.0");
  857. }
  858. }
  859. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  860. };
  861. true
  862. };
  863. // use this counter to ensure that at least one field was
  864. // serialized. since NaN values may be skipped, the serialization
  865. // would be wrong if no fields ended up being serialized. instead,
  866. // track it, and if there are none serialized, add a n=1 to make
  867. // the measurement serialize properly
  868. //
  869. // this also controls what value is passed to the `is_first` argument
  870. // of `add_field`
  871. let mut n_fields_serialized = 0;
  872. for kv in measurement.fields.iter() {
  873. if add_field(line, kv.0, &kv.1, n_fields_serialized == 0) {
  874. n_fields_serialized += 1;
  875. }
  876. }
  877. // supply a minimum of one field (n=1)
  878. //
  879. // TODO: could potentially clobber a "n" tag? do we care?
  880. //
  881. if n_fields_serialized == 0 { add_field(line, "n", &OwnedValue::Integer(1), true); }
  882. if let Some(t) = measurement.timestamp {
  883. line.push_str(" ");
  884. line.push_str(&t.to_string());
  885. }
  886. }
/// An owned field value carried by an `OwnedMeasurement`.
///
/// The per-variant notes describe how `serialize_owned` writes each value.
#[derive(Debug, Clone, PartialEq)]
pub enum OwnedValue {
    /// Text; written wrapped in double quotes with inner quotes escaped.
    String(String),
    /// Floating point number; written with its `Display` formatting. A
    /// non-finite value is either skipped (when `SKIP_NAN_VALUES` is set) or
    /// written as `-999.0`.
    Float(f64),
    /// Integer; written with an `i` suffix.
    Integer(i64),
    /// Boolean; written as `t` or `f`.
    Boolean(bool),
    /// Decimal number; handled like `Float`, including the non-finite case.
    D128(d128),
    /// UUID; written with its `Display` formatting wrapped in double quotes.
    Uuid(Uuid),
}
  896. impl OwnedValue {
  897. /// if `self` is a `Float` or `D128` variant, checks
  898. /// whether the contained value is finite
  899. ///
  900. /// # Examples
  901. ///
  902. /// ```
  903. /// use std::str::FromStr;
  904. /// use influx_writer::OwnedValue;
  905. ///
  906. /// let v1 = OwnedValue::Float(f64::NAN);
  907. /// assert!( ! v1.is_finite());
  908. /// let v2 = OwnedValue::Float(1.234f64);
  909. /// assert!(v2.is_finite());
  910. ///
  911. /// let v3 = OwnedValue::D128(decimal::d128::from_str("NaN").unwrap());
  912. /// assert!( ! v3.is_finite());
  913. /// let v4 = OwnedValue::D128(decimal::d128::from_str("42.42").unwrap());
  914. /// assert!(v4.is_finite());
  915. ///
  916. /// // other variants are always "finite"
  917. /// assert!(OwnedValue::String("NaN".into()).is_finite());
  918. /// ```
  919. pub fn is_finite(&self) -> bool {
  920. match self {
  921. OwnedValue::Float(x) => x.is_finite(),
  922. OwnedValue::D128(x) => x.is_finite(),
  923. _ => true,
  924. }
  925. }
  926. }
/// Holds data meant for an influxdb measurement in transit to the
/// writing thread.
///
/// Tags and fields are kept as small vectors of `(key, value)` pairs in
/// insertion order; lookups scan them linearly.
#[derive(Clone, Debug)]
pub struct OwnedMeasurement {
    /// Measurement name, written first in the serialized line.
    pub key: &'static str,
    /// Optional timestamp appended to the serialized line when present.
    // NOTE(review): presumably nanoseconds since the unix epoch, matching `now()` — confirm
    pub timestamp: Option<i64>,
    // earlier map-based representation, left here for reference:
    //pub fields: Map<&'static str, OwnedValue>,
    //pub tags: Map<&'static str, &'static str>,
    /// Field `(key, value)` pairs; up to 8 are stored inline.
    pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
    /// Tag `(key, value)` pairs with static string values (default build).
    #[cfg(not(feature = "string-tags"))]
    pub tags: SmallVec<[(&'static str, &'static str); 8]>,
    /// Tag `(key, value)` pairs with owned string values ("string-tags" feature).
    #[cfg(feature = "string-tags")]
    pub tags: SmallVec<[(&'static str, String); 8]>,
}
  942. impl OwnedMeasurement {
  943. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  944. OwnedMeasurement {
  945. key,
  946. timestamp: None,
  947. tags: SmallVec::with_capacity(n_tags),
  948. fields: SmallVec::with_capacity(n_fields),
  949. }
  950. }
  951. pub fn new(key: &'static str) -> Self {
  952. OwnedMeasurement {
  953. key,
  954. timestamp: None,
  955. tags: SmallVec::new(),
  956. fields: SmallVec::new(),
  957. }
  958. }
  959. /// Unusual consuming `self` signature because primarily used by
  960. /// the `measure!` macro.
  961. #[cfg(not(feature = "string-tags"))]
  962. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  963. self.tags.push((key, value));
  964. self
  965. }
  966. #[cfg(feature = "string-tags")]
  967. pub fn add_tag<S: ToString>(mut self, key: &'static str, value: S) -> Self {
  968. self.tags.push((key, value.to_string()));
  969. self
  970. }
  971. /// Unusual consuming `self` signature because primarily used by
  972. /// the `measure!` macro.
  973. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  974. self.fields.push((key, value));
  975. self
  976. }
  977. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  978. self.timestamp = Some(timestamp);
  979. self
  980. }
  981. #[cfg(not(feature = "string-tags"))]
  982. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  983. match self.tags.iter().position(|kv| kv.0 == key) {
  984. Some(i) => {
  985. self.tags.get_mut(i)
  986. .map(|x| {
  987. x.0 = value;
  988. });
  989. self
  990. }
  991. None => {
  992. self.add_tag(key, value)
  993. }
  994. }
  995. }
  996. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  997. self.fields.iter()
  998. .find(|kv| kv.0 == key)
  999. .map(|kv| &kv.1)
  1000. }
  1001. #[cfg(feature = "string-tags")]
  1002. pub fn get_tag(&self, key: &'static str) -> Option<&str> {
  1003. self.tags.iter()
  1004. .find(|kv| kv.0 == key)
  1005. .map(|kv| kv.1.as_str())
  1006. }
  1007. #[cfg(not(feature = "string-tags"))]
  1008. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  1009. self.tags.iter()
  1010. .find(|kv| kv.0 == key)
  1011. .map(|kv| kv.1)
  1012. }
  1013. }
  1014. #[allow(unused)]
  1015. #[cfg(test)]
  1016. mod tests {
  1017. use std::str::FromStr;
  1018. use super::*;
  1019. #[cfg(feature = "unstable")]
  1020. use test::{black_box, Bencher};
  1021. #[ignore]
  1022. #[cfg(feature = "unstable")]
  1023. #[bench]
  1024. fn measure_ten(b: &mut Bencher) {
  1025. let influx = InfluxWriter::new("localhost", "test");
  1026. let mut n = 0;
  1027. b.iter(|| {
  1028. for _ in 0..10 {
  1029. let time = influx.nanos(Utc::now());
  1030. n += 1;
  1031. measure!(influx, million, i(n), tm(time));
  1032. }
  1033. });
  1034. }
  1035. #[test]
  1036. fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
  1037. const VERSION: &str = "0.3.90";
  1038. let tag_value = "one";
  1039. let color = "red";
  1040. let time = Utc::now();
  1041. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
  1042. assert_eq!(m.get_tag("color"), Some("red"));
  1043. assert_eq!(m.get_tag("version"), Some(VERSION));
  1044. assert_eq!(m.timestamp, Some(nanos(time) as i64));
  1045. }
  1046. #[test]
  1047. fn it_uses_the_v_for_version_shortcut() {
  1048. const VERSION: &str = "0.3.90";
  1049. let tag_value = "one";
  1050. let color = "red";
  1051. let time = now();
  1052. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  1053. assert_eq!(m.get_tag("color"), Some("red"));
  1054. assert_eq!(m.get_tag("version"), Some(VERSION));
  1055. assert_eq!(m.timestamp, Some(time));
  1056. }
  1057. #[test]
  1058. fn it_uses_the_new_tag_k_only_shortcut() {
  1059. let tag_value = "one";
  1060. let color = "red";
  1061. let time = now();
  1062. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  1063. assert_eq!(m.get_tag("color"), Some("red"));
  1064. assert_eq!(m.get_tag("tag_value"), Some("one"));
  1065. assert_eq!(m.timestamp, Some(time));
  1066. }
  1067. #[test]
  1068. fn it_uses_measure_macro_parenthesis_syntax() {
  1069. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  1070. assert_eq!(m.key, "test");
  1071. assert_eq!(m.get_tag("a"), Some("b"));
  1072. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  1073. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  1074. assert_eq!(m.timestamp, Some(1));
  1075. }
  1076. #[test]
  1077. fn it_uses_measure_macro_on_a_self_attribute() {
  1078. struct A {
  1079. pub influx: InfluxWriter,
  1080. }
  1081. impl A {
  1082. fn f(&self) {
  1083. measure!(self.influx, test, t(color, "red"), i(n, 1));
  1084. }
  1085. }
  1086. let a = A { influx: InfluxWriter::default() };
  1087. a.f();
  1088. }
  1089. #[test]
  1090. fn it_clones_an_influx_writer_to_check_both_drop() {
  1091. let influx = InfluxWriter::default();
  1092. measure!(influx, drop_test, i(a, 1), i(b, 2));
  1093. {
  1094. let influx = influx.clone();
  1095. thread::spawn(move || {
  1096. measure!(influx, drop_test, i(a, 3), i(b, 4));
  1097. });
  1098. }
  1099. }
  1100. #[cfg(feature = "unstable")]
  1101. #[bench]
  1102. fn influx_writer_send_basic(b: &mut Bencher) {
  1103. let m = InfluxWriter::new("localhost", "test");
  1104. b.iter(|| {
  1105. measure!(m, test, t(color; "red"), i(n, 1)); //, float[p; 1.234]);
  1106. });
  1107. }
  1108. #[cfg(feature = "unstable")]
  1109. #[bench]
  1110. fn influx_writer_send_price(b: &mut Bencher) {
  1111. let m = InfluxWriter::new("localhost", "test");
  1112. b.iter(|| {
  1113. measure!(m, test,
  1114. t(ticker, "xmr_btc"),
  1115. t(exchange, "plnx"),
  1116. d(bid, d128::zero()),
  1117. d(ask, d128::zero()),
  1118. );
  1119. });
  1120. }
  1121. #[test]
  1122. fn it_checks_color_tag_error_in_non_doctest() {
  1123. let (tx, rx) = bounded(1024);
  1124. measure!(tx, test, t(color,"red"), i(n,1));
  1125. let meas: OwnedMeasurement = rx.recv().unwrap();
  1126. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  1127. }
  1128. #[test]
  1129. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  1130. let meas = measure!(@make_meas test_measurement,
  1131. t(one, "a"), t(two, "b"), i(three, 2),
  1132. f(four, 1.2345), s(five, String::from("d")),
  1133. b(six, true), i(seven, 1 + 2),
  1134. tm(1)
  1135. );
  1136. assert_eq!(meas.key, "test_measurement");
  1137. assert_eq!(meas.get_tag("one"), Some("a"));
  1138. assert_eq!(meas.get_tag("two"), Some("b"));
  1139. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  1140. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  1141. assert_eq!(meas.timestamp, Some(1));
  1142. }
  1143. #[test]
  1144. fn it_uses_measure_macro_for_d128_and_uuid() {
  1145. let (tx, rx) = bounded(1024);
  1146. let one = "a";
  1147. let two = d128::zero();
  1148. let three = Uuid::new_v4();
  1149. let time = now();
  1150. measure!(tx, test_measurement, t(one), d(two), u(three), tm(time));
  1151. thread::sleep(Duration::from_millis(10));
  1152. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  1153. assert_eq!(meas.key, "test_measurement");
  1154. assert_eq!(meas.get_tag("one"), Some("a"));
  1155. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  1156. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(three)));
  1157. assert_eq!(meas.timestamp, Some(time));
  1158. }
  1159. #[test]
  1160. fn it_uses_the_measure_macro_alt_syntax() {
  1161. let (tx, rx) = bounded(1024);
  1162. measure!(tx, test_measurement,
  1163. t(one, "a"), t(two, "b"), i(three, 2),
  1164. f(four, 1.2345), s(five, String::from("d")),
  1165. b(six, true), i(seven, 1 + 2),
  1166. tm(1)
  1167. );
  1168. thread::sleep(Duration::from_millis(10));
  1169. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  1170. assert_eq!(meas.key, "test_measurement");
  1171. assert_eq!(meas.get_tag("one"), Some("a"));
  1172. assert_eq!(meas.get_tag("two"), Some("b"));
  1173. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  1174. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  1175. assert_eq!(meas.timestamp, Some(1));
  1176. }
  1177. #[test]
  1178. fn it_checks_that_fields_are_separated_correctly() {
  1179. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  1180. assert_eq!(m.key, "test");
  1181. assert_eq!(m.get_tag("a"), Some("one"));
  1182. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  1183. let mut buf = String::new();
  1184. serialize_owned(&m, &mut buf);
  1185. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  1186. }
  1187. #[test]
  1188. fn try_to_break_measure_macro() {
  1189. let (tx, _) = bounded(1024);
  1190. measure!(tx, one, t(x,"y"), i(n,1));
  1191. measure!(tx, one, t(x,"y"), i(n,1),);
  1192. struct A {
  1193. pub one: i32,
  1194. pub two: i32,
  1195. }
  1196. struct B {
  1197. pub a: A
  1198. }
  1199. let b = B { a: A { one: 1, two: 2 } };
  1200. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  1201. assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
  1202. }
  1203. #[cfg(feature = "unstable")]
  1204. #[bench]
  1205. fn measure_macro_small(b: &mut Bencher) {
  1206. let (tx, rx) = bounded(1024);
  1207. let listener = thread::spawn(move || {
  1208. loop { if rx.recv().is_err() { break } }
  1209. });
  1210. b.iter(|| {
  1211. measure!(tx, test, t(color, "red"), i(n, 1), tm(now()));
  1212. });
  1213. }
  1214. #[cfg(feature = "unstable")]
  1215. #[bench]
  1216. fn measure_macro_medium(b: &mut Bencher) {
  1217. let (tx, rx) = bounded(1024);
  1218. let listener = thread::spawn(move || {
  1219. loop { if rx.recv().is_err() { break } }
  1220. });
  1221. b.iter(|| {
  1222. measure!(tx, test, t(color, "red"), t(mood, "playful"),
  1223. t(ticker, "xmr_btc"), f(price, 1.2345), f(amount, 56.322),
  1224. i(n, 1), tm(now()));
  1225. });
  1226. }
  1227. #[cfg(feature = "unstable")]
  1228. #[bench]
  1229. fn serialize_owned_longer(b: &mut Bencher) {
  1230. let mut buf = String::with_capacity(1024);
  1231. let m =
  1232. OwnedMeasurement::new("test")
  1233. .add_tag("one", "a")
  1234. .add_tag("two", "b")
  1235. .add_tag("ticker", "xmr_btc")
  1236. .add_tag("exchange", "plnx")
  1237. .add_tag("side", "bid")
  1238. .add_field("three", OwnedValue::Float(1.2345))
  1239. .add_field("four", OwnedValue::Integer(57))
  1240. .add_field("five", OwnedValue::Boolean(true))
  1241. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  1242. .set_timestamp(now());
  1243. b.iter(|| {
  1244. serialize_owned(&m, &mut buf);
  1245. buf.clear()
  1246. });
  1247. }
  1248. #[cfg(feature = "unstable")]
  1249. #[bench]
  1250. fn serialize_owned_simple(b: &mut Bencher) {
  1251. let mut buf = String::with_capacity(1024);
  1252. let m =
  1253. OwnedMeasurement::new("test")
  1254. .add_tag("one", "a")
  1255. .add_tag("two", "b")
  1256. .add_field("three", OwnedValue::Float(1.2345))
  1257. .add_field("four", OwnedValue::Integer(57))
  1258. .set_timestamp(now());
  1259. b.iter(|| {
  1260. serialize_owned(&m, &mut buf);
  1261. buf.clear()
  1262. });
  1263. }
  1264. #[cfg(feature = "unstable")]
  1265. #[bench]
  1266. fn clone_url_for_thread(b: &mut Bencher) {
  1267. let host = "ahmes";
  1268. let db = "mlp";
  1269. let url =
  1270. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1271. &[("db", db), ("precision", "ns")]).unwrap();
  1272. b.iter(|| {
  1273. url.clone()
  1274. })
  1275. }
  1276. #[cfg(feature = "unstable")]
  1277. #[bench]
  1278. fn clone_arc_url_for_thread(b: &mut Bencher) {
  1279. let host = "ahmes";
  1280. let db = "mlp";
  1281. let url =
  1282. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1283. &[("db", db), ("precision", "ns")]).unwrap();
  1284. let url = Arc::new(url);
  1285. b.iter(|| {
  1286. Arc::clone(&url)
  1287. })
  1288. }
  1289. #[test]
  1290. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  1291. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  1292. let mut buf = String::new();
  1293. let mut server_resp = String::new();
  1294. let m = OwnedMeasurement::new("rust_test")
  1295. .add_field("s", OwnedValue::String(raw.to_string()))
  1296. .set_timestamp(now());
  1297. serialize_owned(&m, &mut buf);
  1298. println!("{}", buf);
  1299. buf.push_str("\n");
  1300. let buf_copy = buf.clone();
  1301. buf.push_str(&buf_copy);
  1302. println!("{}", buf);
  1303. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  1304. let client = Client::new();
  1305. let req = InfluxWriter::http_req(&client, url.clone(), &buf, &None);
  1306. match req.send() {
  1307. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  1308. Ok(mut resp) => {
  1309. resp.read_to_string(&mut server_resp).unwrap();
  1310. panic!("{}", server_resp);
  1311. }
  1312. Err(why) => {
  1313. panic!(why)
  1314. }
  1315. }
  1316. }
  1317. #[cfg(feature = "auth-tests")]
  1318. #[test]
  1319. fn it_sends_authenticated_measurements() {
  1320. let creds = InfluxWriter::get_credentials("auth_test_user".into(), Some("hot dog".into()));
  1321. let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!());
  1322. //let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
  1323. //let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
  1324. //let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
  1325. //let root = slog::Logger::root(drain, o!("version" => "0.1"));
  1326. //let influx = InfluxWriter::with_logger_and_opt_creds("localhost", "auth_test", Some(creds), &root);
  1327. measure!(influx, auth_test_meas, i(n, 1));
  1328. drop(influx);
  1329. }
  1330. #[test]
  1331. fn it_skips_nan_values() {
  1332. assert!(SKIP_NAN_VALUES, "otherwise this test is worthless");
  1333. let m = OwnedMeasurement::new("rust_test")
  1334. .add_field("hello", OwnedValue::Integer(1234))
  1335. .add_field("finite_float", OwnedValue::Float(1.234))
  1336. .add_field("nan_float", OwnedValue::Float(f64::NAN))
  1337. .add_field("inf_float", OwnedValue::Float(f64::INFINITY))
  1338. .add_field("neg_inf_float", OwnedValue::Float(f64::NEG_INFINITY))
  1339. .add_field("finite_d128", OwnedValue::D128(d128::from_str("3.456").unwrap()))
  1340. .add_field("nan_d128", OwnedValue::D128(d128::from_str("NaN").unwrap()))
  1341. .set_timestamp(now());
  1342. let mut buf = String::new();
  1343. serialize_owned(&m, &mut buf);
  1344. dbg!(&buf);
  1345. assert!(buf.contains("hello=1234"));
  1346. assert!(buf.contains("finite_float=1.234"));
  1347. assert!( ! buf.contains("nan_float="));
  1348. assert!( ! buf.contains("inf_float="));
  1349. assert!( ! buf.contains("neg_inf_float="));
  1350. assert!(buf.contains("finite_d128=3.456"));
  1351. assert!( ! buf.contains("nan_d128="));
  1352. }
  1353. #[test]
  1354. fn it_supplies_a_field_if_every_field_is_skipped_because_nan() {
  1355. assert!(SKIP_NAN_VALUES, "otherwise this test is worthless");
  1356. let m = OwnedMeasurement::new("rust_test")
  1357. .add_field("nan_float", OwnedValue::Float(f64::NAN));
  1358. let mut buf = String::new();
  1359. serialize_owned(&m, &mut buf);
  1360. dbg!(&buf);
  1361. assert!(buf.contains("n=1i"));
  1362. }
  1363. }