You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1571 lines
59KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. //use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use crossbeam_channel::{Sender, Receiver, bounded, SendError};
  7. use std::{thread, mem};
  8. use std::time::*;
  9. use std::hash::BuildHasherDefault;
  10. use std::collections::VecDeque;
  11. use hyper::status::StatusCode;
  12. use hyper::client::response::Response;
  13. use hyper::Url;
  14. use hyper::client::Client;
  15. use influent::measurement::{Measurement, Value};
  16. #[cfg(feature = "zmq")]
  17. use zmq;
  18. #[allow(unused_imports)]
  19. use chrono::{DateTime, Utc};
  20. use ordermap::OrderMap;
  21. use fnv::FnvHasher;
  22. use decimal::d128;
  23. use uuid::Uuid;
  24. use smallvec::SmallVec;
  25. use slog::Logger;
  26. use pretty_toa::ThousandsSep;
  27. use super::{nanos, file_logger, LOG_LEVEL};
  28. #[cfg(feature = "warnings")]
  29. use warnings::Warning;
  30. pub use super::{dur_nanos, dt_nanos};
  31. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  32. pub const INFLUX_WRITER_MAX_BUFFER: usize = 4096;
  33. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  34. Map::with_capacity_and_hasher(capacity, Default::default())
  35. }
  36. /// Created this so I know what types can be passed through the
  37. /// `measure!` macro, which used to convert with `as i64` and
  38. /// `as f64` until I accidentally passed a function name, and it
  39. /// still compiled, but with garbage numbers.
  40. pub trait AsI64 {
  41. fn as_i64(x: Self) -> i64;
  42. }
  43. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  44. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  45. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  46. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  47. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } }
  48. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  49. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  50. impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  51. impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  52. impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  53. impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  54. /// Created this so I know what types can be passed through the
  55. /// `measure!` macro, which used to convert with `as i64` and
  56. /// `as f64` until I accidentally passed a function name, and it
  57. /// still compiled, but with garbage numbers.
  58. pub trait AsF64 {
  59. fn as_f64(x: Self) -> f64;
  60. }
  61. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  62. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  63. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  64. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  65. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  66. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  67. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  68. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  69. ///
  70. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  71. /// values, as well as sends it with the `Sender`.
  72. ///
  73. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  74. /// measurement (see `tests` mod).
  75. ///
  76. /// # Examples
  77. ///
  78. /// ```
  79. /// #![feature(try_from)]
  80. /// #[macro_use] extern crate logging;
  81. /// extern crate decimal;
  82. ///
  83. /// use std::sync::mpsc::channel;
  84. /// use decimal::d128;
  85. /// use logging::influx::*;
  86. ///
  87. /// fn main() {
  88. /// let (tx, rx) = bounded(1024);
  89. ///
  90. /// // "shorthand" syntax
  91. ///
  92. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  93. ///
  94. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  95. ///
  96. /// assert_eq!(meas.key, "test");
  97. /// assert_eq!(meas.get_tag("color"), Some("red"));
  98. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  99. ///
  100. /// // alternate syntax ...
  101. ///
  102. /// measure!(tx, test,
  103. /// tag [ one => "a" ],
  104. /// tag [ two => "b" ],
  105. /// int [ three => 2 ],
  106. /// float [ four => 1.2345 ],
  107. /// string [ five => String::from("d") ],
  108. /// bool [ six => true ],
  109. /// int [ seven => { 1 + 2 } ],
  110. /// time [ 1 ]
  111. /// );
  112. ///
  113. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  114. ///
  115. /// assert_eq!(meas.key, "test");
  116. /// assert_eq!(meas.get_tag("one"), Some("a"));
  117. /// assert_eq!(meas.get_tag("two"), Some("b"));
  118. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  119. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  120. /// assert_eq!(meas.timestamp, Some(1));
  121. ///
  122. /// // use the @make_meas flag to skip sending a measurement, instead merely
  123. /// // creating it.
  124. ///
  125. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  126. ///
  127. /// // each variant also has shorthand aliases
  128. ///
  129. /// let meas: OwnedMeasurement =
  130. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  131. /// }
  132. /// ```
  133. ///
  134. #[macro_export]
  135. macro_rules! measure {
  136. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  137. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  138. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  139. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  140. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  141. (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
  142. (@kv v, $meas:ident, $k:expr) => { measure!(@ea tag, $meas, "version", $k) };
  143. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  144. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  145. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  146. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  147. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  148. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  149. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  150. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  151. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  152. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  153. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  154. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  155. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  156. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  157. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  158. (@as_expr $e:expr) => {$e};
  159. (@count_tags) => {0usize};
  160. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  161. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  162. (@count_fields) => {0usize};
  163. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  164. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  165. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  166. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  167. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  168. };
  169. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  170. let n_tags = measure!(@count_tags $($t)*);
  171. let n_fields = measure!(@count_fields $($t)*);
  172. let mut meas =
  173. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  174. $(
  175. measure!(@kv $t, meas, $($tail)*);
  176. )*
  177. meas
  178. }};
  179. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  180. measure!($m, $name, $($t [ $($tail)* ] ),+)
  181. };
  182. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  183. #[allow(unused_imports)]
  184. use $crate::influx::{AsI64, AsF64};
  185. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  186. let _ = $m.send(measurement);
  187. }};
  188. }
  189. #[derive(Clone, Debug)]
  190. pub struct Point<T, V> {
  191. pub time: T,
  192. pub value: V
  193. }
  194. pub struct DurationWindow {
  195. pub size: Duration,
  196. pub mean: Duration,
  197. pub sum: Duration,
  198. pub count: u32,
  199. pub items: VecDeque<Point<Instant, Duration>>
  200. }
  201. impl DurationWindow {
  202. #[inline]
  203. pub fn update(&mut self, time: Instant, value: Duration) {
  204. self.add(time, value);
  205. self.refresh(time);
  206. }
  207. #[inline]
  208. pub fn refresh(&mut self, t: Instant) -> &Self {
  209. if !self.items.is_empty() {
  210. let (n_remove, sum, count) =
  211. self.items.iter()
  212. .take_while(|x| t - x.time > self.size)
  213. .fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
  214. (n_remove + 1, sum - x.value, count - 1)
  215. });
  216. self.sum = sum;
  217. self.count = count;
  218. for _ in 0..n_remove {
  219. self.items.pop_front();
  220. }
  221. }
  222. if self.count > 0 {
  223. self.mean = self.sum / self.count.into();
  224. }
  225. self
  226. }
  227. #[inline]
  228. pub fn add(&mut self, time: Instant, value: Duration) {
  229. let p = Point { time, value };
  230. self.sum += p.value;
  231. self.count += 1;
  232. self.items.push_back(p);
  233. }
  234. }
  235. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  236. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  237. /// measurements have accumulated.
  238. ///
  239. #[derive(Debug)]
  240. pub struct InfluxWriter {
  241. host: String,
  242. db: String,
  243. tx: Sender<Option<OwnedMeasurement>>,
  244. thread: Option<Arc<thread::JoinHandle<()>>>,
  245. }
  246. impl Default for InfluxWriter {
  247. fn default() -> Self {
  248. //if cfg!(any(test, feature = "test")) {
  249. // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0)
  250. //} else {
  251. InfluxWriter::new("localhost", "test", "/tmp/influx-test.log", 4096)
  252. //}
  253. }
  254. }
  255. impl Clone for InfluxWriter {
  256. fn clone(&self) -> Self {
  257. debug_assert!(self.thread.is_some());
  258. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  259. InfluxWriter {
  260. host: self.host.to_string(),
  261. db: self.db.to_string(),
  262. tx: self.tx.clone(),
  263. thread,
  264. }
  265. }
  266. }
  267. impl InfluxWriter {
  268. pub fn host(&self) -> &str { self.host.as_str() }
  269. pub fn db(&self) -> &str { self.db.as_str() }
  270. /// Sends the `OwnedMeasurement` to the serialization thread.
  271. ///
  272. #[cfg_attr(feature = "inlines", inline)]
  273. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  274. self.tx.send(Some(m))
  275. }
  276. #[cfg_attr(feature = "inlines", inline)]
  277. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  278. #[cfg_attr(feature = "inlines", inline)]
  279. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  280. #[cfg_attr(feature = "inlines", inline)]
  281. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  282. #[cfg_attr(feature = "inlines", inline)]
  283. pub fn rsecs(&self, d: Duration) -> f64 {
  284. ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64))
  285. * 1000.0)
  286. .round()
  287. / 1000.0
  288. }
  289. #[cfg_attr(feature = "inlines", inline)]
  290. pub fn secs(&self, d: Duration) -> f64 {
  291. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  292. }
  293. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  294. self.tx.clone()
  295. }
  296. pub fn is_full(&self) -> bool { self.tx.is_full() }
  297. pub fn placeholder() -> Self {
  298. let (tx, _) = bounded(1024);
  299. Self {
  300. host: String::new(),
  301. db: String::new(),
  302. tx,
  303. thread: None,
  304. }
  305. }
  306. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  307. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  308. Self::with_logger(host, db, buffer_size, logger)
  309. }
  310. #[allow(unused_assignments)]
  311. pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self {
  312. let logger = logger.new(o!(
  313. "host" => host.to_string(),
  314. "db" => db.to_string()));
  315. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(4096);
  316. let url =
  317. Url::parse_with_params(&format!("http://{}:8086/write", host),
  318. &[("db", db), ("precision", "ns")])
  319. .expect("influx writer url should parse");
  320. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  321. use std::collections::VecDeque;
  322. use std::time::*;
  323. use crossbeam_channel as chan;
  324. #[cfg(feature = "no-influx-buffer")]
  325. const N_BUFFER_LINES: usize = 0;
  326. const N_BUFFER_LINES: usize = 8192;
  327. const MAX_PENDING: Duration = Duration::from_secs(3);
  328. const INITIAL_BUFFER_CAPACITY: usize = 32 * 32 * 32;
  329. const MAX_BACKLOG: usize = 512;
  330. const MAX_OUTSTANDING_HTTP: usize = 64;
  331. const HB_EVERY: usize = 100_000;
  332. const N_HTTP_ATTEMPTS: u32 = 15;
  333. let client = Arc::new(Client::new());
  334. info!(logger, "initializing InfluxWriter ...";
  335. "N_BUFFER_LINES" => N_BUFFER_LINES,
  336. "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING),
  337. "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP,
  338. "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY,
  339. "MAX_BACKLOG" => MAX_BACKLOG);
  340. // pre-allocated buffers ready for use if the active one is stasheed
  341. // during an outage
  342. let mut spares: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG);
  343. // queue failed sends here until problem resolved, then send again. in worst
  344. // case scenario, loop back around on buffers queued in `backlog`, writing
  345. // over the oldest first.
  346. //
  347. let mut backlog: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG);
  348. for _ in 0..MAX_BACKLOG {
  349. spares.push_back(String::with_capacity(32 * 32 * 32));
  350. }
  351. struct Resp {
  352. pub buf: String,
  353. pub took: Duration,
  354. }
  355. let mut db_health = DurationWindow {
  356. size: Duration::from_secs(120),
  357. mean: Duration::new(10, 0),
  358. sum: Duration::new(0, 0),
  359. count: 0,
  360. items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP),
  361. };
  362. let (http_tx, http_rx) = chan::bounded(32);
  363. let mut buf = spares.pop_front().unwrap();
  364. let mut count = 0;
  365. let mut extras = 0; // any new Strings we intro to the system
  366. let mut n_rcvd = 0;
  367. let mut last = Instant::now();
  368. let mut active: bool;
  369. let mut last_clear = Instant::now();
  370. let mut loop_time = Instant::now();
  371. let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
  372. MAX_BACKLOG + extras - s.len() - b.len() - 1
  373. };
  374. assert_eq!(n_out(&spares, &backlog, extras), 0);
  375. let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize| {
  376. if n_outstanding >= MAX_OUTSTANDING_HTTP {
  377. backlog.push_back(buf);
  378. return
  379. }
  380. let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
  381. let tx = http_tx.clone();
  382. let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "n_outstanding" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure
  383. let client = Arc::clone(&client);
  384. debug!(logger, "launching http thread");
  385. let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
  386. let logger = thread_logger;
  387. debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len());
  388. let start = Instant::now();
  389. 'a: for n_req in 0..N_HTTP_ATTEMPTS {
  390. let throttle = Duration::from_secs(2) * n_req * n_req;
  391. if n_req > 0 {
  392. warn!(logger, "InfluxWriter http thread: pausing before next request";
  393. "n_req" => n_req,
  394. "throttle" => %format_args!("{:?}", throttle),
  395. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  396. thread::sleep(throttle); // 0, 2, 8, 16, 32
  397. }
  398. let sent = Instant::now();
  399. let resp = client.post(url.clone())
  400. .body(buf.as_str())
  401. .send();
  402. let rcvd = Instant::now();
  403. let took = rcvd - sent;
  404. let mut n_tx = 0u32;
  405. match resp {
  406. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  407. debug!(logger, "server responded ok: 204 NoContent");
  408. buf.clear();
  409. let mut resp = Some(Ok(Resp { buf, took }));
  410. 'b: loop {
  411. n_tx += 1;
  412. match tx.try_send(resp.take().unwrap()) {
  413. Ok(_) => {
  414. if n_req > 0 {
  415. info!(logger, "successfully recovered from failed request with retry";
  416. "n_req" => n_req,
  417. "n_tx" => n_tx,
  418. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  419. }
  420. return
  421. }
  422. Err(chan::TrySendError::Full(r)) => {
  423. let throttle = Duration::from_millis(1000) * n_tx;
  424. warn!(logger, "channel full: InfluxWriter http thread failed to return buf";
  425. "n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle));
  426. resp = Some(r);
  427. thread::sleep(throttle);
  428. }
  429. Err(chan::TrySendError::Disconnected(_)) => {
  430. warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return";
  431. "n_tx" => n_tx, "n_req" => n_req);
  432. return
  433. }
  434. }
  435. }
  436. }
  437. Ok(mut resp) => {
  438. let mut server_resp = String::new();
  439. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  440. error!(logger, "influx server error (request took {:?})", took;
  441. "status" => %resp.status,
  442. "body" => server_resp);
  443. }
  444. Err(e) => {
  445. error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e);
  446. }
  447. }
  448. }
  449. let took = Instant::now() - start;
  450. warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer";
  451. "took" => %format_args!("{:?}", took));
  452. let buflen = buf.len();
  453. let n_lines = buf.lines().count();
  454. if let Err(e) = tx.send(Err(Resp { buf, took })) {
  455. crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e;
  456. "err" => %e, "buf.len()" => buflen, "n_lines" => n_lines);
  457. }
  458. });
  459. if let Err(e) = thread_res {
  460. crit!(logger, "failed to spawn thread: {}", e);
  461. }
  462. };
  463. let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> {
  464. match prev {
  465. 0 if N_BUFFER_LINES > 0 => {
  466. serialize_owned(m, buf);
  467. Ok(1)
  468. }
  469. n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => {
  470. buf.push_str("\n");
  471. serialize_owned(m, buf);
  472. Ok(n + 1)
  473. }
  474. n => {
  475. buf.push_str("\n");
  476. serialize_owned(m, buf);
  477. Err(n + 1)
  478. }
  479. }
  480. };
  481. 'event: loop {
  482. loop_time = Instant::now();
  483. active = false;
  484. match rx.recv() {
  485. Ok(Some(mut meas)) => {
  486. n_rcvd += 1;
  487. active = true;
  488. if n_rcvd % HB_EVERY == 0 {
  489. let n_outstanding = n_out(&spares, &backlog, extras);
  490. info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep();
  491. "n_outstanding" => n_outstanding,
  492. "spares.len()" => spares.len(),
  493. "n_rcvd" => n_rcvd,
  494. "n_active_buf" => count,
  495. "db_health" => %format_args!("{:?}", db_health.mean),
  496. "backlog.len()" => backlog.len());
  497. }
  498. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  499. if meas.fields.is_empty() {
  500. meas.fields.push(("n", OwnedValue::Integer(1)));
  501. }
  502. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  503. count = match next(count, &meas, &mut buf, loop_time, last) {
  504. Ok(n) => n,
  505. Err(_n) => {
  506. let mut count = 0;
  507. let mut next: String = match spares.pop_front() {
  508. Some(x) => x,
  509. None => {
  510. let n_outstanding = n_out(&spares, &backlog, extras);
  511. crit!(logger, "no available buffers in `spares`, pulling from backlog";
  512. "n_outstanding" => n_outstanding,
  513. "spares.len()" => spares.len(),
  514. "n_rcvd" => n_rcvd,
  515. "backlog.len()" => backlog.len());
  516. match backlog.pop_front() {
  517. // Note: this does not clear the backlog buffer,
  518. // instead we will just write more and more until
  519. // we are out of memory. I expect that will never
  520. // happen.
  521. //
  522. Some(x) => {
  523. count = 1; // otherwise, no '\n' added in `next(..)` - we are
  524. // sending a "full" buffer to be extended
  525. x
  526. }
  527. None => {
  528. extras += 1;
  529. crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String";
  530. "n_outstanding" => n_outstanding,
  531. "spares.len()" => spares.len(),
  532. "backlog.len()" => backlog.len(),
  533. "n_rcvd" => n_rcvd,
  534. "extras" => extras);
  535. String::new()
  536. }
  537. }
  538. }
  539. };
  540. // after swap, buf in next, so want to send next
  541. //
  542. mem::swap(&mut buf, &mut next);
  543. let n_outstanding = n_out(&spares, &backlog, extras);
  544. send(next, &mut backlog, n_outstanding);
  545. last = loop_time;
  546. count
  547. }
  548. };
  549. }
  550. Ok(None) => {
  551. let start = Instant::now();
  552. let mut hb = Instant::now();
  553. warn!(logger, "terminate signal rcvd"; "count" => count);
  554. if buf.len() > 0 {
  555. info!(logger, "sending remaining buffer to influx on terminate"; "count" => count);
  556. let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1));
  557. let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last);
  558. let n_outstanding = n_out(&spares, &backlog, extras);
  559. let mut placeholder = spares.pop_front().unwrap_or_else(String::new);
  560. mem::swap(&mut buf, &mut placeholder);
  561. send(placeholder, &mut backlog, n_outstanding);
  562. }
  563. let mut n_ok = 0;
  564. let mut n_err = 0;
  565. loop {
  566. loop_time = Instant::now();
  567. let n_outstanding = n_out(&spares, &backlog, extras);
  568. if backlog.is_empty() && n_outstanding < 1 {
  569. info!(logger, "cleared any remaining backlog";
  570. "n_outstanding" => n_outstanding,
  571. "spares.len()" => spares.len(),
  572. "backlog.len()" => backlog.len(),
  573. "n_cleared_ok" => n_ok,
  574. "n_cleared_err" => n_err,
  575. "n_rcvd" => n_rcvd,
  576. "extras" => extras,
  577. "elapsed" => %format_args!("{:?}", loop_time - start));
  578. break 'event
  579. }
  580. if loop_time - hb > Duration::from_secs(5) {
  581. info!(logger, "InfluxWriter still clearing backlog ..";
  582. "n_outstanding" => n_outstanding,
  583. "spares.len()" => spares.len(),
  584. "backlog.len()" => backlog.len(),
  585. "n_cleared_ok" => n_ok,
  586. "n_cleared_err" => n_err,
  587. "extras" => extras,
  588. "n_rcvd" => n_rcvd,
  589. "elapsed" => %format_args!("{:?}", loop_time - start));
  590. hb = loop_time;
  591. }
  592. if let Some(buf) = backlog.pop_front() {
  593. let n_outstanding = n_out(&spares, &backlog, extras);
  594. debug!(logger, "resending queued buffer from backlog";
  595. "backlog.len()" => backlog.len(),
  596. "spares.len()" => spares.len(),
  597. "n_rcvd" => n_rcvd,
  598. "n_outstanding" => n_outstanding);
  599. send(buf, &mut backlog, n_outstanding);
  600. last_clear = loop_time;
  601. }
  602. 'rx: loop {
  603. match http_rx.try_recv() {
  604. Ok(Ok(Resp { buf, .. })) => {
  605. n_ok += 1;
  606. spares.push_back(buf); // needed so `n_outstanding` count remains accurate
  607. }
  608. Ok(Err(Resp { buf, .. })) => {
  609. warn!(logger, "requeueing failed request"; "buf.len()" => buf.len());
  610. n_err += 1;
  611. backlog.push_front(buf);
  612. }
  613. Err(chan::TryRecvError::Disconnected) => {
  614. crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting";
  615. "n_outstanding" => n_outstanding,
  616. "backlog.len()" => backlog.len(),
  617. "n_cleared_ok" => n_ok,
  618. "n_cleared_err" => n_err,
  619. "extras" => extras,
  620. "n_rcvd" => n_rcvd,
  621. "elapsed" => %format_args!("{:?}", loop_time - start));
  622. break 'event
  623. }
  624. Err(_) => break 'rx
  625. }
  626. }
  627. thread::sleep(Duration::from_millis(1));
  628. }
  629. }
  630. _ => {}
  631. }
  632. db_health.refresh(loop_time);
  633. let n_outstanding = n_out(&spares, &backlog, extras);
  634. let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
  635. if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy {
  636. if let Some(queued) = backlog.pop_front() {
  637. let n_outstanding = n_out(&spares, &backlog, extras);
  638. send(queued, &mut backlog, n_outstanding);
  639. active = true;
  640. }
  641. }
  642. loop {
  643. match http_rx.try_recv() {
  644. Ok(Ok(Resp { buf, took })) => {
  645. db_health.add(loop_time, took);
  646. spares.push_back(buf);
  647. active = true;
  648. }
  649. Ok(Err(Resp { buf, took })) => {
  650. db_health.add(loop_time, took);
  651. backlog.push_front(buf);
  652. active = true;
  653. }
  654. Err(chan::TryRecvError::Disconnected) => {
  655. crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting";
  656. "n_outstanding" => n_outstanding,
  657. "backlog.len()" => backlog.len(),
  658. "n_rcvd" => n_rcvd,
  659. "extras" => extras);
  660. break 'event
  661. }
  662. Err(_) => break
  663. }
  664. }
  665. if !active {
  666. thread::sleep(Duration::new(0, 1))
  667. }
  668. }
  669. info!(logger, "waiting 1s before exiting thread");
  670. thread::sleep(Duration::from_secs(1));
  671. }).unwrap();
  672. InfluxWriter {
  673. host: host.to_string(),
  674. db: db.to_string(),
  675. tx,
  676. thread: Some(Arc::new(thread))
  677. }
  678. }
  679. }
  680. impl Drop for InfluxWriter {
  681. fn drop(&mut self) {
  682. if let Some(arc) = self.thread.take() {
  683. if let Ok(thread) = Arc::try_unwrap(arc) {
  684. let _ = self.tx.send(None);
  685. let _ = thread.join();
  686. }
  687. }
  688. }
  689. }
  690. #[cfg(feature = "zmq")]
  691. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  692. #[cfg(feature = "zmq")]
  693. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  694. let socket = ctx.socket(zmq::PULL)?;
  695. socket.bind(WRITER_ADDR)?;
  696. socket.set_rcvhwm(0)?;
  697. Ok(socket)
  698. }
  699. #[cfg(feature = "zmq")]
  700. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  701. let socket = ctx.socket(zmq::PUSH)?;
  702. socket.connect(WRITER_ADDR)?;
  703. socket.set_sndhwm(0)?;
  704. Ok(socket)
  705. }
  706. /// This removes offending things rather than escaping them.
  707. ///
  708. fn escape_tag(s: &str) -> String {
  709. s.replace(" ", "")
  710. .replace(",", "")
  711. .replace("\"", "")
  712. }
  713. fn escape(s: &str) -> String {
  714. s.replace(" ", "\\ ")
  715. .replace(",", "\\,")
  716. }
  717. fn as_string(s: &str) -> String {
  718. // the second replace removes double escapes
  719. //
  720. format!("\"{}\"", s.replace("\"", "\\\"")
  721. .replace(r#"\\""#, r#"\""#))
  722. }
  723. #[test]
  724. fn it_checks_as_string_does_not_double_escape() {
  725. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  726. let escaped = as_string(&raw);
  727. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  728. }
  729. fn as_integer(i: &i64) -> String {
  730. format!("{}i", i)
  731. }
  732. fn as_float(f: &f64) -> String {
  733. f.to_string()
  734. }
  735. fn as_boolean(b: &bool) -> &str {
  736. if *b { "t" } else { "f" }
  737. }
  738. pub fn now() -> i64 {
  739. nanos(Utc::now()) as i64
  740. }
  741. /// Serialize the measurement into influx line protocol
  742. /// and append to the buffer.
  743. ///
  744. /// # Examples
  745. ///
  746. /// ```
  747. /// extern crate influent;
  748. /// extern crate logging;
  749. ///
  750. /// use influent::measurement::{Measurement, Value};
  751. /// use std::string::String;
  752. /// use logging::influx::serialize;
  753. ///
  754. /// fn main() {
  755. /// let mut buf = String::new();
  756. /// let mut m = Measurement::new("test");
  757. /// m.add_field("x", Value::Integer(1));
  758. /// serialize(&m, &mut buf);
  759. /// }
  760. ///
  761. /// ```
  762. ///
  763. pub fn serialize(measurement: &Measurement, line: &mut String) {
  764. line.push_str(&escape(measurement.key));
  765. for (tag, value) in measurement.tags.iter() {
  766. line.push_str(",");
  767. line.push_str(&escape(tag));
  768. line.push_str("=");
  769. line.push_str(&escape(value));
  770. }
  771. let mut was_spaced = false;
  772. for (field, value) in measurement.fields.iter() {
  773. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  774. line.push_str(&escape(field));
  775. line.push_str("=");
  776. match value {
  777. &Value::String(ref s) => line.push_str(&as_string(s)),
  778. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  779. &Value::Float(ref f) => line.push_str(&as_float(f)),
  780. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  781. };
  782. }
  783. match measurement.timestamp {
  784. Some(t) => {
  785. line.push_str(" ");
  786. line.push_str(&t.to_string());
  787. }
  788. _ => {}
  789. }
  790. }
  791. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  792. ///
  793. /// The serialized measurement is appended to the end of the string without
  794. /// any regard for what exited in it previously.
  795. ///
  796. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  797. line.push_str(&escape_tag(measurement.key));
  798. let add_tag = |line: &mut String, key: &str, value: &str| {
  799. line.push_str(",");
  800. line.push_str(&escape_tag(key));
  801. line.push_str("=");
  802. line.push_str(&escape(value));
  803. };
  804. for (key, value) in measurement.tags.iter() {
  805. #[cfg(not(feature = "string-tags"))]
  806. add_tag(line, key, value);
  807. #[cfg(feature = "string-tags")]
  808. add_tag(line, key, value.as_str());
  809. }
  810. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  811. if is_first { line.push_str(" "); } else { line.push_str(","); }
  812. line.push_str(&escape_tag(key));
  813. line.push_str("=");
  814. match *value {
  815. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  816. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  817. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  818. OwnedValue::D128(ref d) => {
  819. if d.is_finite() {
  820. line.push_str(&format!("{}", d));
  821. } else {
  822. line.push_str("0.0");
  823. }
  824. }
  825. OwnedValue::Float(ref f) => {
  826. if f.is_finite() {
  827. line.push_str(&format!("{}", f));
  828. } else {
  829. line.push_str("-999.0");
  830. }
  831. }
  832. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  833. };
  834. };
  835. let mut fields = measurement.fields.iter();
  836. // first time separate from tags with space
  837. //
  838. fields.next().map(|kv| {
  839. add_field(line, &kv.0, &kv.1, true);
  840. });
  841. // then seperate the rest w/ comma
  842. //
  843. for kv in fields {
  844. add_field(line, kv.0, &kv.1, false);
  845. }
  846. if let Some(t) = measurement.timestamp {
  847. line.push_str(" ");
  848. line.push_str(&t.to_string());
  849. }
  850. }
  851. #[cfg(feature = "warnings")]
  852. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  853. #[cfg(feature = "zmq")]
  854. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  855. assert!(false);
  856. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  857. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  858. let _ = fs::create_dir("/tmp/mm");
  859. let ctx = zmq::Context::new();
  860. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  861. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  862. let client = Client::new();
  863. let mut buf = String::with_capacity(4096);
  864. let mut server_resp = String::with_capacity(4096);
  865. let mut count = 0;
  866. loop {
  867. if let Ok(bytes) = socket.recv_bytes(0) {
  868. if let Ok(msg) = String::from_utf8(bytes) {
  869. count = match count {
  870. 0 => {
  871. buf.push_str(&msg);
  872. 1
  873. }
  874. n @ 1...40 => {
  875. buf.push_str("\n");
  876. buf.push_str(&msg);
  877. n + 1
  878. }
  879. _ => {
  880. buf.push_str("\n");
  881. buf.push_str(&msg);
  882. match client.post(url.clone())
  883. .body(&buf)
  884. .send() {
  885. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  886. Ok(mut resp) => {
  887. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  888. let _ = warnings.send(
  889. Warning::Error(
  890. format!("Influx server: {}", server_resp)));
  891. server_resp.clear();
  892. }
  893. Err(why) => {
  894. let _ = warnings.send(
  895. Warning::Error(
  896. format!("Influx write error: {}", why)));
  897. }
  898. }
  899. buf.clear();
  900. 0
  901. }
  902. }
  903. }
  904. }
  905. }
  906. }).unwrap()
  907. }
/// A field value stored by value inside an `OwnedMeasurement`.
///
/// The per-variant notes describe how `serialize_owned` writes each one.
#[derive(Debug, Clone, PartialEq)]
pub enum OwnedValue {
    /// Text; written via the `as_string` helper.
    String(String),
    /// Float; written with `{}` formatting, or as `-999.0` when not finite.
    Float(f64),
    /// Integer; written with a trailing `i`.
    Integer(i64),
    /// Boolean; written via the `as_boolean` helper.
    Boolean(bool),
    /// Decimal; written with `{}` formatting, or as `0.0` when not finite.
    D128(d128),
    /// UUID; written with `{}` formatting wrapped in double quotes.
    Uuid(Uuid),
}
/// Holds data meant for an influxdb measurement in transit to the
/// writing thread.
///
/// Tags and fields are kept as `SmallVec`s of `(key, value)` pairs in
/// insertion order (the earlier `Map`-based layout is left commented out
/// below), so the same key can appear more than once.
///
#[derive(Clone, Debug)]
pub struct OwnedMeasurement {
    /// Measurement name.
    pub key: &'static str,
    /// Optional timestamp; `serialize_owned` omits it from the line when `None`.
    pub timestamp: Option<i64>,
    //pub fields: Map<&'static str, OwnedValue>,
    //pub tags: Map<&'static str, &'static str>,
    /// Field `(name, value)` pairs.
    pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
    /// Tag `(name, value)` pairs; values are `&'static str` unless the
    /// `string-tags` feature is enabled.
    #[cfg(not(feature = "string-tags"))]
    pub tags: SmallVec<[(&'static str, &'static str); 8]>,
    /// Tag `(name, value)` pairs with owned `String` values (`string-tags`
    /// feature).
    #[cfg(feature = "string-tags")]
    pub tags: SmallVec<[(&'static str, String); 8]>,
}
  934. impl OwnedMeasurement {
  935. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  936. OwnedMeasurement {
  937. key,
  938. timestamp: None,
  939. tags: SmallVec::with_capacity(n_tags),
  940. fields: SmallVec::with_capacity(n_fields),
  941. }
  942. }
  943. pub fn new(key: &'static str) -> Self {
  944. OwnedMeasurement {
  945. key,
  946. timestamp: None,
  947. tags: SmallVec::new(),
  948. fields: SmallVec::new(),
  949. }
  950. }
  951. /// Unusual consuming `self` signature because primarily used by
  952. /// the `measure!` macro.
  953. #[cfg(not(feature = "string-tags"))]
  954. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  955. self.tags.push((key, value));
  956. self
  957. }
  958. #[cfg(feature = "string-tags")]
  959. pub fn add_tag<S: ToString>(mut self, key: &'static str, value: S) -> Self {
  960. self.tags.push((key, value.to_string()));
  961. self
  962. }
  963. /// Unusual consuming `self` signature because primarily used by
  964. /// the `measure!` macro.
  965. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  966. self.fields.push((key, value));
  967. self
  968. }
  969. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  970. self.timestamp = Some(timestamp);
  971. self
  972. }
  973. #[cfg(not(feature = "string-tags"))]
  974. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  975. match self.tags.iter().position(|kv| kv.0 == key) {
  976. Some(i) => {
  977. self.tags.get_mut(i)
  978. .map(|x| {
  979. x.0 = value;
  980. });
  981. self
  982. }
  983. None => {
  984. self.add_tag(key, value)
  985. }
  986. }
  987. }
  988. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  989. self.fields.iter()
  990. .find(|kv| kv.0 == key)
  991. .map(|kv| &kv.1)
  992. }
  993. #[cfg(not(feature = "string-tags"))]
  994. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  995. self.tags.iter()
  996. .find(|kv| kv.0 == key)
  997. .map(|kv| kv.1)
  998. }
  999. }
  1000. #[allow(unused_imports, unused_variables)]
  1001. #[cfg(test)]
  1002. mod tests {
  1003. use super::*;
  1004. use test::{black_box, Bencher};
// Benchmark (ignored by default): each iteration issues ten `measure!`
// calls through an `InfluxWriter`, taking the timestamp from
// `influx.nanos(Utc::now())` and bumping the counter `n` before every call.
// NOTE(review): the `InfluxWriter::new` arguments look like host, db, log
// path and buffer size — confirm against its definition.
#[ignore]
#[bench]
fn measure_ten(b: &mut Bencher) {
    let influx = InfluxWriter::new("localhost", "test", "log/influx.log", 8192);
    let mut n = 0;
    b.iter(|| {
        for _ in 0..10 {
            let time = influx.nanos(Utc::now());
            n += 1;
            measure!(influx, million, i(n), tm(time));
        }
    });
}
// `utc(time)` takes a `DateTime<Utc>`; the resulting timestamp must equal
// `nanos(time) as i64`. Also checks that `t(color)` and `v(VERSION)` yield
// the tags `color` and `version`.
// `tag_value` is not used by this test.
#[test]
fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
    const VERSION: &str = "0.3.90";
    let tag_value = "one";
    let color = "red";
    let time = Utc::now();
    let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
    assert_eq!(m.get_tag("color"), Some("red"));
    assert_eq!(m.get_tag("version"), Some(VERSION));
    assert_eq!(m.timestamp, Some(nanos(time) as i64));
}
// `v(VERSION)` must produce a tag named `version` holding the constant's
// value; `tm(time)` must store the given `i64` timestamp unchanged.
// `tag_value` is not used by this test.
#[test]
fn it_uses_the_v_for_version_shortcut() {
    const VERSION: &str = "0.3.90";
    let tag_value = "one";
    let color = "red";
    let time = now();
    let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
    assert_eq!(m.get_tag("color"), Some("red"));
    assert_eq!(m.get_tag("version"), Some(VERSION));
    assert_eq!(m.timestamp, Some(time));
}
// Key-only tag shortcut: `t(ident)` must use the identifier as the tag name
// and the local variable's value as the tag value, so the variable names
// here are part of what is being tested.
#[test]
fn it_uses_the_new_tag_k_only_shortcut() {
    let tag_value = "one";
    let color = "red";
    let time = now();
    let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
    assert_eq!(m.get_tag("color"), Some("red"));
    assert_eq!(m.get_tag("tag_value"), Some("one"));
    assert_eq!(m.timestamp, Some(time));
}
// Parenthesised `kind(name, value)` syntax: checks the key, one tag, an
// integer field, a float field and the timestamp of the built measurement.
#[test]
fn it_uses_measure_macro_parenthesis_syntax() {
    let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
    assert_eq!(m.key, "test");
    assert_eq!(m.get_tag("a"), Some("b"));
    assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
    assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
    assert_eq!(m.timestamp, Some(1));
}
// `measure!` must accept a field-access expression (`self.influx`) as its
// first argument. There are no assertions: the test passes if this compiles
// and `a.f()` returns without panicking.
#[test]
fn it_uses_measure_macro_on_a_self_attribute() {
    struct A {
        pub influx: InfluxWriter,
    }
    impl A {
        fn f(&self) {
            measure!(self.influx, test, t(color, "red"), i(n, 1));
        }
    }
    let a = A { influx: InfluxWriter::default() };
    a.f();
}
// Uses one `InfluxWriter` on this thread and a clone moved into a spawned
// thread, then lets both go out of scope. There are no assertions, and the
// spawned thread is not joined, so only a panic on this thread can fail
// the test.
#[test]
fn it_clones_an_influx_writer_to_check_both_drop() {
    let influx = InfluxWriter::default();
    measure!(influx, drop_test, i(a, 1), i(b, 2));
    {
        let influx = influx.clone();
        thread::spawn(move || {
            measure!(influx, drop_test, i(a, 3), i(b, 4));
        });
    }
}
// Benchmark: cost of one `measure!` call (one tag, one integer field, no
// explicit timestamp) through an `InfluxWriter`.
#[bench]
fn influx_writer_send_basic(b: &mut Bencher) {
    let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
    b.iter(|| {
        measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
    });
}
// Benchmark: one `measure!` call with two tags and two `d128` fields through
// an `InfluxWriter`; also exercises a trailing comma in the argument list.
// NOTE(review): `t!(xmr-btc)` is a macro defined outside this file section —
// presumably it builds a ticker value; confirm.
#[bench]
fn influx_writer_send_price(b: &mut Bencher) {
    let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
    b.iter(|| {
        measure!(m, test,
            t(ticker, t!(xmr-btc).as_str()),
            t(exchange, "plnx"),
            d(bid, d128::zero()),
            d(ask, d128::zero()),
        );
    });
}
// Sends through a plain bounded channel instead of an `InfluxWriter`, then
// blocks on `recv` and checks the `color` tag of the measurement received.
#[test]
fn it_checks_color_tag_error_in_non_doctest() {
    let (tx, rx) = bounded(1024);
    measure!(tx, test, tag[color;"red"], int[n;1]);
    let meas: OwnedMeasurement = rx.recv().unwrap();
    assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
}
// `@make_meas` returns the measurement instead of sending it. Uses the
// `kind [ name => value ]` syntax for every supported kind, including a
// block expression (`{ 1 + 2 }`) as a value. Only the key, the two tags,
// fields `three` and `seven`, and the timestamp are asserted.
#[test]
fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
    let meas = measure!(@make_meas test_measurement,
        tag [ one => "a" ],
        tag [ two => "b" ],
        int [ three => 2 ],
        float [ four => 1.2345 ],
        string [ five => String::from("d") ],
        bool [ six => true ],
        int [ seven => { 1 + 2 } ],
        time [ 1 ]
    );
    assert_eq!(meas.key, "test_measurement");
    assert_eq!(meas.get_tag("one"), Some("a"));
    assert_eq!(meas.get_tag("two"), Some("b"));
    assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
    assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
    assert_eq!(meas.timestamp, Some(1));
}
// Same arguments as the `@make_meas` test, but sent through a bounded
// channel and read back with `try_recv`.
// NOTE(review): the 10ms sleep presumably gives the send time to land before
// the non-blocking `try_recv` — confirm whether `measure!` sends
// synchronously, in which case the sleep is unnecessary.
#[test]
fn it_uses_the_measure_macro() {
    let (tx, rx) = bounded(1024);
    measure!(tx, test_measurement,
        tag [ one => "a" ],
        tag [ two => "b" ],
        int [ three => 2 ],
        float [ four => 1.2345 ],
        string [ five => String::from("d") ],
        bool [ six => true ],
        int [ seven => { 1 + 2 } ],
        time [ 1 ]
    );
    thread::sleep(Duration::from_millis(10));
    let meas: OwnedMeasurement = rx.try_recv().unwrap();
    assert_eq!(meas.key, "test_measurement");
    assert_eq!(meas.get_tag("one"), Some("a"));
    assert_eq!(meas.get_tag("two"), Some("b"));
    assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
    assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
    assert_eq!(meas.timestamp, Some(1));
}
// Covers the `d128[..]` and `uuid[..]` field kinds: the values sent through
// the channel must come back as `OwnedValue::D128` / `OwnedValue::Uuid`
// equal to the originals, with the timestamp passed via `time[t]`.
#[test]
fn it_uses_measure_macro_for_d128_and_uuid() {
    let (tx, rx) = bounded(1024);
    let u = Uuid::new_v4();
    let d = d128::zero();
    let t = now();
    measure!(tx, test_measurement,
        tag[one; "a"],
        d128[two; d],
        uuid[three; u],
        time[t]
    );
    thread::sleep(Duration::from_millis(10));
    let meas: OwnedMeasurement = rx.try_recv().unwrap();
    assert_eq!(meas.key, "test_measurement");
    assert_eq!(meas.get_tag("one"), Some("a"));
    assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
    assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
    assert_eq!(meas.timestamp, Some(t));
}
// Alternate `kind[name; value]` syntax, deliberately mixed with one
// `bool [ six => true ]` in the arrow form; the expected measurement is the
// same as in the arrow-syntax test.
#[test]
fn it_uses_the_measure_macro_alt_syntax() {
    let (tx, rx) = bounded(1024);
    measure!(tx, test_measurement,
        tag[one; "a"],
        tag[two; "b"],
        int[three; 2],
        float[four; 1.2345],
        string[five; String::from("d")],
        bool [ six => true ],
        int[seven; { 1 + 2 }],
        time[1]
    );
    thread::sleep(Duration::from_millis(10));
    let meas: OwnedMeasurement = rx.try_recv().unwrap();
    assert_eq!(meas.key, "test_measurement");
    assert_eq!(meas.get_tag("one"), Some("a"));
    assert_eq!(meas.get_tag("two"), Some("b"));
    assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
    assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
    assert_eq!(meas.timestamp, Some(1));
}
// Serialization check: the last tag and the first field must be separated
// by a single space, and consecutive fields by a comma.
#[test]
fn it_checks_that_fields_are_separated_correctly() {
    let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
    assert_eq!(m.key, "test");
    assert_eq!(m.get_tag("a"), Some("one"));
    assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
    let mut buf = String::new();
    serialize_owned(&m, &mut buf);
    assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
}
// Edge cases for the macro parser: mixed `=>` / `;` separators, a trailing
// comma, and a nested field access (`b.a.one`) as a value expression.
// NOTE(review): the receiver half is discarded by the `_` pattern, so the two
// sends go to a channel with no receiver — presumably `measure!` ignores the
// send result; confirm.
#[test]
fn try_to_break_measure_macro() {
    let (tx, _) = bounded(1024);
    measure!(tx, one, tag[x=>"y"], int[n;1]);
    measure!(tx, one, tag[x;"y"], int[n;1],);
    struct A {
        pub one: i32,
        pub two: i32,
    }
    struct B {
        pub a: A
    }
    let b = B { a: A { one: 1, two: 2 } };
    let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
    assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
}
// Benchmark: a small `measure!` call (one tag, one field, `now()` timestamp)
// into a bounded channel. The `listener` thread drains the channel until
// `recv` returns an error; its handle is never joined.
#[bench]
fn measure_macro_small(b: &mut Bencher) {
    let (tx, rx) = bounded(1024);
    let listener = thread::spawn(move || {
        loop { if rx.recv().is_err() { break } }
    });
    b.iter(|| {
        measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
    });
}
// Benchmark: a medium `measure!` call (three tags, three fields, `now()`
// timestamp, mixed bracket syntaxes) into a bounded channel. The `listener`
// thread drains the channel until `recv` returns an error; its handle is
// never joined.
#[bench]
fn measure_macro_medium(b: &mut Bencher) {
    let (tx, rx) = bounded(1024);
    let listener = thread::spawn(move || {
        loop { if rx.recv().is_err() { break } }
    });
    b.iter(|| {
        measure!(tx, test,
            tag[color; "red"],
            tag[mood => "playful"],
            tag [ ticker => "xmr_btc" ],
            float[ price => 1.2345 ],
            float[ amount => 56.323],
            int[n; 1],
            time[now()]
        );
    });
}
// Ignored integration test for the deprecated zmq `writer` (needs both the
// `zmq` and `warnings` features): serializes one measurement and pushes it
// over the socket returned by `push`.
// `drop(w)` only drops the `JoinHandle`; it does not join or stop the thread.
// `rx` is never read, so warnings emitted by the writer are not checked.
#[cfg(feature = "zmq")]
#[cfg(feature = "warnings")]
#[test]
#[ignore]
fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
    let ctx = zmq::Context::new();
    let socket = push(&ctx).unwrap();
    let (tx, rx) = bounded(1024);
    let w = writer(tx.clone());
    let mut buf = String::with_capacity(4096);
    let mut meas = Measurement::new("rust_test");
    meas.add_tag("a", "t");
    meas.add_field("c", Value::Float(1.23456));
    let now = now();
    meas.set_timestamp(now);
    serialize(&meas, &mut buf);
    socket.send_str(&buf, 0).unwrap();
    drop(w);
}
  1265. #[test]
  1266. fn it_serializes_a_measurement_in_place() {
  1267. let mut buf = String::with_capacity(4096);
  1268. let mut meas = Measurement::new("rust_test");
  1269. meas.add_tag("a", "b");
  1270. meas.add_field("c", Value::Float(1.0));
  1271. let now = now();
  1272. meas.set_timestamp(now);
  1273. serialize(&meas, &mut buf);
  1274. let ans = format!("rust_test,a=b c=1 {}", now);
  1275. assert_eq!(buf, ans);
  1276. }
  1277. #[test]
  1278. fn it_serializes_a_hard_to_serialize_message() {
  1279. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  1280. let mut buf = String::new();
  1281. let mut server_resp = String::new();
  1282. let mut m = Measurement::new("rust_test");
  1283. m.add_field("s", Value::String(&raw));
  1284. let now = now();
  1285. m.set_timestamp(now);
  1286. serialize(&m, &mut buf);
  1287. println!("{}", buf);
  1288. buf.push_str("\n");
  1289. let buf_copy = buf.clone();
  1290. buf.push_str(&buf_copy);
  1291. println!("{}", buf);
  1292. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  1293. let client = Client::new();
  1294. match client.post(url.clone())
  1295. .body(&buf)
  1296. .send() {
  1297. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  1298. Ok(mut resp) => {
  1299. resp.read_to_string(&mut server_resp).unwrap();
  1300. panic!("{}", server_resp);
  1301. }
  1302. Err(why) => {
  1303. panic!(why)
  1304. }
  1305. }
  1306. }
  1307. #[bench]
  1308. fn serialize_owned_longer(b: &mut Bencher) {
  1309. let mut buf = String::with_capacity(1024);
  1310. let m =
  1311. OwnedMeasurement::new("test")
  1312. .add_tag("one", "a")
  1313. .add_tag("two", "b")
  1314. .add_tag("ticker", "xmr_btc")
  1315. .add_tag("exchange", "plnx")
  1316. .add_tag("side", "bid")
  1317. .add_field("three", OwnedValue::Float(1.2345))
  1318. .add_field("four", OwnedValue::Integer(57))
  1319. .add_field("five", OwnedValue::Boolean(true))
  1320. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  1321. .set_timestamp(now());
  1322. b.iter(|| {
  1323. serialize_owned(&m, &mut buf);
  1324. buf.clear()
  1325. });
  1326. }
  1327. #[bench]
  1328. fn serialize_owned_simple(b: &mut Bencher) {
  1329. let mut buf = String::with_capacity(1024);
  1330. let m =
  1331. OwnedMeasurement::new("test")
  1332. .add_tag("one", "a")
  1333. .add_tag("two", "b")
  1334. .add_field("three", OwnedValue::Float(1.2345))
  1335. .add_field("four", OwnedValue::Integer(57))
  1336. .set_timestamp(now());
  1337. b.iter(|| {
  1338. serialize_owned(&m, &mut buf);
  1339. buf.clear()
  1340. });
  1341. }
  1342. #[bench]
  1343. fn clone_url_for_thread(b: &mut Bencher) {
  1344. let host = "ahmes";
  1345. let db = "mlp";
  1346. let url =
  1347. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1348. &[("db", db), ("precision", "ns")]).unwrap();
  1349. b.iter(|| {
  1350. url.clone()
  1351. })
  1352. }
  1353. #[bench]
  1354. fn clone_arc_url_for_thread(b: &mut Bencher) {
  1355. let host = "ahmes";
  1356. let db = "mlp";
  1357. let url =
  1358. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1359. &[("db", db), ("precision", "ns")]).unwrap();
  1360. let url = Arc::new(url);
  1361. b.iter(|| {
  1362. Arc::clone(&url)
  1363. })
  1364. }
  1365. #[test]
  1366. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  1367. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  1368. let mut buf = String::new();
  1369. let mut server_resp = String::new();
  1370. let m = OwnedMeasurement::new("rust_test")
  1371. .add_field("s", OwnedValue::String(raw.to_string()))
  1372. .set_timestamp(now());
  1373. serialize_owned(&m, &mut buf);
  1374. println!("{}", buf);
  1375. buf.push_str("\n");
  1376. let buf_copy = buf.clone();
  1377. buf.push_str(&buf_copy);
  1378. println!("{}", buf);
  1379. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  1380. let client = Client::new();
  1381. match client.post(url.clone())
  1382. .body(&buf)
  1383. .send() {
  1384. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  1385. Ok(mut resp) => {
  1386. resp.read_to_string(&mut server_resp).unwrap();
  1387. panic!("{}", server_resp);
  1388. }
  1389. Err(why) => {
  1390. panic!(why)
  1391. }
  1392. }
  1393. }
  1394. }