You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1418 lines
54KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use crossbeam_channel::{Sender, Receiver, bounded, SendError};
  6. use std::{thread, mem};
  7. use std::time::*;
  8. use std::collections::VecDeque;
  9. use hyper::status::StatusCode;
  10. use hyper::client::response::Response;
  11. use hyper::Url;
  12. use hyper::client::Client;
  13. use chrono::{DateTime, Utc};
  14. use decimal::d128;
  15. use uuid::Uuid;
  16. use smallvec::SmallVec;
  17. use slog::Logger;
  18. use pretty_toa::ThousandsSep;
  19. use super::{nanos, file_logger, LOG_LEVEL};
  20. #[cfg(feature = "warnings")]
  21. use warnings::Warning;
  22. pub use super::{dur_nanos, dt_nanos, measure};
  23. pub const INFLUX_WRITER_MAX_BUFFER: usize = 4096;
  24. /// Created this so I know what types can be passed through the
  25. /// `measure!` macro, which used to convert with `as i64` and
  26. /// `as f64` until I accidentally passed a function name, and it
  27. /// still compiled, but with garbage numbers.
  28. pub trait AsI64 {
  29. fn as_i64(x: Self) -> i64;
  30. }
  31. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  32. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  33. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  34. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  35. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } }
  36. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  37. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  38. impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  39. impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  40. impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  41. impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  42. /// Created this so I know what types can be passed through the
  43. /// `measure!` macro, which used to convert with `as i64` and
  44. /// `as f64` until I accidentally passed a function name, and it
  45. /// still compiled, but with garbage numbers.
  46. pub trait AsF64 {
  47. fn as_f64(x: Self) -> f64;
  48. }
  49. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  50. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  51. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  52. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  53. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  54. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  55. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  56. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  57. ///
  58. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  59. /// values, as well as sends it with the `Sender`.
  60. ///
  61. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  62. /// measurement (see `tests` mod).
  63. ///
  64. /// # Examples
  65. ///
  66. /// ```
  67. /// #![feature(try_from)]
  68. /// #[macro_use] extern crate logging;
  69. /// extern crate decimal;
  70. ///
  71. /// use std::sync::mpsc::channel;
  72. /// use decimal::d128;
  73. /// use logging::influx::*;
  74. ///
  75. /// fn main() {
  76. /// let (tx, rx) = crossbeam_channel::bounded(1024);
  77. ///
  78. /// // "shorthand" syntax
  79. ///
  80. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  81. ///
  82. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  83. ///
  84. /// assert_eq!(meas.key, "test");
  85. /// assert_eq!(meas.get_tag("color"), Some("red"));
  86. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  87. ///
  88. /// // alternate syntax ...
  89. ///
  90. /// measure!(tx, test,
  91. /// tag [ one => "a" ],
  92. /// tag [ two => "b" ],
  93. /// int [ three => 2 ],
  94. /// float [ four => 1.2345 ],
  95. /// string [ five => String::from("d") ],
  96. /// bool [ six => true ],
  97. /// int [ seven => { 1 + 2 } ],
  98. /// time [ 1 ]
  99. /// );
  100. ///
  101. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  102. ///
  103. /// assert_eq!(meas.key, "test");
  104. /// assert_eq!(meas.get_tag("one"), Some("a"));
  105. /// assert_eq!(meas.get_tag("two"), Some("b"));
  106. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  107. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  108. /// assert_eq!(meas.timestamp, Some(1));
  109. ///
  110. /// // use the @make_meas flag to skip sending a measurement, instead merely
  111. /// // creating it.
  112. ///
  113. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  114. ///
  115. /// // each variant also has shorthand aliases
  116. ///
  117. /// let meas: OwnedMeasurement =
  118. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  119. /// }
  120. /// ```
  121. ///
  122. #[macro_export]
  123. macro_rules! measure {
  124. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  125. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  126. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  127. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  128. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  129. (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
  130. (@kv v, $meas:ident, $k:expr) => { measure!(@ea tag, $meas, "version", $k) };
  131. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  132. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  133. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, &$v); };
  134. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  135. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  136. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  137. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  138. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  139. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  140. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  141. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  142. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  143. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  144. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  145. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  146. (@as_expr $e:expr) => {$e};
  147. (@count_tags) => {0usize};
  148. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  149. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  150. (@count_fields) => {0usize};
  151. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  152. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  153. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  154. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  155. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  156. };
  157. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  158. let n_tags = measure!(@count_tags $($t)*);
  159. let n_fields = measure!(@count_fields $($t)*);
  160. let mut meas =
  161. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  162. $(
  163. measure!(@kv $t, meas, $($tail)*);
  164. )*
  165. meas
  166. }};
  167. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  168. measure!($m, $name, $($t [ $($tail)* ] ),+)
  169. };
  170. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  171. #[allow(unused_imports)]
  172. use $crate::influx::{AsI64, AsF64};
  173. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  174. let _ = $m.send(measurement);
  175. }};
  176. }
  177. #[derive(Clone, Debug)]
  178. pub struct Point<T, V> {
  179. pub time: T,
  180. pub value: V
  181. }
  182. pub struct DurationWindow {
  183. pub size: Duration,
  184. pub mean: Duration,
  185. pub sum: Duration,
  186. pub count: u32,
  187. pub items: VecDeque<Point<Instant, Duration>>
  188. }
  189. impl DurationWindow {
  190. #[inline]
  191. pub fn update(&mut self, time: Instant, value: Duration) {
  192. self.add(time, value);
  193. self.refresh(time);
  194. }
  195. #[inline]
  196. pub fn refresh(&mut self, t: Instant) -> &Self {
  197. if !self.items.is_empty() {
  198. let (n_remove, sum, count) =
  199. self.items.iter()
  200. .take_while(|x| t - x.time > self.size)
  201. .fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
  202. (n_remove + 1, sum - x.value, count - 1)
  203. });
  204. self.sum = sum;
  205. self.count = count;
  206. for _ in 0..n_remove {
  207. self.items.pop_front();
  208. }
  209. }
  210. if self.count > 0 {
  211. self.mean = self.sum / self.count.into();
  212. }
  213. self
  214. }
  215. #[inline]
  216. pub fn add(&mut self, time: Instant, value: Duration) {
  217. let p = Point { time, value };
  218. self.sum += p.value;
  219. self.count += 1;
  220. self.items.push_back(p);
  221. }
  222. }
  223. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  224. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  225. /// measurements have accumulated.
  226. ///
  227. #[derive(Debug)]
  228. pub struct InfluxWriter {
  229. host: String,
  230. db: String,
  231. tx: Sender<Option<OwnedMeasurement>>,
  232. thread: Option<Arc<thread::JoinHandle<()>>>,
  233. }
  234. impl Default for InfluxWriter {
  235. fn default() -> Self {
  236. //if cfg!(any(test, feature = "test")) {
  237. // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0)
  238. //} else {
  239. InfluxWriter::new("localhost", "test", "/tmp/influx-test.log", 4096)
  240. //}
  241. }
  242. }
  243. impl Clone for InfluxWriter {
  244. fn clone(&self) -> Self {
  245. debug_assert!(self.thread.is_some());
  246. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  247. InfluxWriter {
  248. host: self.host.to_string(),
  249. db: self.db.to_string(),
  250. tx: self.tx.clone(),
  251. thread,
  252. }
  253. }
  254. }
  255. impl InfluxWriter {
  256. pub fn host(&self) -> &str { self.host.as_str() }
  257. pub fn db(&self) -> &str { self.db.as_str() }
  258. /// Sends the `OwnedMeasurement` to the serialization thread.
  259. ///
  260. #[cfg_attr(feature = "inlines", inline)]
  261. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  262. self.tx.send(Some(m))
  263. }
  264. #[cfg_attr(feature = "inlines", inline)]
  265. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  266. #[cfg_attr(feature = "inlines", inline)]
  267. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  268. #[cfg_attr(feature = "inlines", inline)]
  269. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  270. #[cfg_attr(feature = "inlines", inline)]
  271. pub fn rsecs(&self, d: Duration) -> f64 {
  272. ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64))
  273. * 1000.0)
  274. .round()
  275. / 1000.0
  276. }
  277. #[cfg_attr(feature = "inlines", inline)]
  278. pub fn secs(&self, d: Duration) -> f64 {
  279. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  280. }
  281. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  282. self.tx.clone()
  283. }
  284. pub fn is_full(&self) -> bool { self.tx.is_full() }
  285. pub fn placeholder() -> Self {
  286. let (tx, _) = bounded(1024);
  287. Self {
  288. host: String::new(),
  289. db: String::new(),
  290. tx,
  291. thread: None,
  292. }
  293. }
  294. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  295. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  296. Self::with_logger(host, db, buffer_size, logger)
  297. }
  298. #[allow(unused_assignments)]
  299. pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self {
  300. let logger = logger.new(o!(
  301. "host" => host.to_string(),
  302. "db" => db.to_string()));
  303. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(4096);
  304. let url =
  305. Url::parse_with_params(&format!("http://{}:8086/write", host),
  306. &[("db", db), ("precision", "ns")])
  307. .expect("influx writer url should parse");
  308. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  309. use std::time::*;
  310. use crossbeam_channel as chan;
  311. #[cfg(feature = "no-influx-buffer")]
  312. const N_BUFFER_LINES: usize = 0;
  313. const N_BUFFER_LINES: usize = 8192;
  314. const MAX_PENDING: Duration = Duration::from_secs(3);
  315. const INITIAL_BUFFER_CAPACITY: usize = 32 * 32 * 32;
  316. const MAX_BACKLOG: usize = 512;
  317. const MAX_OUTSTANDING_HTTP: usize = 64;
  318. const HB_EVERY: usize = 100_000;
  319. const N_HTTP_ATTEMPTS: u32 = 15;
  320. let client = Arc::new(Client::new());
  321. info!(logger, "initializing InfluxWriter ...";
  322. "N_BUFFER_LINES" => N_BUFFER_LINES,
  323. "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING),
  324. "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP,
  325. "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY,
  326. "MAX_BACKLOG" => MAX_BACKLOG);
  327. // pre-allocated buffers ready for use if the active one is stasheed
  328. // during an outage
  329. let mut spares: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG);
  330. // queue failed sends here until problem resolved, then send again. in worst
  331. // case scenario, loop back around on buffers queued in `backlog`, writing
  332. // over the oldest first.
  333. //
  334. let mut backlog: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG);
  335. for _ in 0..MAX_BACKLOG {
  336. spares.push_back(String::with_capacity(32 * 32 * 32));
  337. }
  338. struct Resp {
  339. pub buf: String,
  340. pub took: Duration,
  341. }
  342. let mut db_health = DurationWindow {
  343. size: Duration::from_secs(120),
  344. mean: Duration::new(10, 0),
  345. sum: Duration::new(0, 0),
  346. count: 0,
  347. items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP),
  348. };
  349. let (http_tx, http_rx) = chan::bounded(32);
  350. let mut buf = spares.pop_front().unwrap();
  351. let mut count = 0;
  352. let mut extras = 0; // any new Strings we intro to the system
  353. let mut n_rcvd = 0;
  354. let mut last = Instant::now();
  355. let mut active: bool;
  356. let mut last_clear = Instant::now();
  357. let mut loop_time = Instant::now();
  358. let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
  359. MAX_BACKLOG + extras - s.len() - b.len() - 1
  360. };
  361. assert_eq!(n_out(&spares, &backlog, extras), 0);
  362. let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize| {
  363. if n_outstanding >= MAX_OUTSTANDING_HTTP {
  364. backlog.push_back(buf);
  365. return
  366. }
  367. let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
  368. let tx = http_tx.clone();
  369. let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "n_outstanding" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure
  370. let client = Arc::clone(&client);
  371. debug!(logger, "launching http thread");
  372. let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
  373. let logger = thread_logger;
  374. debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len());
  375. let start = Instant::now();
  376. for n_req in 0..N_HTTP_ATTEMPTS {
  377. let throttle = Duration::from_secs(2) * n_req * n_req;
  378. if n_req > 0 {
  379. warn!(logger, "InfluxWriter http thread: pausing before next request";
  380. "n_req" => n_req,
  381. "throttle" => %format_args!("{:?}", throttle),
  382. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  383. thread::sleep(throttle); // 0, 2, 8, 16, 32
  384. }
  385. let sent = Instant::now();
  386. let resp = client.post(url.clone())
  387. .body(buf.as_str())
  388. .send();
  389. let rcvd = Instant::now();
  390. let took = rcvd - sent;
  391. let mut n_tx = 0u32;
  392. match resp {
  393. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  394. debug!(logger, "server responded ok: 204 NoContent");
  395. buf.clear();
  396. let mut resp = Some(Ok(Resp { buf, took }));
  397. loop {
  398. n_tx += 1;
  399. match tx.try_send(resp.take().unwrap()) {
  400. Ok(_) => {
  401. if n_req > 0 {
  402. info!(logger, "successfully recovered from failed request with retry";
  403. "n_req" => n_req,
  404. "n_tx" => n_tx,
  405. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  406. }
  407. return
  408. }
  409. Err(chan::TrySendError::Full(r)) => {
  410. let throttle = Duration::from_millis(1000) * n_tx;
  411. warn!(logger, "channel full: InfluxWriter http thread failed to return buf";
  412. "n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle));
  413. resp = Some(r);
  414. thread::sleep(throttle);
  415. }
  416. Err(chan::TrySendError::Disconnected(_)) => {
  417. warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return";
  418. "n_tx" => n_tx, "n_req" => n_req);
  419. return
  420. }
  421. }
  422. }
  423. }
  424. Ok(mut resp) => {
  425. let mut server_resp = String::new();
  426. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  427. error!(logger, "influx server error (request took {:?})", took;
  428. "status" => %resp.status,
  429. "body" => server_resp);
  430. }
  431. Err(e) => {
  432. error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e);
  433. }
  434. }
  435. }
  436. let took = Instant::now() - start;
  437. warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer";
  438. "took" => %format_args!("{:?}", took));
  439. let buflen = buf.len();
  440. let n_lines = buf.lines().count();
  441. if let Err(e) = tx.send(Err(Resp { buf, took })) {
  442. crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e;
  443. "err" => %e, "buf.len()" => buflen, "n_lines" => n_lines);
  444. }
  445. });
  446. if let Err(e) = thread_res {
  447. crit!(logger, "failed to spawn thread: {}", e);
  448. }
  449. };
  450. let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> {
  451. match prev {
  452. 0 if N_BUFFER_LINES > 0 => {
  453. serialize_owned(m, buf);
  454. Ok(1)
  455. }
  456. n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => {
  457. buf.push_str("\n");
  458. serialize_owned(m, buf);
  459. Ok(n + 1)
  460. }
  461. n => {
  462. buf.push_str("\n");
  463. serialize_owned(m, buf);
  464. Err(n + 1)
  465. }
  466. }
  467. };
  468. 'event: loop {
  469. loop_time = Instant::now();
  470. active = false;
  471. match rx.recv() {
  472. Ok(Some(mut meas)) => {
  473. n_rcvd += 1;
  474. active = true;
  475. if n_rcvd % HB_EVERY == 0 {
  476. let n_outstanding = n_out(&spares, &backlog, extras);
  477. info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep();
  478. "n_outstanding" => n_outstanding,
  479. "spares.len()" => spares.len(),
  480. "n_rcvd" => n_rcvd,
  481. "n_active_buf" => count,
  482. "db_health" => %format_args!("{:?}", db_health.mean),
  483. "backlog.len()" => backlog.len());
  484. }
  485. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  486. if meas.fields.is_empty() {
  487. meas.fields.push(("n", OwnedValue::Integer(1)));
  488. }
  489. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  490. count = match next(count, &meas, &mut buf, loop_time, last) {
  491. Ok(n) => n,
  492. Err(_n) => {
  493. let mut count = 0;
  494. let mut next: String = match spares.pop_front() {
  495. Some(x) => x,
  496. None => {
  497. let n_outstanding = n_out(&spares, &backlog, extras);
  498. crit!(logger, "no available buffers in `spares`, pulling from backlog";
  499. "n_outstanding" => n_outstanding,
  500. "spares.len()" => spares.len(),
  501. "n_rcvd" => n_rcvd,
  502. "backlog.len()" => backlog.len());
  503. match backlog.pop_front() {
  504. // Note: this does not clear the backlog buffer,
  505. // instead we will just write more and more until
  506. // we are out of memory. I expect that will never
  507. // happen.
  508. //
  509. Some(x) => {
  510. count = 1; // otherwise, no '\n' added in `next(..)` - we are
  511. // sending a "full" buffer to be extended
  512. x
  513. }
  514. None => {
  515. extras += 1;
  516. crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String";
  517. "n_outstanding" => n_outstanding,
  518. "spares.len()" => spares.len(),
  519. "backlog.len()" => backlog.len(),
  520. "n_rcvd" => n_rcvd,
  521. "extras" => extras);
  522. String::new()
  523. }
  524. }
  525. }
  526. };
  527. // after swap, buf in next, so want to send next
  528. //
  529. mem::swap(&mut buf, &mut next);
  530. let n_outstanding = n_out(&spares, &backlog, extras);
  531. send(next, &mut backlog, n_outstanding);
  532. last = loop_time;
  533. count
  534. }
  535. };
  536. }
  537. Ok(None) => {
  538. let start = Instant::now();
  539. let mut hb = Instant::now();
  540. warn!(logger, "terminate signal rcvd"; "count" => count);
  541. if buf.len() > 0 {
  542. info!(logger, "sending remaining buffer to influx on terminate"; "count" => count);
  543. let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1));
  544. let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last);
  545. let n_outstanding = n_out(&spares, &backlog, extras);
  546. let mut placeholder = spares.pop_front().unwrap_or_else(String::new);
  547. mem::swap(&mut buf, &mut placeholder);
  548. send(placeholder, &mut backlog, n_outstanding);
  549. }
  550. let mut n_ok = 0;
  551. let mut n_err = 0;
  552. loop {
  553. loop_time = Instant::now();
  554. let n_outstanding = n_out(&spares, &backlog, extras);
  555. if backlog.is_empty() && n_outstanding < 1 {
  556. info!(logger, "cleared any remaining backlog";
  557. "n_outstanding" => n_outstanding,
  558. "spares.len()" => spares.len(),
  559. "backlog.len()" => backlog.len(),
  560. "n_cleared_ok" => n_ok,
  561. "n_cleared_err" => n_err,
  562. "n_rcvd" => n_rcvd,
  563. "extras" => extras,
  564. "elapsed" => %format_args!("{:?}", loop_time - start));
  565. break 'event
  566. }
  567. if loop_time - hb > Duration::from_secs(5) {
  568. info!(logger, "InfluxWriter still clearing backlog ..";
  569. "n_outstanding" => n_outstanding,
  570. "spares.len()" => spares.len(),
  571. "backlog.len()" => backlog.len(),
  572. "n_cleared_ok" => n_ok,
  573. "n_cleared_err" => n_err,
  574. "extras" => extras,
  575. "n_rcvd" => n_rcvd,
  576. "elapsed" => %format_args!("{:?}", loop_time - start));
  577. hb = loop_time;
  578. }
  579. if let Some(buf) = backlog.pop_front() {
  580. let n_outstanding = n_out(&spares, &backlog, extras);
  581. debug!(logger, "resending queued buffer from backlog";
  582. "backlog.len()" => backlog.len(),
  583. "spares.len()" => spares.len(),
  584. "n_rcvd" => n_rcvd,
  585. "n_outstanding" => n_outstanding);
  586. send(buf, &mut backlog, n_outstanding);
  587. last_clear = loop_time;
  588. }
  589. 'rx: loop {
  590. match http_rx.try_recv() {
  591. Ok(Ok(Resp { buf, .. })) => {
  592. n_ok += 1;
  593. spares.push_back(buf); // needed so `n_outstanding` count remains accurate
  594. }
  595. Ok(Err(Resp { buf, .. })) => {
  596. warn!(logger, "requeueing failed request"; "buf.len()" => buf.len());
  597. n_err += 1;
  598. backlog.push_front(buf);
  599. }
  600. Err(chan::TryRecvError::Disconnected) => {
  601. crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting";
  602. "n_outstanding" => n_outstanding,
  603. "backlog.len()" => backlog.len(),
  604. "n_cleared_ok" => n_ok,
  605. "n_cleared_err" => n_err,
  606. "extras" => extras,
  607. "n_rcvd" => n_rcvd,
  608. "elapsed" => %format_args!("{:?}", loop_time - start));
  609. break 'event
  610. }
  611. Err(_) => break 'rx
  612. }
  613. }
  614. thread::sleep(Duration::from_millis(1));
  615. }
  616. }
  617. _ => {}
  618. }
  619. db_health.refresh(loop_time);
  620. let n_outstanding = n_out(&spares, &backlog, extras);
  621. let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
  622. if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy {
  623. if let Some(queued) = backlog.pop_front() {
  624. let n_outstanding = n_out(&spares, &backlog, extras);
  625. send(queued, &mut backlog, n_outstanding);
  626. active = true;
  627. }
  628. }
  629. loop {
  630. match http_rx.try_recv() {
  631. Ok(Ok(Resp { buf, took })) => {
  632. db_health.add(loop_time, took);
  633. spares.push_back(buf);
  634. active = true;
  635. }
  636. Ok(Err(Resp { buf, took })) => {
  637. db_health.add(loop_time, took);
  638. backlog.push_front(buf);
  639. active = true;
  640. }
  641. Err(chan::TryRecvError::Disconnected) => {
  642. crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting";
  643. "n_outstanding" => n_outstanding,
  644. "backlog.len()" => backlog.len(),
  645. "n_rcvd" => n_rcvd,
  646. "extras" => extras);
  647. break 'event
  648. }
  649. Err(_) => break
  650. }
  651. }
  652. if !active {
  653. thread::sleep(Duration::new(0, 1))
  654. }
  655. }
  656. info!(logger, "waiting 1s before exiting thread");
  657. thread::sleep(Duration::from_secs(1));
  658. }).unwrap();
  659. InfluxWriter {
  660. host: host.to_string(),
  661. db: db.to_string(),
  662. tx,
  663. thread: Some(Arc::new(thread))
  664. }
  665. }
  666. }
  667. impl Drop for InfluxWriter {
  668. fn drop(&mut self) {
  669. if let Some(arc) = self.thread.take() {
  670. if let Ok(thread) = Arc::try_unwrap(arc) {
  671. let _ = self.tx.send(None);
  672. let _ = thread.join();
  673. }
  674. }
  675. }
  676. }
  677. #[cfg(feature = "zmq")]
  678. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  679. #[cfg(feature = "zmq")]
  680. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  681. let socket = ctx.socket(zmq::PULL)?;
  682. socket.bind(WRITER_ADDR)?;
  683. socket.set_rcvhwm(0)?;
  684. Ok(socket)
  685. }
  686. #[cfg(feature = "zmq")]
  687. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  688. let socket = ctx.socket(zmq::PUSH)?;
  689. socket.connect(WRITER_ADDR)?;
  690. socket.set_sndhwm(0)?;
  691. Ok(socket)
  692. }
  693. /// This removes offending things rather than escaping them.
  694. ///
  695. fn escape_tag(s: &str) -> String {
  696. s.replace(" ", "")
  697. .replace(",", "")
  698. .replace("\"", "")
  699. }
  700. fn escape(s: &str) -> String {
  701. s.replace(" ", "\\ ")
  702. .replace(",", "\\,")
  703. }
  704. fn as_string(s: &str) -> String {
  705. // the second replace removes double escapes
  706. //
  707. format!("\"{}\"", s.replace("\"", "\\\"")
  708. .replace(r#"\\""#, r#"\""#))
  709. }
  710. #[test]
  711. fn it_checks_as_string_does_not_double_escape() {
  712. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  713. let escaped = as_string(&raw);
  714. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  715. }
  716. fn as_boolean(b: &bool) -> &str {
  717. if *b { "t" } else { "f" }
  718. }
  719. pub fn now() -> i64 {
  720. nanos(Utc::now()) as i64
  721. }
  722. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  723. ///
  724. /// The serialized measurement is appended to the end of the string without
  725. /// any regard for what exited in it previously.
  726. ///
  727. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  728. line.push_str(&escape_tag(measurement.key));
  729. let add_tag = |line: &mut String, key: &str, value: &str| {
  730. line.push_str(",");
  731. line.push_str(&escape_tag(key));
  732. line.push_str("=");
  733. line.push_str(&escape(value));
  734. };
  735. for (key, value) in measurement.tags.iter() {
  736. #[cfg(not(feature = "string-tags"))]
  737. add_tag(line, key, value);
  738. #[cfg(feature = "string-tags")]
  739. add_tag(line, key, value.as_str());
  740. }
  741. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  742. if is_first { line.push_str(" "); } else { line.push_str(","); }
  743. line.push_str(&escape_tag(key));
  744. line.push_str("=");
  745. match *value {
  746. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  747. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  748. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  749. OwnedValue::D128(ref d) => {
  750. if d.is_finite() {
  751. line.push_str(&format!("{}", d));
  752. } else {
  753. line.push_str("0.0");
  754. }
  755. }
  756. OwnedValue::Float(ref f) => {
  757. if f.is_finite() {
  758. line.push_str(&format!("{}", f));
  759. } else {
  760. line.push_str("-999.0");
  761. }
  762. }
  763. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  764. };
  765. };
  766. let mut fields = measurement.fields.iter();
  767. // first time separate from tags with space
  768. //
  769. fields.next().map(|kv| {
  770. add_field(line, &kv.0, &kv.1, true);
  771. });
  772. // then seperate the rest w/ comma
  773. //
  774. for kv in fields {
  775. add_field(line, kv.0, &kv.1, false);
  776. }
  777. if let Some(t) = measurement.timestamp {
  778. line.push_str(" ");
  779. line.push_str(&t.to_string());
  780. }
  781. }
  782. #[cfg(feature = "warnings")]
  783. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  784. #[cfg(feature = "zmq")]
  785. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  786. assert!(false);
  787. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  788. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  789. let _ = fs::create_dir("/tmp/mm");
  790. let ctx = zmq::Context::new();
  791. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  792. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  793. let client = Client::new();
  794. let mut buf = String::with_capacity(4096);
  795. let mut server_resp = String::with_capacity(4096);
  796. let mut count = 0;
  797. loop {
  798. if let Ok(bytes) = socket.recv_bytes(0) {
  799. if let Ok(msg) = String::from_utf8(bytes) {
  800. count = match count {
  801. 0 => {
  802. buf.push_str(&msg);
  803. 1
  804. }
  805. n @ 1...40 => {
  806. buf.push_str("\n");
  807. buf.push_str(&msg);
  808. n + 1
  809. }
  810. _ => {
  811. buf.push_str("\n");
  812. buf.push_str(&msg);
  813. match client.post(url.clone())
  814. .body(&buf)
  815. .send() {
  816. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  817. Ok(mut resp) => {
  818. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  819. let _ = warnings.send(
  820. Warning::Error(
  821. format!("Influx server: {}", server_resp)));
  822. server_resp.clear();
  823. }
  824. Err(why) => {
  825. let _ = warnings.send(
  826. Warning::Error(
  827. format!("Influx write error: {}", why)));
  828. }
  829. }
  830. buf.clear();
  831. 0
  832. }
  833. }
  834. }
  835. }
  836. }
  837. }).unwrap()
  838. }
  839. #[derive(Debug, Clone, PartialEq)]
  840. pub enum OwnedValue {
  841. String(String),
  842. Float(f64),
  843. Integer(i64),
  844. Boolean(bool),
  845. D128(d128),
  846. Uuid(Uuid),
  847. }
  848. /// Holds data meant for an influxdb measurement in transit to the
  849. /// writing thread.
  850. ///
  851. /// TODO: convert `Map` to `SmallVec`?
  852. ///
  853. #[derive(Clone, Debug)]
  854. pub struct OwnedMeasurement {
  855. pub key: &'static str,
  856. pub timestamp: Option<i64>,
  857. //pub fields: Map<&'static str, OwnedValue>,
  858. //pub tags: Map<&'static str, &'static str>,
  859. pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
  860. #[cfg(not(feature = "string-tags"))]
  861. pub tags: SmallVec<[(&'static str, &'static str); 8]>,
  862. #[cfg(feature = "string-tags")]
  863. pub tags: SmallVec<[(&'static str, String); 8]>,
  864. }
  865. impl OwnedMeasurement {
  866. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  867. OwnedMeasurement {
  868. key,
  869. timestamp: None,
  870. tags: SmallVec::with_capacity(n_tags),
  871. fields: SmallVec::with_capacity(n_fields),
  872. }
  873. }
  874. pub fn new(key: &'static str) -> Self {
  875. OwnedMeasurement {
  876. key,
  877. timestamp: None,
  878. tags: SmallVec::new(),
  879. fields: SmallVec::new(),
  880. }
  881. }
  882. /// Unusual consuming `self` signature because primarily used by
  883. /// the `measure!` macro.
  884. #[cfg(not(feature = "string-tags"))]
  885. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  886. self.tags.push((key, value));
  887. self
  888. }
  889. #[cfg(feature = "string-tags")]
  890. pub fn add_tag<S: ToString>(mut self, key: &'static str, value: S) -> Self {
  891. self.tags.push((key, value.to_string()));
  892. self
  893. }
  894. /// Unusual consuming `self` signature because primarily used by
  895. /// the `measure!` macro.
  896. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  897. self.fields.push((key, value));
  898. self
  899. }
  900. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  901. self.timestamp = Some(timestamp);
  902. self
  903. }
  904. #[cfg(not(feature = "string-tags"))]
  905. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  906. match self.tags.iter().position(|kv| kv.0 == key) {
  907. Some(i) => {
  908. self.tags.get_mut(i)
  909. .map(|x| {
  910. x.0 = value;
  911. });
  912. self
  913. }
  914. None => {
  915. self.add_tag(key, value)
  916. }
  917. }
  918. }
  919. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  920. self.fields.iter()
  921. .find(|kv| kv.0 == key)
  922. .map(|kv| &kv.1)
  923. }
  924. #[cfg(not(feature = "string-tags"))]
  925. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  926. self.tags.iter()
  927. .find(|kv| kv.0 == key)
  928. .map(|kv| kv.1)
  929. }
  930. }
  931. #[allow(unused_imports, unused_variables)]
  932. #[cfg(test)]
  933. mod tests {
  934. use super::*;
  935. use test::{black_box, Bencher};
  936. #[ignore]
  937. #[bench]
  938. fn measure_ten(b: &mut Bencher) {
  939. let influx = InfluxWriter::new("localhost", "test", "log/influx.log", 8192);
  940. let mut n = 0;
  941. b.iter(|| {
  942. for _ in 0..10 {
  943. let time = influx.nanos(Utc::now());
  944. n += 1;
  945. measure!(influx, million, i(n), tm(time));
  946. }
  947. });
  948. }
  949. #[test]
  950. fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
  951. const VERSION: &str = "0.3.90";
  952. let tag_value = "one";
  953. let color = "red";
  954. let time = Utc::now();
  955. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
  956. assert_eq!(m.get_tag("color"), Some("red"));
  957. assert_eq!(m.get_tag("version"), Some(VERSION));
  958. assert_eq!(m.timestamp, Some(nanos(time) as i64));
  959. }
  960. #[test]
  961. fn it_uses_the_v_for_version_shortcut() {
  962. const VERSION: &str = "0.3.90";
  963. let tag_value = "one";
  964. let color = "red";
  965. let time = now();
  966. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  967. assert_eq!(m.get_tag("color"), Some("red"));
  968. assert_eq!(m.get_tag("version"), Some(VERSION));
  969. assert_eq!(m.timestamp, Some(time));
  970. }
  971. #[test]
  972. fn it_uses_the_new_tag_k_only_shortcut() {
  973. let tag_value = "one";
  974. let color = "red";
  975. let time = now();
  976. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  977. assert_eq!(m.get_tag("color"), Some("red"));
  978. assert_eq!(m.get_tag("tag_value"), Some("one"));
  979. assert_eq!(m.timestamp, Some(time));
  980. }
  981. #[test]
  982. fn it_uses_measure_macro_parenthesis_syntax() {
  983. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  984. assert_eq!(m.key, "test");
  985. assert_eq!(m.get_tag("a"), Some("b"));
  986. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  987. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  988. assert_eq!(m.timestamp, Some(1));
  989. }
  990. #[test]
  991. fn it_uses_measure_macro_on_a_self_attribute() {
  992. struct A {
  993. pub influx: InfluxWriter,
  994. }
  995. impl A {
  996. fn f(&self) {
  997. measure!(self.influx, test, t(color, "red"), i(n, 1));
  998. }
  999. }
  1000. let a = A { influx: InfluxWriter::default() };
  1001. a.f();
  1002. }
  1003. #[test]
  1004. fn it_clones_an_influx_writer_to_check_both_drop() {
  1005. let influx = InfluxWriter::default();
  1006. measure!(influx, drop_test, i(a, 1), i(b, 2));
  1007. {
  1008. let influx = influx.clone();
  1009. thread::spawn(move || {
  1010. measure!(influx, drop_test, i(a, 3), i(b, 4));
  1011. });
  1012. }
  1013. }
  1014. #[bench]
  1015. fn influx_writer_send_basic(b: &mut Bencher) {
  1016. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  1017. b.iter(|| {
  1018. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  1019. });
  1020. }
  1021. #[bench]
  1022. fn influx_writer_send_price(b: &mut Bencher) {
  1023. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  1024. b.iter(|| {
  1025. measure!(m, test,
  1026. t(ticker, t!(xmr-btc).as_str()),
  1027. t(exchange, "plnx"),
  1028. d(bid, d128::zero()),
  1029. d(ask, d128::zero()),
  1030. );
  1031. });
  1032. }
  1033. #[test]
  1034. fn it_checks_color_tag_error_in_non_doctest() {
  1035. let (tx, rx) = bounded(1024);
  1036. measure!(tx, test, tag[color;"red"], int[n;1]);
  1037. let meas: OwnedMeasurement = rx.recv().unwrap();
  1038. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  1039. }
  1040. #[test]
  1041. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  1042. let meas = measure!(@make_meas test_measurement,
  1043. tag [ one => "a" ],
  1044. tag [ two => "b" ],
  1045. int [ three => 2 ],
  1046. float [ four => 1.2345 ],
  1047. string [ five => String::from("d") ],
  1048. bool [ six => true ],
  1049. int [ seven => { 1 + 2 } ],
  1050. time [ 1 ]
  1051. );
  1052. assert_eq!(meas.key, "test_measurement");
  1053. assert_eq!(meas.get_tag("one"), Some("a"));
  1054. assert_eq!(meas.get_tag("two"), Some("b"));
  1055. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  1056. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  1057. assert_eq!(meas.timestamp, Some(1));
  1058. }
  1059. #[test]
  1060. fn it_uses_the_measure_macro() {
  1061. let (tx, rx) = bounded(1024);
  1062. measure!(tx, test_measurement,
  1063. tag [ one => "a" ],
  1064. tag [ two => "b" ],
  1065. int [ three => 2 ],
  1066. float [ four => 1.2345 ],
  1067. string [ five => String::from("d") ],
  1068. bool [ six => true ],
  1069. int [ seven => { 1 + 2 } ],
  1070. time [ 1 ]
  1071. );
  1072. thread::sleep(Duration::from_millis(10));
  1073. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  1074. assert_eq!(meas.key, "test_measurement");
  1075. assert_eq!(meas.get_tag("one"), Some("a"));
  1076. assert_eq!(meas.get_tag("two"), Some("b"));
  1077. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  1078. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  1079. assert_eq!(meas.timestamp, Some(1));
  1080. }
  1081. #[test]
  1082. fn it_uses_measure_macro_for_d128_and_uuid() {
  1083. let (tx, rx) = bounded(1024);
  1084. let u = Uuid::new_v4();
  1085. let d = d128::zero();
  1086. let t = now();
  1087. measure!(tx, test_measurement,
  1088. tag[one; "a"],
  1089. d128[two; d],
  1090. uuid[three; u],
  1091. time[t]
  1092. );
  1093. thread::sleep(Duration::from_millis(10));
  1094. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  1095. assert_eq!(meas.key, "test_measurement");
  1096. assert_eq!(meas.get_tag("one"), Some("a"));
  1097. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  1098. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
  1099. assert_eq!(meas.timestamp, Some(t));
  1100. }
  1101. #[test]
  1102. fn it_uses_the_measure_macro_alt_syntax() {
  1103. let (tx, rx) = bounded(1024);
  1104. measure!(tx, test_measurement,
  1105. tag[one; "a"],
  1106. tag[two; "b"],
  1107. int[three; 2],
  1108. float[four; 1.2345],
  1109. string[five; String::from("d")],
  1110. bool [ six => true ],
  1111. int[seven; { 1 + 2 }],
  1112. time[1]
  1113. );
  1114. thread::sleep(Duration::from_millis(10));
  1115. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  1116. assert_eq!(meas.key, "test_measurement");
  1117. assert_eq!(meas.get_tag("one"), Some("a"));
  1118. assert_eq!(meas.get_tag("two"), Some("b"));
  1119. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  1120. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  1121. assert_eq!(meas.timestamp, Some(1));
  1122. }
  1123. #[test]
  1124. fn it_checks_that_fields_are_separated_correctly() {
  1125. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  1126. assert_eq!(m.key, "test");
  1127. assert_eq!(m.get_tag("a"), Some("one"));
  1128. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  1129. let mut buf = String::new();
  1130. serialize_owned(&m, &mut buf);
  1131. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  1132. }
  1133. #[test]
  1134. fn try_to_break_measure_macro() {
  1135. let (tx, _) = bounded(1024);
  1136. measure!(tx, one, tag[x=>"y"], int[n;1]);
  1137. measure!(tx, one, tag[x;"y"], int[n;1],);
  1138. struct A {
  1139. pub one: i32,
  1140. pub two: i32,
  1141. }
  1142. struct B {
  1143. pub a: A
  1144. }
  1145. let b = B { a: A { one: 1, two: 2 } };
  1146. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  1147. assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
  1148. }
  1149. #[bench]
  1150. fn measure_macro_small(b: &mut Bencher) {
  1151. let (tx, rx) = bounded(1024);
  1152. let listener = thread::spawn(move || {
  1153. loop { if rx.recv().is_err() { break } }
  1154. });
  1155. b.iter(|| {
  1156. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  1157. });
  1158. }
  1159. #[bench]
  1160. fn measure_macro_medium(b: &mut Bencher) {
  1161. let (tx, rx) = bounded(1024);
  1162. let listener = thread::spawn(move || {
  1163. loop { if rx.recv().is_err() { break } }
  1164. });
  1165. b.iter(|| {
  1166. measure!(tx, test, t(color, "red"), t(mood, "playful"),
  1167. t(ticker, "xmr_btc"), f(price, 1.2345), f(amount, 56.322),
  1168. i(n, 1), tm(now()));
  1169. });
  1170. }
  1171. #[bench]
  1172. fn serialize_owned_longer(b: &mut Bencher) {
  1173. let mut buf = String::with_capacity(1024);
  1174. let m =
  1175. OwnedMeasurement::new("test")
  1176. .add_tag("one", "a")
  1177. .add_tag("two", "b")
  1178. .add_tag("ticker", "xmr_btc")
  1179. .add_tag("exchange", "plnx")
  1180. .add_tag("side", "bid")
  1181. .add_field("three", OwnedValue::Float(1.2345))
  1182. .add_field("four", OwnedValue::Integer(57))
  1183. .add_field("five", OwnedValue::Boolean(true))
  1184. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  1185. .set_timestamp(now());
  1186. b.iter(|| {
  1187. serialize_owned(&m, &mut buf);
  1188. buf.clear()
  1189. });
  1190. }
  1191. #[bench]
  1192. fn serialize_owned_simple(b: &mut Bencher) {
  1193. let mut buf = String::with_capacity(1024);
  1194. let m =
  1195. OwnedMeasurement::new("test")
  1196. .add_tag("one", "a")
  1197. .add_tag("two", "b")
  1198. .add_field("three", OwnedValue::Float(1.2345))
  1199. .add_field("four", OwnedValue::Integer(57))
  1200. .set_timestamp(now());
  1201. b.iter(|| {
  1202. serialize_owned(&m, &mut buf);
  1203. buf.clear()
  1204. });
  1205. }
  1206. #[bench]
  1207. fn clone_url_for_thread(b: &mut Bencher) {
  1208. let host = "ahmes";
  1209. let db = "mlp";
  1210. let url =
  1211. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1212. &[("db", db), ("precision", "ns")]).unwrap();
  1213. b.iter(|| {
  1214. url.clone()
  1215. })
  1216. }
  1217. #[bench]
  1218. fn clone_arc_url_for_thread(b: &mut Bencher) {
  1219. let host = "ahmes";
  1220. let db = "mlp";
  1221. let url =
  1222. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1223. &[("db", db), ("precision", "ns")]).unwrap();
  1224. let url = Arc::new(url);
  1225. b.iter(|| {
  1226. Arc::clone(&url)
  1227. })
  1228. }
  1229. #[test]
  1230. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  1231. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  1232. let mut buf = String::new();
  1233. let mut server_resp = String::new();
  1234. let m = OwnedMeasurement::new("rust_test")
  1235. .add_field("s", OwnedValue::String(raw.to_string()))
  1236. .set_timestamp(now());
  1237. serialize_owned(&m, &mut buf);
  1238. println!("{}", buf);
  1239. buf.push_str("\n");
  1240. let buf_copy = buf.clone();
  1241. buf.push_str(&buf_copy);
  1242. println!("{}", buf);
  1243. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  1244. let client = Client::new();
  1245. match client.post(url.clone())
  1246. .body(&buf)
  1247. .send() {
  1248. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  1249. Ok(mut resp) => {
  1250. resp.read_to_string(&mut server_resp).unwrap();
  1251. panic!("{}", server_resp);
  1252. }
  1253. Err(why) => {
  1254. panic!(why)
  1255. }
  1256. }
  1257. }
  1258. }