You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1446 lines
57KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. #![feature(test)]
  4. #[cfg(test)]
  5. extern crate test;
  6. #[macro_use]
  7. extern crate slog;
  8. use std::io::Read;
  9. use std::sync::Arc;
  10. use crossbeam_channel::{Sender, Receiver, bounded, SendError};
  11. use std::{thread, mem};
  12. use std::time::*;
  13. use std::collections::VecDeque;
  14. use hyper::status::StatusCode;
  15. use hyper::client::response::Response;
  16. use hyper::Url;
  17. use hyper::client::Client;
  18. use slog::Drain;
  19. use chrono::prelude::*;
  20. use decimal::d128;
  21. use uuid::Uuid;
  22. use smallvec::SmallVec;
  23. use slog::Logger;
  24. use pretty_toa::ThousandsSep;
  25. pub type Credentials = hyper::header::Authorization<hyper::header::Basic>;
  26. /// Created this so I know what types can be passed through the
  27. /// `measure!` macro, which used to convert with `as i64` and
  28. /// `as f64` until I accidentally passed a function name, and it
  29. /// still compiled, but with garbage numbers.
  30. pub trait AsI64 {
  31. fn as_i64(x: Self) -> i64;
  32. }
  33. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  34. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  35. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  36. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  37. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } }
  38. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  39. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  40. impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  41. impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  42. impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  43. impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  44. /// Created this so I know what types can be passed through the
  45. /// `measure!` macro, which used to convert with `as i64` and
  46. /// `as f64` until I accidentally passed a function name, and it
  47. /// still compiled, but with garbage numbers.
  48. pub trait AsF64 {
  49. fn as_f64(x: Self) -> f64;
  50. }
  51. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  52. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  53. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  54. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  55. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  56. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  57. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  58. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  59. ///
  60. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  61. /// values, as well as sends it with the `Sender`.
  62. ///
  63. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  64. /// measurement (see `tests` mod).
  65. ///
  66. /// # Examples
  67. ///
  68. /// ```
  69. /// #[macro_use]
  70. /// extern crate influx_writer;
  71. ///
  72. /// use influx_writer::{OwnedValue, OwnedMeasurement, AsI64};
  73. ///
  74. /// use decimal::d128;
  75. ///
  76. /// fn main() {
  77. /// let (tx, rx) = crossbeam_channel::bounded(1024);
  78. ///
  79. /// // "shorthand" syntax
  80. ///
  81. /// measure!(tx, test, t(color, "red"), i(n, 1));
  82. ///
  83. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  84. ///
  85. /// assert_eq!(meas.key, "test");
  86. /// assert_eq!(meas.get_tag("color"), Some("red"));
  87. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  88. ///
  89. /// measure!(tx, test,
  90. /// t(one, "a"), t(two, "b"), i(three, 2),
  91. /// f(four, 1.2345), s(five, String::from("d")),
  92. /// b(six, true), i(seven, 1 + 2),
  93. /// tm(1)
  94. /// );
  95. ///
  96. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  97. ///
  98. /// assert_eq!(meas.key, "test");
  99. /// assert_eq!(meas.get_tag("one"), Some("a"));
  100. /// assert_eq!(meas.get_tag("two"), Some("b"));
  101. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  102. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  103. /// assert_eq!(meas.timestamp, Some(1));
  104. ///
  105. /// // use the @make_meas flag to skip sending a measurement, instead merely
  106. /// // creating it.
  107. ///
  108. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, t(color, "red"), i(n, 1));
  109. ///
  110. /// // each variant also has shorthand aliases
  111. ///
  112. /// let meas: OwnedMeasurement = measure!(@make_meas abcd, t(color, "red"), i(n, 1), d(price, d128::zero()));
  113. /// }
  114. /// ```
  115. ///
  116. #[macro_export]
  117. macro_rules! measure {
  118. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  119. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  120. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  121. //(@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  122. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  123. (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
  124. (@kv v, $meas:ident, $k:expr) => { measure!(@ea t, $meas, "version", $k) };
  125. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  126. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  127. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Integer(AsI64::as_i64($v))) };
  128. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Float(AsF64::as_f64($v))) };
  129. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::String($v)) };
  130. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::D128($v)) };
  131. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Uuid($v)) };
  132. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::OwnedValue::Boolean(bool::from($v))) };
  133. (@as_expr $e:expr) => {$e};
  134. (@count_tags) => {0usize};
  135. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  136. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  137. (@count_fields) => {0usize};
  138. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  139. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  140. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  141. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  142. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  143. };
  144. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  145. let n_tags = measure!(@count_tags $($t)*);
  146. let n_fields = measure!(@count_fields $($t)*);
  147. let mut meas =
  148. $crate::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  149. $(
  150. measure!(@kv $t, meas, $($tail)*);
  151. )*
  152. meas
  153. }};
  154. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  155. measure!($m, $name, $($t [ $($tail)* ] ),+)
  156. };
  157. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  158. #[allow(unused_imports)]
  159. use $crate::{AsI64, AsF64};
  160. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  161. let _ = $m.send(measurement);
  162. }};
  163. }
  164. /// converts a chrono::DateTime to an integer timestamp (ns)
  165. ///
  166. #[inline]
  167. pub fn nanos(t: DateTime<Utc>) -> u64 {
  168. (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64)
  169. }
  170. #[inline]
  171. pub fn secs(d: Duration) -> f64 {
  172. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  173. }
  174. #[inline]
  175. pub fn inanos(t: DateTime<Utc>) -> i64 {
  176. t.timestamp() * 1_000_000_000i64 + t.timestamp_subsec_nanos() as i64
  177. }
  178. //#[deprecated(since="0.4.3", note="Use `nanos(DateTime<Utc>) -> u64` instead")]
  179. pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
  180. (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
  181. }
  182. #[inline]
  183. pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
  184. (d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64
  185. }
  186. #[inline]
  187. pub fn nanos_utc(t: i64) -> DateTime<Utc> {
  188. Utc.timestamp(t / 1_000_000_000, (t % 1_000_000_000) as u32)
  189. }
  190. #[derive(Clone, Debug)]
  191. struct Point<T, V> {
  192. pub time: T,
  193. pub value: V
  194. }
  195. struct DurationWindow {
  196. pub size: Duration,
  197. pub mean: Duration,
  198. pub sum: Duration,
  199. pub count: u32,
  200. pub items: VecDeque<Point<Instant, Duration>>
  201. }
  202. #[allow(dead_code)]
  203. impl DurationWindow {
  204. #[inline]
  205. pub fn update(&mut self, time: Instant, value: Duration) {
  206. self.add(time, value);
  207. self.refresh(time);
  208. }
  209. #[inline]
  210. pub fn refresh(&mut self, t: Instant) -> &Self {
  211. if !self.items.is_empty() {
  212. let (n_remove, sum, count) =
  213. self.items.iter()
  214. .take_while(|x| t - x.time > self.size)
  215. .fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
  216. (n_remove + 1, sum - x.value, count - 1)
  217. });
  218. self.sum = sum;
  219. self.count = count;
  220. for _ in 0..n_remove {
  221. self.items.pop_front();
  222. }
  223. }
  224. if self.count > 0 {
  225. self.mean = self.sum / self.count.into();
  226. }
  227. self
  228. }
  229. #[inline]
  230. pub fn add(&mut self, time: Instant, value: Duration) {
  231. let p = Point { time, value };
  232. self.sum += p.value;
  233. self.count += 1;
  234. self.items.push_back(p);
  235. }
  236. }
  237. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  238. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  239. /// measurements have accumulated.
  240. ///
  241. #[derive(Debug)]
  242. pub struct InfluxWriter {
  243. host: String,
  244. db: String,
  245. tx: Sender<Option<OwnedMeasurement>>,
  246. thread: Option<Arc<thread::JoinHandle<()>>>,
  247. }
  248. impl Default for InfluxWriter {
  249. fn default() -> Self {
  250. InfluxWriter::new("localhost", "test")
  251. }
  252. }
  253. impl Clone for InfluxWriter {
  254. fn clone(&self) -> Self {
  255. debug_assert!(self.thread.is_some());
  256. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  257. InfluxWriter {
  258. host: self.host.to_string(),
  259. db: self.db.to_string(),
  260. tx: self.tx.clone(),
  261. thread,
  262. }
  263. }
  264. }
  265. impl InfluxWriter {
  266. pub fn host(&self) -> &str { self.host.as_str() }
  267. pub fn db(&self) -> &str { self.db.as_str() }
  268. /// Sends the `OwnedMeasurement` to the serialization thread.
  269. ///
  270. #[inline]
  271. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  272. self.tx.send(Some(m))
  273. }
  274. #[inline]
  275. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  276. #[inline]
  277. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  278. #[inline]
  279. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  280. #[inline]
  281. pub fn rsecs(&self, d: Duration) -> f64 {
  282. ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64))
  283. * 1000.0)
  284. .round()
  285. / 1000.0
  286. }
  287. #[inline]
  288. pub fn secs(&self, d: Duration) -> f64 {
  289. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  290. }
  291. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  292. self.tx.clone()
  293. }
  294. #[inline]
  295. pub fn is_full(&self) -> bool { self.tx.is_full() }
  296. pub fn placeholder() -> Self {
  297. let (tx, _) = bounded(1024);
  298. Self {
  299. host: String::new(),
  300. db: String::new(),
  301. tx,
  302. thread: None,
  303. }
  304. }
  305. pub fn new(host: &str, db: &str) -> Self {
  306. let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!());
  307. Self::with_logger_and_opt_creds(host, db, None, &noop_logger)
  308. }
  309. pub fn get_credentials(username: String, password: Option<String>) -> Credentials {
  310. hyper::header::Authorization(
  311. hyper::header::Basic { username, password }
  312. )
  313. }
  314. fn http_req<'a>(client: &'a Client, url: Url, body: &'a str, creds: &Option<Credentials>) -> hyper::client::RequestBuilder<'a> {
  315. let req = client.post(url.clone())
  316. .body(body);
  317. if let Some(auth) = creds {
  318. req.header(auth.clone())
  319. } else {
  320. req
  321. }
  322. }
  323. #[deprecated(since = "0.9.0", note = "replaced by with_logger_and_opt_creds")]
  324. #[allow(unused_assignments)]
  325. pub fn with_logger(host: &str, db: &str, logger: &Logger) -> Self {
  326. Self::with_logger_and_opt_creds(host, db, None, logger)
  327. }
  328. pub fn with_logger_and_opt_creds(host: &str, db: &str, creds: Option<Credentials>, logger: &Logger) -> Self {
  329. let logger = logger.new(o!(
  330. "host" => host.to_string(),
  331. "db" => db.to_string()));
  332. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(4096);
  333. let url =
  334. Url::parse_with_params(&format!("http://{}:8086/write", host),
  335. &[("db", db), ("precision", "ns")])
  336. .expect("influx writer url should parse");
  337. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  338. use std::time::*;
  339. use crossbeam_channel as chan;
  340. #[cfg(feature = "no-influx-buffer")]
  341. const N_BUFFER_LINES: usize = 0;
  342. const N_BUFFER_LINES: usize = 1024;
  343. const MAX_PENDING: Duration = Duration::from_secs(3);
  344. const INITIAL_BUFFER_CAPACITY: usize = 4096;
  345. const MAX_BACKLOG: usize = 1024;
  346. const MAX_OUTSTANDING_HTTP: usize = 64;
  347. const DEBUG_HB_EVERY: usize = 1024 * 96;
  348. const INFO_HB_EVERY: usize = 1024 * 1024;
  349. const N_HTTP_ATTEMPTS: u32 = 15;
  350. const INITIAL_BACKLOG: usize = MAX_OUTSTANDING_HTTP * 2;
  351. let client = Arc::new(Client::new());
  352. let creds = Arc::new(creds);
  353. info!(logger, "initializing InfluxWriter ...";
  354. "N_BUFFER_LINES" => N_BUFFER_LINES,
  355. "MAX_PENDING" => %format_args!("{:?}", MAX_PENDING),
  356. "MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP,
  357. "INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY,
  358. "INITIAL_BACKLOG" => INITIAL_BACKLOG,
  359. "MAX_BACKLOG" => MAX_BACKLOG,
  360. );
  361. // pre-allocated buffers ready for use if the active one is stasheed
  362. // during an outage
  363. let mut spares: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG);
  364. // queue failed sends here until problem resolved, then send again. in worst
  365. // case scenario, loop back around on buffers queued in `backlog`, writing
  366. // over the oldest first.
  367. //
  368. let mut backlog: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG);
  369. for _ in 0..INITIAL_BACKLOG {
  370. spares.push_back(String::with_capacity(INITIAL_BUFFER_CAPACITY));
  371. }
  372. struct Resp {
  373. pub buf: String,
  374. pub took: Duration,
  375. }
  376. let mut db_health = DurationWindow {
  377. size: Duration::from_secs(120),
  378. mean: Duration::new(10, 0),
  379. sum: Duration::new(0, 0),
  380. count: 0,
  381. items: VecDeque::with_capacity(MAX_OUTSTANDING_HTTP),
  382. };
  383. let (http_tx, http_rx) = chan::bounded(32);
  384. let mut buf = spares.pop_front().unwrap();
  385. let mut count = 0;
  386. let mut extras = 0; // any new Strings we intro to the system
  387. let mut n_rcvd = 0;
  388. let mut in_flight_buffer_bytes = 0;
  389. let mut last = Instant::now();
  390. let mut active: bool;
  391. let mut last_clear = Instant::now();
  392. let mut last_memory_check = Instant::now();
  393. let mut loop_time: Instant;
  394. let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
  395. INITIAL_BACKLOG + extras - s.len() - b.len() - 1
  396. };
  397. assert_eq!(n_out(&spares, &backlog, extras), 0);
  398. let count_allocated_memory = |spares: &VecDeque<String>, backlog: &VecDeque<String>, in_flight_buffer_bytes: &usize| -> usize {
  399. spares.iter().map(|x| x.capacity()).sum::<usize>()
  400. + backlog.iter().map(|x| x.capacity()).sum::<usize>()
  401. + (*in_flight_buffer_bytes)
  402. };
  403. let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize, in_flight_buffer_bytes: &mut usize| {
  404. if n_outstanding >= MAX_OUTSTANDING_HTTP {
  405. backlog.push_back(buf);
  406. return
  407. }
  408. let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
  409. let tx = http_tx.clone();
  410. let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "in flight req at spawn time" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure
  411. let client = Arc::clone(&client);
  412. let creds = Arc::clone(&creds);
  413. *in_flight_buffer_bytes = *in_flight_buffer_bytes + buf.capacity();
  414. debug!(logger, "launching http thread");
  415. let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
  416. let logger = thread_logger;
  417. debug!(logger, "preparing to send http request to influx"; "buf.len()" => buf.len());
  418. let start = Instant::now();
  419. for n_req in 0..N_HTTP_ATTEMPTS {
  420. let throttle = Duration::from_secs(2) * n_req * n_req;
  421. if n_req > 0 {
  422. warn!(logger, "InfluxWriter http thread: pausing before next request";
  423. "n_req" => n_req,
  424. "throttle" => %format_args!("{:?}", throttle),
  425. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  426. thread::sleep(throttle); // 0, 2, 8, 16, 32
  427. }
  428. let sent = Instant::now();
  429. let req = Self::http_req(&client, url.clone(), buf.as_str(), &creds);
  430. let resp = req.send();
  431. let rcvd = Instant::now();
  432. let took = rcvd - sent;
  433. let mut n_tx = 0u32;
  434. match resp {
  435. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  436. debug!(logger, "server responded ok: 204 NoContent");
  437. buf.clear();
  438. let mut resp = Some(Ok(Resp { buf, took }));
  439. loop {
  440. n_tx += 1;
  441. match tx.try_send(resp.take().unwrap()) {
  442. Ok(_) => {
  443. if n_req > 0 {
  444. info!(logger, "successfully recovered from failed request with retry";
  445. "n_req" => n_req,
  446. "n_tx" => n_tx,
  447. "elapsed" => %format_args!("{:?}", Instant::now() - start));
  448. }
  449. return
  450. }
  451. Err(chan::TrySendError::Full(r)) => {
  452. let throttle = Duration::from_millis(1000) * n_tx;
  453. warn!(logger, "channel full: InfluxWriter http thread failed to return buf";
  454. "n_tx" => n_tx, "n_req" => n_req, "until next" => %format_args!("{:?}", throttle));
  455. resp = Some(r);
  456. thread::sleep(throttle);
  457. }
  458. Err(chan::TrySendError::Disconnected(_)) => {
  459. warn!(logger, "InfluxWriter http thread: channel disconnected, aborting buffer return";
  460. "n_tx" => n_tx, "n_req" => n_req);
  461. return
  462. }
  463. }
  464. }
  465. }
  466. Ok(mut resp) => {
  467. let mut server_resp = String::new();
  468. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  469. error!(logger, "influx server error (request took {:?})", took;
  470. "status" => %resp.status,
  471. "body" => server_resp);
  472. }
  473. Err(e) => {
  474. error!(logger, "http request failed: {:?} (request took {:?})", e, took; "err" => %e);
  475. }
  476. }
  477. }
  478. let took = Instant::now() - start;
  479. warn!(logger, "InfluxWriter http thread: aborting http req, returning buffer";
  480. "took" => %format_args!("{:?}", took));
  481. let buflen = buf.len();
  482. let n_lines = buf.lines().count();
  483. if let Err(e) = tx.send(Err(Resp { buf, took })) {
  484. crit!(logger, "failed to send Err(Resp {{ .. }}) back on abort: {:?}", e;
  485. "err" => %e, "buf.len()" => buflen, "n_lines" => n_lines);
  486. }
  487. });
  488. if let Err(e) = thread_res {
  489. crit!(logger, "failed to spawn thread: {}", e);
  490. }
  491. };
  492. let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: Instant, last: Instant| -> Result<usize, usize> {
  493. match prev {
  494. 0 if N_BUFFER_LINES > 0 => {
  495. serialize_owned(m, buf);
  496. Ok(1)
  497. }
  498. n if n < N_BUFFER_LINES && loop_time - last < MAX_PENDING => {
  499. buf.push_str("\n");
  500. serialize_owned(m, buf);
  501. Ok(n + 1)
  502. }
  503. n => {
  504. buf.push_str("\n");
  505. serialize_owned(m, buf);
  506. Err(n + 1)
  507. }
  508. }
  509. };
  510. 'event: loop {
  511. loop_time = Instant::now();
  512. active = false;
  513. if loop_time - last_memory_check > Duration::from_secs(60) {
  514. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
  515. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  516. info!(logger, "allocated memory: {:.1}MB", allocated_mb;
  517. "allocated bytes" => allocated_bytes,
  518. "in flight buffer bytes" => in_flight_buffer_bytes,
  519. "spares.len()" => spares.len(),
  520. "backlog.len()" => backlog.len(),
  521. );
  522. last_memory_check = loop_time;
  523. }
  524. match rx.recv() {
  525. Ok(Some(mut meas)) => {
  526. n_rcvd += 1;
  527. active = true;
  528. if n_rcvd % INFO_HB_EVERY == 0 {
  529. let n_outstanding = n_out(&spares, &backlog, extras);
  530. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
  531. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  532. info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep();
  533. "n_outstanding" => n_outstanding,
  534. "spares.len()" => spares.len(),
  535. "n_rcvd" => n_rcvd,
  536. "n_active_buf" => count,
  537. "db_health" => %format_args!("{:?}", db_health.mean),
  538. "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
  539. "backlog.len()" => backlog.len());
  540. } else if n_rcvd % DEBUG_HB_EVERY == 0 {
  541. let n_outstanding = n_out(&spares, &backlog, extras);
  542. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
  543. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  544. debug!(logger, "rcvd {} measurements", n_rcvd.thousands_sep();
  545. "n_outstanding" => n_outstanding,
  546. "spares.len()" => spares.len(),
  547. "n_rcvd" => n_rcvd,
  548. "n_active_buf" => count,
  549. "db_health" => %format_args!("{:?}", db_health.mean),
  550. "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
  551. "backlog.len()" => backlog.len());
  552. }
  553. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  554. if meas.fields.is_empty() {
  555. meas.fields.push(("n", OwnedValue::Integer(1)));
  556. }
  557. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  558. count = match next(count, &meas, &mut buf, loop_time, last) {
  559. Ok(n) => n,
  560. Err(_n) => {
  561. let mut count = 0;
  562. let mut next: String = match spares.pop_front() {
  563. Some(x) => x,
  564. None => {
  565. let n_outstanding = n_out(&spares, &backlog, extras);
  566. if n_outstanding > MAX_BACKLOG {
  567. warn!(logger, "no available buffers in `spares`, pulling from backlog";
  568. "n_outstanding" => n_outstanding,
  569. "spares.len()" => spares.len(),
  570. "n_rcvd" => n_rcvd,
  571. "backlog.len()" => backlog.len());
  572. match backlog.pop_front() {
  573. // Note: this does not clear the backlog buffer,
  574. // instead we will just write more and more until
  575. // we are out of memory. I expect that will never
  576. // happen.
  577. //
  578. Some(x) => {
  579. count = 1; // otherwise, no '\n' added in `next(..)` - we are
  580. // sending a "full" buffer to be extended
  581. x
  582. }
  583. None => {
  584. extras += 1;
  585. crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String";
  586. "n_outstanding" => n_outstanding,
  587. "spares.len()" => spares.len(),
  588. "backlog.len()" => backlog.len(),
  589. "n_rcvd" => n_rcvd,
  590. "extras" => extras);
  591. String::new()
  592. }
  593. }
  594. } else {
  595. extras += 1;
  596. let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY;
  597. let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
  598. info!(logger, "allocating new buffer: zero spares avail";
  599. "allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
  600. "n_outstanding" => n_outstanding,
  601. "extras" => extras,
  602. );
  603. String::with_capacity(INITIAL_BUFFER_CAPACITY)
  604. }
  605. }
  606. };
  607. // after swap, buf in next, so want to send next
  608. //
  609. mem::swap(&mut buf, &mut next);
  610. let n_outstanding = n_out(&spares, &backlog, extras);
  611. send(next, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  612. last = loop_time;
  613. count
  614. }
  615. };
  616. }
  617. Ok(None) => {
  618. let start = Instant::now();
  619. let mut hb = Instant::now();
  620. warn!(logger, "terminate signal rcvd"; "count" => count);
  621. if buf.len() > 0 {
  622. info!(logger, "sending remaining buffer to influx on terminate"; "count" => count);
  623. let meas = OwnedMeasurement::new("influx_writer").add_field("n", OwnedValue::Integer(1));
  624. let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last);
  625. let n_outstanding = n_out(&spares, &backlog, extras);
  626. let mut placeholder = spares.pop_front().unwrap_or_else(String::new);
  627. mem::swap(&mut buf, &mut placeholder);
  628. send(placeholder, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  629. }
  630. let mut n_ok = 0;
  631. let mut n_err = 0;
  632. loop {
  633. loop_time = Instant::now();
  634. let n_outstanding = n_out(&spares, &backlog, extras);
  635. if backlog.is_empty() && n_outstanding < 1 {
  636. info!(logger, "cleared any remaining backlog";
  637. "n_outstanding" => n_outstanding,
  638. "spares.len()" => spares.len(),
  639. "backlog.len()" => backlog.len(),
  640. "n_cleared_ok" => n_ok,
  641. "n_cleared_err" => n_err,
  642. "n_rcvd" => n_rcvd,
  643. "extras" => extras,
  644. "elapsed" => %format_args!("{:?}", loop_time - start));
  645. break 'event
  646. }
  647. if loop_time - hb > Duration::from_secs(5) {
  648. info!(logger, "InfluxWriter still clearing backlog ..";
  649. "n_outstanding" => n_outstanding,
  650. "spares.len()" => spares.len(),
  651. "backlog.len()" => backlog.len(),
  652. "n_cleared_ok" => n_ok,
  653. "n_cleared_err" => n_err,
  654. "extras" => extras,
  655. "n_rcvd" => n_rcvd,
  656. "elapsed" => %format_args!("{:?}", loop_time - start));
  657. hb = loop_time;
  658. }
  659. if let Some(buf) = backlog.pop_front() {
  660. let n_outstanding = n_out(&spares, &backlog, extras);
  661. debug!(logger, "resending queued buffer from backlog";
  662. "backlog.len()" => backlog.len(),
  663. "spares.len()" => spares.len(),
  664. "n_rcvd" => n_rcvd,
  665. "n_outstanding" => n_outstanding);
  666. send(buf, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  667. }
  668. 'rx: loop {
  669. match http_rx.try_recv() {
  670. Ok(Ok(Resp { buf, .. })) => {
  671. n_ok += 1;
  672. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  673. if spares.len() <= INITIAL_BACKLOG {
  674. spares.push_back(buf); // needed so `n_outstanding` count remains accurate
  675. } else {
  676. extras = extras.saturating_sub(1);
  677. }
  678. }
  679. Ok(Err(Resp { buf, .. })) => {
  680. warn!(logger, "requeueing failed request"; "buf.len()" => buf.len());
  681. n_err += 1;
  682. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  683. backlog.push_front(buf);
  684. }
  685. Err(chan::TryRecvError::Disconnected) => {
  686. crit!(logger, "trying to clear backlog, but http_rx disconnected! aborting";
  687. "n_outstanding" => n_outstanding,
  688. "backlog.len()" => backlog.len(),
  689. "n_cleared_ok" => n_ok,
  690. "n_cleared_err" => n_err,
  691. "extras" => extras,
  692. "n_rcvd" => n_rcvd,
  693. "elapsed" => %format_args!("{:?}", loop_time - start));
  694. break 'event
  695. }
  696. Err(_) => break 'rx
  697. }
  698. }
  699. thread::sleep(Duration::from_millis(1));
  700. }
  701. }
  702. _ => {}
  703. }
  704. db_health.refresh(loop_time);
  705. let n_outstanding = n_out(&spares, &backlog, extras);
  706. let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200);
  707. if (n_outstanding < MAX_OUTSTANDING_HTTP
  708. || loop_time.saturating_duration_since(last_clear) > Duration::from_secs(60))
  709. && healthy {
  710. if let Some(queued) = backlog.pop_front() {
  711. let n_outstanding = n_out(&spares, &backlog, extras);
  712. send(queued, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
  713. active = true;
  714. }
  715. last_clear = loop_time;
  716. }
  717. loop {
  718. match http_rx.try_recv() {
  719. Ok(Ok(Resp { buf, took })) => {
  720. db_health.add(loop_time, took);
  721. let in_flight_before = in_flight_buffer_bytes.clone();
  722. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  723. if spares.len() <= INITIAL_BACKLOG {
  724. spares.push_back(buf);
  725. } else {
  726. extras = extras.saturating_sub(1);
  727. debug!(logger, "dropping buffer to reduce memory back to INITIAL_BACKLOG size";
  728. "spares.len()" => spares.len(),
  729. "extras" => extras,
  730. "in flight before" => in_flight_before,
  731. "in in_flight_buffer_bytes" => in_flight_buffer_bytes,
  732. );
  733. }
  734. //spares.push_back(buf);
  735. active = true;
  736. }
  737. Ok(Err(Resp { buf, took })) => {
  738. db_health.add(loop_time, took);
  739. in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
  740. backlog.push_front(buf);
  741. active = true;
  742. }
  743. Err(chan::TryRecvError::Disconnected) => {
  744. crit!(logger, "trying to recover buffers, but http_rx disconnected! aborting";
  745. "n_outstanding" => n_outstanding,
  746. "backlog.len()" => backlog.len(),
  747. "n_rcvd" => n_rcvd,
  748. "extras" => extras);
  749. break 'event
  750. }
  751. Err(_) => break
  752. }
  753. }
  754. if !active {
  755. thread::sleep(Duration::new(0, 1))
  756. }
  757. }
  758. info!(logger, "waiting 1s before exiting thread");
  759. thread::sleep(Duration::from_secs(1));
  760. }).unwrap();
  761. InfluxWriter {
  762. host: host.to_string(),
  763. db: db.to_string(),
  764. tx,
  765. thread: Some(Arc::new(thread))
  766. }
  767. }
  768. }
  769. impl Drop for InfluxWriter {
  770. fn drop(&mut self) {
  771. if let Some(arc) = self.thread.take() {
  772. if let Ok(thread) = Arc::try_unwrap(arc) {
  773. let _ = self.tx.send(None);
  774. let _ = thread.join();
  775. }
  776. }
  777. }
  778. }
  779. /// This removes offending things rather than escaping them.
  780. ///
  781. fn escape_tag(s: &str) -> String {
  782. s.replace(" ", "")
  783. .replace(",", "")
  784. .replace("\"", "")
  785. }
  786. fn escape(s: &str) -> String {
  787. s.replace(" ", "\\ ")
  788. .replace(",", "\\,")
  789. }
  790. fn as_string(s: &str) -> String {
  791. // the second replace removes double escapes
  792. //
  793. format!("\"{}\"", s.replace("\"", "\\\"")
  794. .replace(r#"\\""#, r#"\""#))
  795. }
  796. #[test]
  797. fn it_checks_as_string_does_not_double_escape() {
  798. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  799. let escaped = as_string(&raw);
  800. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  801. }
  802. fn as_boolean(b: &bool) -> &str {
  803. if *b { "t" } else { "f" }
  804. }
  805. pub fn now() -> i64 {
  806. nanos(Utc::now()) as i64
  807. }
  808. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  809. ///
  810. /// The serialized measurement is appended to the end of the string without
  811. /// any regard for what exited in it previously.
  812. ///
  813. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  814. line.push_str(&escape_tag(measurement.key));
  815. let add_tag = |line: &mut String, key: &str, value: &str| {
  816. line.push_str(",");
  817. line.push_str(&escape_tag(key));
  818. line.push_str("=");
  819. line.push_str(&escape(value));
  820. };
  821. for (key, value) in measurement.tags.iter() {
  822. #[cfg(not(feature = "string-tags"))]
  823. add_tag(line, key, value);
  824. #[cfg(feature = "string-tags")]
  825. add_tag(line, key, value.as_str());
  826. }
  827. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  828. if is_first { line.push_str(" "); } else { line.push_str(","); }
  829. line.push_str(&escape_tag(key));
  830. line.push_str("=");
  831. match *value {
  832. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  833. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  834. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  835. OwnedValue::D128(ref d) => {
  836. if d.is_finite() {
  837. line.push_str(&format!("{}", d));
  838. } else {
  839. line.push_str("0.0");
  840. }
  841. }
  842. OwnedValue::Float(ref f) => {
  843. if f.is_finite() {
  844. line.push_str(&format!("{}", f));
  845. } else {
  846. line.push_str("-999.0");
  847. }
  848. }
  849. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  850. };
  851. };
  852. let mut fields = measurement.fields.iter();
  853. // first time separate from tags with space
  854. //
  855. fields.next().map(|kv| {
  856. add_field(line, &kv.0, &kv.1, true);
  857. });
  858. // then seperate the rest w/ comma
  859. //
  860. for kv in fields {
  861. add_field(line, kv.0, &kv.1, false);
  862. }
  863. if let Some(t) = measurement.timestamp {
  864. line.push_str(" ");
  865. line.push_str(&t.to_string());
  866. }
  867. }
  868. #[derive(Debug, Clone, PartialEq)]
  869. pub enum OwnedValue {
  870. String(String),
  871. Float(f64),
  872. Integer(i64),
  873. Boolean(bool),
  874. D128(d128),
  875. Uuid(Uuid),
  876. }
  877. /// Holds data meant for an influxdb measurement in transit to the
  878. /// writing thread.
  879. ///
  880. #[derive(Clone, Debug)]
  881. pub struct OwnedMeasurement {
  882. pub key: &'static str,
  883. pub timestamp: Option<i64>,
  884. //pub fields: Map<&'static str, OwnedValue>,
  885. //pub tags: Map<&'static str, &'static str>,
  886. pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
  887. #[cfg(not(feature = "string-tags"))]
  888. pub tags: SmallVec<[(&'static str, &'static str); 8]>,
  889. #[cfg(feature = "string-tags")]
  890. pub tags: SmallVec<[(&'static str, String); 8]>,
  891. }
  892. impl OwnedMeasurement {
  893. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  894. OwnedMeasurement {
  895. key,
  896. timestamp: None,
  897. tags: SmallVec::with_capacity(n_tags),
  898. fields: SmallVec::with_capacity(n_fields),
  899. }
  900. }
  901. pub fn new(key: &'static str) -> Self {
  902. OwnedMeasurement {
  903. key,
  904. timestamp: None,
  905. tags: SmallVec::new(),
  906. fields: SmallVec::new(),
  907. }
  908. }
  909. /// Unusual consuming `self` signature because primarily used by
  910. /// the `measure!` macro.
  911. #[cfg(not(feature = "string-tags"))]
  912. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  913. self.tags.push((key, value));
  914. self
  915. }
  916. #[cfg(feature = "string-tags")]
  917. pub fn add_tag<S: ToString>(mut self, key: &'static str, value: S) -> Self {
  918. self.tags.push((key, value.to_string()));
  919. self
  920. }
  921. /// Unusual consuming `self` signature because primarily used by
  922. /// the `measure!` macro.
  923. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  924. self.fields.push((key, value));
  925. self
  926. }
  927. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  928. self.timestamp = Some(timestamp);
  929. self
  930. }
  931. #[cfg(not(feature = "string-tags"))]
  932. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  933. match self.tags.iter().position(|kv| kv.0 == key) {
  934. Some(i) => {
  935. self.tags.get_mut(i)
  936. .map(|x| {
  937. x.0 = value;
  938. });
  939. self
  940. }
  941. None => {
  942. self.add_tag(key, value)
  943. }
  944. }
  945. }
  946. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  947. self.fields.iter()
  948. .find(|kv| kv.0 == key)
  949. .map(|kv| &kv.1)
  950. }
  951. #[cfg(feature = "string-tags")]
  952. pub fn get_tag(&self, key: &'static str) -> Option<&str> {
  953. self.tags.iter()
  954. .find(|kv| kv.0 == key)
  955. .map(|kv| kv.1.as_str())
  956. }
  957. #[cfg(not(feature = "string-tags"))]
  958. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  959. self.tags.iter()
  960. .find(|kv| kv.0 == key)
  961. .map(|kv| kv.1)
  962. }
  963. }
  964. #[allow(unused)]
  965. #[cfg(test)]
  966. mod tests {
  967. use super::*;
  968. #[cfg(feature = "unstable")]
  969. use test::{black_box, Bencher};
  970. #[ignore]
  971. #[cfg(feature = "unstable")]
  972. #[bench]
  973. fn measure_ten(b: &mut Bencher) {
  974. let influx = InfluxWriter::new("localhost", "test");
  975. let mut n = 0;
  976. b.iter(|| {
  977. for _ in 0..10 {
  978. let time = influx.nanos(Utc::now());
  979. n += 1;
  980. measure!(influx, million, i(n), tm(time));
  981. }
  982. });
  983. }
  984. #[test]
  985. fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
  986. const VERSION: &str = "0.3.90";
  987. let tag_value = "one";
  988. let color = "red";
  989. let time = Utc::now();
  990. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
  991. assert_eq!(m.get_tag("color"), Some("red"));
  992. assert_eq!(m.get_tag("version"), Some(VERSION));
  993. assert_eq!(m.timestamp, Some(nanos(time) as i64));
  994. }
  995. #[test]
  996. fn it_uses_the_v_for_version_shortcut() {
  997. const VERSION: &str = "0.3.90";
  998. let tag_value = "one";
  999. let color = "red";
  1000. let time = now();
  1001. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  1002. assert_eq!(m.get_tag("color"), Some("red"));
  1003. assert_eq!(m.get_tag("version"), Some(VERSION));
  1004. assert_eq!(m.timestamp, Some(time));
  1005. }
  1006. #[test]
  1007. fn it_uses_the_new_tag_k_only_shortcut() {
  1008. let tag_value = "one";
  1009. let color = "red";
  1010. let time = now();
  1011. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  1012. assert_eq!(m.get_tag("color"), Some("red"));
  1013. assert_eq!(m.get_tag("tag_value"), Some("one"));
  1014. assert_eq!(m.timestamp, Some(time));
  1015. }
  1016. #[test]
  1017. fn it_uses_measure_macro_parenthesis_syntax() {
  1018. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  1019. assert_eq!(m.key, "test");
  1020. assert_eq!(m.get_tag("a"), Some("b"));
  1021. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  1022. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  1023. assert_eq!(m.timestamp, Some(1));
  1024. }
  1025. #[test]
  1026. fn it_uses_measure_macro_on_a_self_attribute() {
  1027. struct A {
  1028. pub influx: InfluxWriter,
  1029. }
  1030. impl A {
  1031. fn f(&self) {
  1032. measure!(self.influx, test, t(color, "red"), i(n, 1));
  1033. }
  1034. }
  1035. let a = A { influx: InfluxWriter::default() };
  1036. a.f();
  1037. }
  1038. #[test]
  1039. fn it_clones_an_influx_writer_to_check_both_drop() {
  1040. let influx = InfluxWriter::default();
  1041. measure!(influx, drop_test, i(a, 1), i(b, 2));
  1042. {
  1043. let influx = influx.clone();
  1044. thread::spawn(move || {
  1045. measure!(influx, drop_test, i(a, 3), i(b, 4));
  1046. });
  1047. }
  1048. }
  1049. #[cfg(feature = "unstable")]
  1050. #[bench]
  1051. fn influx_writer_send_basic(b: &mut Bencher) {
  1052. let m = InfluxWriter::new("localhost", "test");
  1053. b.iter(|| {
  1054. measure!(m, test, t(color; "red"), i(n, 1)); //, float[p; 1.234]);
  1055. });
  1056. }
  1057. #[cfg(feature = "unstable")]
  1058. #[bench]
  1059. fn influx_writer_send_price(b: &mut Bencher) {
  1060. let m = InfluxWriter::new("localhost", "test");
  1061. b.iter(|| {
  1062. measure!(m, test,
  1063. t(ticker, "xmr_btc"),
  1064. t(exchange, "plnx"),
  1065. d(bid, d128::zero()),
  1066. d(ask, d128::zero()),
  1067. );
  1068. });
  1069. }
  1070. #[test]
  1071. fn it_checks_color_tag_error_in_non_doctest() {
  1072. let (tx, rx) = bounded(1024);
  1073. measure!(tx, test, t(color,"red"), i(n,1));
  1074. let meas: OwnedMeasurement = rx.recv().unwrap();
  1075. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  1076. }
  1077. #[test]
  1078. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  1079. let meas = measure!(@make_meas test_measurement,
  1080. t(one, "a"), t(two, "b"), i(three, 2),
  1081. f(four, 1.2345), s(five, String::from("d")),
  1082. b(six, true), i(seven, 1 + 2),
  1083. tm(1)
  1084. );
  1085. assert_eq!(meas.key, "test_measurement");
  1086. assert_eq!(meas.get_tag("one"), Some("a"));
  1087. assert_eq!(meas.get_tag("two"), Some("b"));
  1088. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  1089. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  1090. assert_eq!(meas.timestamp, Some(1));
  1091. }
  1092. #[test]
  1093. fn it_uses_measure_macro_for_d128_and_uuid() {
  1094. let (tx, rx) = bounded(1024);
  1095. let one = "a";
  1096. let two = d128::zero();
  1097. let three = Uuid::new_v4();
  1098. let time = now();
  1099. measure!(tx, test_measurement, t(one), d(two), u(three), tm(time));
  1100. thread::sleep(Duration::from_millis(10));
  1101. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  1102. assert_eq!(meas.key, "test_measurement");
  1103. assert_eq!(meas.get_tag("one"), Some("a"));
  1104. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  1105. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(three)));
  1106. assert_eq!(meas.timestamp, Some(time));
  1107. }
  1108. #[test]
  1109. fn it_uses_the_measure_macro_alt_syntax() {
  1110. let (tx, rx) = bounded(1024);
  1111. measure!(tx, test_measurement,
  1112. t(one, "a"), t(two, "b"), i(three, 2),
  1113. f(four, 1.2345), s(five, String::from("d")),
  1114. b(six, true), i(seven, 1 + 2),
  1115. tm(1)
  1116. );
  1117. thread::sleep(Duration::from_millis(10));
  1118. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  1119. assert_eq!(meas.key, "test_measurement");
  1120. assert_eq!(meas.get_tag("one"), Some("a"));
  1121. assert_eq!(meas.get_tag("two"), Some("b"));
  1122. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  1123. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  1124. assert_eq!(meas.timestamp, Some(1));
  1125. }
  1126. #[test]
  1127. fn it_checks_that_fields_are_separated_correctly() {
  1128. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  1129. assert_eq!(m.key, "test");
  1130. assert_eq!(m.get_tag("a"), Some("one"));
  1131. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  1132. let mut buf = String::new();
  1133. serialize_owned(&m, &mut buf);
  1134. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  1135. }
  1136. #[test]
  1137. fn try_to_break_measure_macro() {
  1138. let (tx, _) = bounded(1024);
  1139. measure!(tx, one, t(x,"y"), i(n,1));
  1140. measure!(tx, one, t(x,"y"), i(n,1),);
  1141. struct A {
  1142. pub one: i32,
  1143. pub two: i32,
  1144. }
  1145. struct B {
  1146. pub a: A
  1147. }
  1148. let b = B { a: A { one: 1, two: 2 } };
  1149. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  1150. assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
  1151. }
  1152. #[cfg(feature = "unstable")]
  1153. #[bench]
  1154. fn measure_macro_small(b: &mut Bencher) {
  1155. let (tx, rx) = bounded(1024);
  1156. let listener = thread::spawn(move || {
  1157. loop { if rx.recv().is_err() { break } }
  1158. });
  1159. b.iter(|| {
  1160. measure!(tx, test, t(color, "red"), i(n, 1), tm(now()));
  1161. });
  1162. }
  1163. #[cfg(feature = "unstable")]
  1164. #[bench]
  1165. fn measure_macro_medium(b: &mut Bencher) {
  1166. let (tx, rx) = bounded(1024);
  1167. let listener = thread::spawn(move || {
  1168. loop { if rx.recv().is_err() { break } }
  1169. });
  1170. b.iter(|| {
  1171. measure!(tx, test, t(color, "red"), t(mood, "playful"),
  1172. t(ticker, "xmr_btc"), f(price, 1.2345), f(amount, 56.322),
  1173. i(n, 1), tm(now()));
  1174. });
  1175. }
  1176. #[cfg(feature = "unstable")]
  1177. #[bench]
  1178. fn serialize_owned_longer(b: &mut Bencher) {
  1179. let mut buf = String::with_capacity(1024);
  1180. let m =
  1181. OwnedMeasurement::new("test")
  1182. .add_tag("one", "a")
  1183. .add_tag("two", "b")
  1184. .add_tag("ticker", "xmr_btc")
  1185. .add_tag("exchange", "plnx")
  1186. .add_tag("side", "bid")
  1187. .add_field("three", OwnedValue::Float(1.2345))
  1188. .add_field("four", OwnedValue::Integer(57))
  1189. .add_field("five", OwnedValue::Boolean(true))
  1190. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  1191. .set_timestamp(now());
  1192. b.iter(|| {
  1193. serialize_owned(&m, &mut buf);
  1194. buf.clear()
  1195. });
  1196. }
  1197. #[cfg(feature = "unstable")]
  1198. #[bench]
  1199. fn serialize_owned_simple(b: &mut Bencher) {
  1200. let mut buf = String::with_capacity(1024);
  1201. let m =
  1202. OwnedMeasurement::new("test")
  1203. .add_tag("one", "a")
  1204. .add_tag("two", "b")
  1205. .add_field("three", OwnedValue::Float(1.2345))
  1206. .add_field("four", OwnedValue::Integer(57))
  1207. .set_timestamp(now());
  1208. b.iter(|| {
  1209. serialize_owned(&m, &mut buf);
  1210. buf.clear()
  1211. });
  1212. }
  1213. #[cfg(feature = "unstable")]
  1214. #[bench]
  1215. fn clone_url_for_thread(b: &mut Bencher) {
  1216. let host = "ahmes";
  1217. let db = "mlp";
  1218. let url =
  1219. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1220. &[("db", db), ("precision", "ns")]).unwrap();
  1221. b.iter(|| {
  1222. url.clone()
  1223. })
  1224. }
  1225. #[cfg(feature = "unstable")]
  1226. #[bench]
  1227. fn clone_arc_url_for_thread(b: &mut Bencher) {
  1228. let host = "ahmes";
  1229. let db = "mlp";
  1230. let url =
  1231. Url::parse_with_params(&format!("http://{}:8086/write", host),
  1232. &[("db", db), ("precision", "ns")]).unwrap();
  1233. let url = Arc::new(url);
  1234. b.iter(|| {
  1235. Arc::clone(&url)
  1236. })
  1237. }
  1238. #[test]
  1239. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  1240. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  1241. let mut buf = String::new();
  1242. let mut server_resp = String::new();
  1243. let m = OwnedMeasurement::new("rust_test")
  1244. .add_field("s", OwnedValue::String(raw.to_string()))
  1245. .set_timestamp(now());
  1246. serialize_owned(&m, &mut buf);
  1247. println!("{}", buf);
  1248. buf.push_str("\n");
  1249. let buf_copy = buf.clone();
  1250. buf.push_str(&buf_copy);
  1251. println!("{}", buf);
  1252. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  1253. let client = Client::new();
  1254. let req = InfluxWriter::http_req(&client, url.clone(), &buf, &None);
  1255. match req.send() {
  1256. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  1257. Ok(mut resp) => {
  1258. resp.read_to_string(&mut server_resp).unwrap();
  1259. panic!("{}", server_resp);
  1260. }
  1261. Err(why) => {
  1262. panic!(why)
  1263. }
  1264. }
  1265. }
  1266. #[cfg(feature = "auth-tests")]
  1267. #[test]
  1268. fn it_sends_authenticated_measurements() {
  1269. let creds = InfluxWriter::get_credentials("auth_test_user".into(), Some("hot dog".into()));
  1270. let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!());
  1271. //let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
  1272. //let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
  1273. //let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
  1274. //let root = slog::Logger::root(drain, o!("version" => "0.1"));
  1275. //let influx = InfluxWriter::with_logger_and_opt_creds("localhost", "auth_test", Some(creds), &root);
  1276. measure!(influx, auth_test_meas, i(n, 1));
  1277. drop(influx);
  1278. }
  1279. }