You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1167 lines
40KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. #[cfg(feature = "warnings")]
  8. use std::fs;
  9. use std::time::{Instant, Duration};
  10. use std::hash::BuildHasherDefault;
  11. use hyper::status::StatusCode;
  12. use hyper::client::response::Response;
  13. use hyper::Url;
  14. use hyper::client::Client;
  15. use influent::measurement::{Measurement, Value};
  16. #[cfg(feature = "zmq")]
  17. use zmq;
  18. #[allow(unused_imports)]
  19. use chrono::{DateTime, Utc};
  20. use ordermap::OrderMap;
  21. use fnv::FnvHasher;
  22. use decimal::d128;
  23. use uuid::Uuid;
  24. use smallvec::SmallVec;
  25. use slog::Logger;
  26. use super::{nanos, file_logger, LOG_LEVEL};
  27. #[cfg(feature = "warnings")]
  28. use warnings::Warning;
  29. pub use super::{dur_nanos, dt_nanos};
  30. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  31. pub const INFLUX_WRITER_MAX_BUFFER: usize = 4096;
  32. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  33. Map::with_capacity_and_hasher(capacity, Default::default())
  34. }
  35. /// Created this so I know what types can be passed through the
  36. /// `measure!` macro, which used to convert with `as i64` and
  37. /// `as f64` until I accidentally passed a function name, and it
  38. /// still compiled, but with garbage numbers.
  39. pub trait AsI64 {
  40. fn as_i64(x: Self) -> i64;
  41. }
  42. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  43. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  44. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  45. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  46. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } }
  47. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  48. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  49. impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  50. impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  51. impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  52. impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  53. /// Created this so I know what types can be passed through the
  54. /// `measure!` macro, which used to convert with `as i64` and
  55. /// `as f64` until I accidentally passed a function name, and it
  56. /// still compiled, but with garbage numbers.
  57. pub trait AsF64 {
  58. fn as_f64(x: Self) -> f64;
  59. }
  60. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  61. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  62. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  63. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  64. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  65. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  66. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  67. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  68. ///
  69. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  70. /// values, as well as sends it with the `Sender`.
  71. ///
  72. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  73. /// measurement (see `tests` mod).
  74. ///
  75. /// # Examples
  76. ///
  77. /// ```
  78. /// #![feature(try_from)]
  79. /// #[macro_use] extern crate logging;
  80. /// extern crate decimal;
  81. ///
  82. /// use std::sync::mpsc::channel;
  83. /// use decimal::d128;
  84. /// use logging::influx::*;
  85. ///
  86. /// fn main() {
  87. /// let (tx, rx) = channel();
  88. ///
  89. /// // "shorthand" syntax
  90. ///
  91. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  92. ///
  93. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  94. ///
  95. /// assert_eq!(meas.key, "test");
  96. /// assert_eq!(meas.get_tag("color"), Some("red"));
  97. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  98. ///
  99. /// // alternate syntax ...
  100. ///
  101. /// measure!(tx, test,
  102. /// tag [ one => "a" ],
  103. /// tag [ two => "b" ],
  104. /// int [ three => 2 ],
  105. /// float [ four => 1.2345 ],
  106. /// string [ five => String::from("d") ],
  107. /// bool [ six => true ],
  108. /// int [ seven => { 1 + 2 } ],
  109. /// time [ 1 ]
  110. /// );
  111. ///
  112. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  113. ///
  114. /// assert_eq!(meas.key, "test");
  115. /// assert_eq!(meas.get_tag("one"), Some("a"));
  116. /// assert_eq!(meas.get_tag("two"), Some("b"));
  117. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  118. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  119. /// assert_eq!(meas.timestamp, Some(1));
  120. ///
  121. /// // use the @make_meas flag to skip sending a measurement, instead merely
  122. /// // creating it.
  123. ///
  124. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  125. ///
  126. /// // each variant also has shorthand aliases
  127. ///
  128. /// let meas: OwnedMeasurement =
  129. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  130. /// }
  131. /// ```
  132. ///
  133. #[macro_export]
  134. macro_rules! measure {
  135. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  136. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  137. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  138. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  139. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  140. (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
  141. (@kv v, $meas:ident, $k:expr) => { measure!(@ea tag, $meas, "version", $k) };
  142. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  143. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  144. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  145. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  146. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  147. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  148. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  149. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  150. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  151. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  152. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  153. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  154. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  155. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  156. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  157. (@as_expr $e:expr) => {$e};
  158. (@count_tags) => {0usize};
  159. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  160. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  161. (@count_fields) => {0usize};
  162. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  163. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  164. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  165. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  166. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  167. };
  168. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  169. let n_tags = measure!(@count_tags $($t)*);
  170. let n_fields = measure!(@count_fields $($t)*);
  171. let mut meas =
  172. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  173. $(
  174. measure!(@kv $t, meas, $($tail)*);
  175. )*
  176. meas
  177. }};
  178. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  179. measure!($m, $name, $($t [ $($tail)* ] ),+)
  180. };
  181. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  182. #[allow(unused_imports)]
  183. use $crate::influx::{AsI64, AsF64};
  184. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  185. let _ = $m.send(measurement);
  186. }};
  187. }
  188. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  189. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  190. /// measurements have accumulated.
  191. ///
  192. #[derive(Debug)]
  193. pub struct InfluxWriter {
  194. host: String,
  195. db: String,
  196. tx: Sender<Option<OwnedMeasurement>>,
  197. thread: Option<Arc<thread::JoinHandle<()>>>,
  198. }
  199. impl Default for InfluxWriter {
  200. fn default() -> Self {
  201. //if cfg!(any(test, feature = "test")) {
  202. // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0)
  203. //} else {
  204. InfluxWriter::new("localhost", "test", "/tmp/influx-test.log", 4096)
  205. //}
  206. }
  207. }
  208. impl Clone for InfluxWriter {
  209. fn clone(&self) -> Self {
  210. debug_assert!(self.thread.is_some());
  211. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  212. InfluxWriter {
  213. host: self.host.to_string(),
  214. db: self.db.to_string(),
  215. tx: self.tx.clone(),
  216. thread,
  217. }
  218. }
  219. }
  220. impl InfluxWriter {
  221. pub fn host(&self) -> &str { self.host.as_str() }
  222. pub fn db(&self) -> &str { self.db.as_str() }
  223. /// Sends the `OwnedMeasurement` to the serialization thread.
  224. ///
  225. #[cfg_attr(feature = "inlines", inline)]
  226. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  227. self.tx.send(Some(m))
  228. }
  229. #[cfg_attr(feature = "inlines", inline)]
  230. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  231. #[cfg_attr(feature = "inlines", inline)]
  232. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  233. #[cfg_attr(feature = "inlines", inline)]
  234. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  235. #[cfg_attr(feature = "inlines", inline)]
  236. pub fn rsecs(&self, d: Duration) -> f64 {
  237. ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64))
  238. * 1000.0)
  239. .round()
  240. / 1000.0
  241. }
  242. #[cfg_attr(feature = "inlines", inline)]
  243. pub fn secs(&self, d: Duration) -> f64 {
  244. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  245. }
  246. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  247. self.tx.clone()
  248. }
  249. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  250. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  251. Self::with_logger(host, db, buffer_size, logger)
  252. }
  253. #[allow(unused_assignments)]
  254. pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self {
  255. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  256. let buffer_size = INFLUX_WRITER_MAX_BUFFER;
  257. #[cfg(feature = "no-influx-buffer")]
  258. let buffer_size = 0usize;
  259. debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);
  260. let url =
  261. Url::parse_with_params(&format!("http://{}:8086/write", host),
  262. &[("db", db), ("precision", "ns")])
  263. .expect("influx writer url should parse");
  264. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  265. const MAX_PENDING: Duration = Duration::from_secs(1);
  266. let client = Client::new();
  267. debug!(logger, "initializing buffers");
  268. let mut buf = String::with_capacity(32 * 32 * 32);
  269. let mut count = 0;
  270. let mut last = Instant::now();
  271. let mut loop_time = Instant::now();
  272. let send = |buf: &str| {
  273. let resp = client.post(url.clone())
  274. .body(buf)
  275. .send();
  276. match resp {
  277. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  278. debug!(logger, "server responded ok: 204 NoContent");
  279. }
  280. Ok(mut resp) => {
  281. let mut server_resp = String::with_capacity(32 * 1024); // need to allocate here bc will be
  282. // sent to logging thread
  283. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  284. error!(logger, "influx server error";
  285. "status" => resp.status.to_string(),
  286. "body" => server_resp);
  287. }
  288. Err(why) => {
  289. error!(logger, "http request failed: {:?}", why);
  290. }
  291. }
  292. };
  293. let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: &Instant, last: &mut Instant| -> usize {
  294. match prev {
  295. 0 if buffer_size > 0 => {
  296. serialize_owned(m, buf);
  297. 1
  298. }
  299. n if n < buffer_size && *loop_time - *last < MAX_PENDING => {
  300. buf.push_str("\n");
  301. serialize_owned(m, buf);
  302. n + 1
  303. }
  304. n => {
  305. buf.push_str("\n");
  306. serialize_owned(m, buf);
  307. debug!(logger, "sending buffer to influx"; "len" => n);
  308. send(buf);
  309. *last = *loop_time;
  310. buf.clear();
  311. 0
  312. }
  313. }
  314. };
  315. loop {
  316. loop_time = Instant::now();
  317. match rx.recv() {
  318. Ok(Some(mut meas)) => {
  319. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  320. if meas.fields.is_empty() {
  321. meas.fields.push(("n", OwnedValue::Integer(1)));
  322. }
  323. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  324. count = next(count, &meas, &mut buf, &loop_time, &mut last);
  325. }
  326. Ok(None) => {
  327. warn!(logger, "terminate signal rcvd"; "count" => count);
  328. if buf.len() > 0 {
  329. info!(logger, "sending remaining buffer to influx on terminate"; "count" => count);
  330. let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1));
  331. count = next(buffer_size, &meas, &mut buf, &loop_time, &mut last);
  332. info!(logger, "triggered send of remaining buffer"; "count" => count);
  333. if !buf.is_empty() {
  334. warn!(logger, "buffer sill isn't empty after 'wtrterm' meas";
  335. "count" => count, "buf.len()" => buf.len());
  336. send(&buf);
  337. }
  338. }
  339. info!(logger, "exiting loop"; "count" => count, "buf.len()" => buf.len());
  340. break
  341. }
  342. _ => {
  343. thread::sleep(Duration::new(0, 1))
  344. }
  345. }
  346. }
  347. }).unwrap();
  348. InfluxWriter {
  349. host: host.to_string(),
  350. db: db.to_string(),
  351. tx,
  352. thread: Some(Arc::new(thread))
  353. }
  354. }
  355. }
  356. impl Drop for InfluxWriter {
  357. fn drop(&mut self) {
  358. if let Some(arc) = self.thread.take() {
  359. if let Ok(thread) = Arc::try_unwrap(arc) {
  360. let _ = self.tx.send(None);
  361. let _ = thread.join();
  362. }
  363. }
  364. }
  365. }
  366. #[cfg(feature = "zmq")]
  367. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  368. #[cfg(feature = "zmq")]
  369. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  370. let socket = ctx.socket(zmq::PULL)?;
  371. socket.bind(WRITER_ADDR)?;
  372. socket.set_rcvhwm(0)?;
  373. Ok(socket)
  374. }
  375. #[cfg(feature = "zmq")]
  376. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  377. let socket = ctx.socket(zmq::PUSH)?;
  378. socket.connect(WRITER_ADDR)?;
  379. socket.set_sndhwm(0)?;
  380. Ok(socket)
  381. }
  382. /// This removes offending things rather than escaping them.
  383. ///
  384. fn escape_tag(s: &str) -> String {
  385. s.replace(" ", "")
  386. .replace(",", "")
  387. .replace("\"", "")
  388. }
  389. fn escape(s: &str) -> String {
  390. s.replace(" ", "\\ ")
  391. .replace(",", "\\,")
  392. }
  393. fn as_string(s: &str) -> String {
  394. // the second replace removes double escapes
  395. //
  396. format!("\"{}\"", s.replace("\"", "\\\"")
  397. .replace(r#"\\""#, r#"\""#))
  398. }
  399. #[test]
  400. fn it_checks_as_string_does_not_double_escape() {
  401. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  402. let escaped = as_string(&raw);
  403. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  404. }
  405. fn as_integer(i: &i64) -> String {
  406. format!("{}i", i)
  407. }
  408. fn as_float(f: &f64) -> String {
  409. f.to_string()
  410. }
  411. fn as_boolean(b: &bool) -> &str {
  412. if *b { "t" } else { "f" }
  413. }
  414. pub fn now() -> i64 {
  415. nanos(Utc::now()) as i64
  416. }
  417. /// Serialize the measurement into influx line protocol
  418. /// and append to the buffer.
  419. ///
  420. /// # Examples
  421. ///
  422. /// ```
  423. /// extern crate influent;
  424. /// extern crate logging;
  425. ///
  426. /// use influent::measurement::{Measurement, Value};
  427. /// use std::string::String;
  428. /// use logging::influx::serialize;
  429. ///
  430. /// fn main() {
  431. /// let mut buf = String::new();
  432. /// let mut m = Measurement::new("test");
  433. /// m.add_field("x", Value::Integer(1));
  434. /// serialize(&m, &mut buf);
  435. /// }
  436. ///
  437. /// ```
  438. ///
  439. pub fn serialize(measurement: &Measurement, line: &mut String) {
  440. line.push_str(&escape(measurement.key));
  441. for (tag, value) in measurement.tags.iter() {
  442. line.push_str(",");
  443. line.push_str(&escape(tag));
  444. line.push_str("=");
  445. line.push_str(&escape(value));
  446. }
  447. let mut was_spaced = false;
  448. for (field, value) in measurement.fields.iter() {
  449. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  450. line.push_str(&escape(field));
  451. line.push_str("=");
  452. match value {
  453. &Value::String(ref s) => line.push_str(&as_string(s)),
  454. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  455. &Value::Float(ref f) => line.push_str(&as_float(f)),
  456. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  457. };
  458. }
  459. match measurement.timestamp {
  460. Some(t) => {
  461. line.push_str(" ");
  462. line.push_str(&t.to_string());
  463. }
  464. _ => {}
  465. }
  466. }
  467. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  468. ///
  469. /// The serialized measurement is appended to the end of the string without
  470. /// any regard for what exited in it previously.
  471. ///
  472. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  473. line.push_str(&escape_tag(measurement.key));
  474. let add_tag = |line: &mut String, key: &str, value: &str| {
  475. line.push_str(",");
  476. line.push_str(&escape_tag(key));
  477. line.push_str("=");
  478. line.push_str(&escape(value));
  479. };
  480. for &(key, value) in measurement.tags.iter() {
  481. add_tag(line, key, value);
  482. }
  483. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  484. if is_first { line.push_str(" "); } else { line.push_str(","); }
  485. line.push_str(&escape_tag(key));
  486. line.push_str("=");
  487. match *value {
  488. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  489. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  490. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  491. OwnedValue::D128(ref d) => {
  492. if d.is_finite() {
  493. line.push_str(&format!("{}", d));
  494. } else {
  495. line.push_str("0.0");
  496. }
  497. }
  498. OwnedValue::Float(ref f) => {
  499. if f.is_finite() {
  500. line.push_str(&format!("{}", f));
  501. } else {
  502. line.push_str("0.0");
  503. }
  504. }
  505. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  506. };
  507. };
  508. let mut fields = measurement.fields.iter();
  509. // first time separate from tags with space
  510. //
  511. fields.next().map(|kv| {
  512. add_field(line, &kv.0, &kv.1, true);
  513. });
  514. // then seperate the rest w/ comma
  515. //
  516. for kv in fields {
  517. add_field(line, kv.0, &kv.1, false);
  518. }
  519. if let Some(t) = measurement.timestamp {
  520. line.push_str(" ");
  521. line.push_str(&t.to_string());
  522. }
  523. }
  524. #[cfg(feature = "warnings")]
  525. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  526. #[cfg(feature = "zmq")]
  527. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  528. assert!(false);
  529. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  530. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  531. let _ = fs::create_dir("/tmp/mm");
  532. let ctx = zmq::Context::new();
  533. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  534. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  535. let client = Client::new();
  536. let mut buf = String::with_capacity(4096);
  537. let mut server_resp = String::with_capacity(4096);
  538. let mut count = 0;
  539. loop {
  540. if let Ok(bytes) = socket.recv_bytes(0) {
  541. if let Ok(msg) = String::from_utf8(bytes) {
  542. count = match count {
  543. 0 => {
  544. buf.push_str(&msg);
  545. 1
  546. }
  547. n @ 1...40 => {
  548. buf.push_str("\n");
  549. buf.push_str(&msg);
  550. n + 1
  551. }
  552. _ => {
  553. buf.push_str("\n");
  554. buf.push_str(&msg);
  555. match client.post(url.clone())
  556. .body(&buf)
  557. .send() {
  558. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  559. Ok(mut resp) => {
  560. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  561. let _ = warnings.send(
  562. Warning::Error(
  563. format!("Influx server: {}", server_resp)));
  564. server_resp.clear();
  565. }
  566. Err(why) => {
  567. let _ = warnings.send(
  568. Warning::Error(
  569. format!("Influx write error: {}", why)));
  570. }
  571. }
  572. buf.clear();
  573. 0
  574. }
  575. }
  576. }
  577. }
  578. }
  579. }).unwrap()
  580. }
  581. #[derive(Debug, Clone, PartialEq)]
  582. pub enum OwnedValue {
  583. String(String),
  584. Float(f64),
  585. Integer(i64),
  586. Boolean(bool),
  587. D128(d128),
  588. Uuid(Uuid),
  589. }
  590. /// Holds data meant for an influxdb measurement in transit to the
  591. /// writing thread.
  592. ///
  593. /// TODO: convert `Map` to `SmallVec`?
  594. ///
  595. #[derive(Clone, Debug)]
  596. pub struct OwnedMeasurement {
  597. pub key: &'static str,
  598. pub timestamp: Option<i64>,
  599. //pub fields: Map<&'static str, OwnedValue>,
  600. //pub tags: Map<&'static str, &'static str>,
  601. pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
  602. pub tags: SmallVec<[(&'static str, &'static str); 8]>,
  603. }
  604. impl OwnedMeasurement {
  605. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  606. OwnedMeasurement {
  607. key,
  608. timestamp: None,
  609. tags: SmallVec::with_capacity(n_tags),
  610. fields: SmallVec::with_capacity(n_fields),
  611. }
  612. }
  613. pub fn new(key: &'static str) -> Self {
  614. OwnedMeasurement {
  615. key,
  616. timestamp: None,
  617. tags: SmallVec::new(),
  618. fields: SmallVec::new(),
  619. }
  620. }
  621. /// Unusual consuming `self` signature because primarily used by
  622. /// the `measure!` macro.
  623. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  624. self.tags.push((key, value));
  625. self
  626. }
  627. /// Unusual consuming `self` signature because primarily used by
  628. /// the `measure!` macro.
  629. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  630. self.fields.push((key, value));
  631. self
  632. }
  633. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  634. self.timestamp = Some(timestamp);
  635. self
  636. }
  637. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  638. match self.tags.iter().position(|kv| kv.0 == key) {
  639. Some(i) => {
  640. self.tags.get_mut(i)
  641. .map(|x| {
  642. x.0 = value;
  643. });
  644. self
  645. }
  646. None => {
  647. self.add_tag(key, value)
  648. }
  649. }
  650. }
  651. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  652. self.fields.iter()
  653. .find(|kv| kv.0 == key)
  654. .map(|kv| &kv.1)
  655. }
  656. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  657. self.tags.iter()
  658. .find(|kv| kv.0 == key)
  659. .map(|kv| kv.1)
  660. }
  661. }
  662. #[allow(unused_imports, unused_variables)]
  663. #[cfg(test)]
  664. mod tests {
  665. use super::*;
  666. use test::{black_box, Bencher};
  667. #[test]
  668. fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
  669. const VERSION: &str = "0.3.90";
  670. let tag_value = "one";
  671. let color = "red";
  672. let time = Utc::now();
  673. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
  674. assert_eq!(m.get_tag("color"), Some("red"));
  675. assert_eq!(m.get_tag("version"), Some(VERSION));
  676. assert_eq!(m.timestamp, Some(nanos(time) as i64));
  677. }
  678. #[test]
  679. fn it_uses_the_v_for_version_shortcut() {
  680. const VERSION: &str = "0.3.90";
  681. let tag_value = "one";
  682. let color = "red";
  683. let time = now();
  684. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  685. assert_eq!(m.get_tag("color"), Some("red"));
  686. assert_eq!(m.get_tag("version"), Some(VERSION));
  687. assert_eq!(m.timestamp, Some(time));
  688. }
  689. #[test]
  690. fn it_uses_the_new_tag_k_only_shortcut() {
  691. let tag_value = "one";
  692. let color = "red";
  693. let time = now();
  694. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  695. assert_eq!(m.get_tag("color"), Some("red"));
  696. assert_eq!(m.get_tag("tag_value"), Some("one"));
  697. assert_eq!(m.timestamp, Some(time));
  698. }
  699. #[test]
  700. fn it_uses_measure_macro_parenthesis_syntax() {
  701. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  702. assert_eq!(m.key, "test");
  703. assert_eq!(m.get_tag("a"), Some("b"));
  704. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  705. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  706. assert_eq!(m.timestamp, Some(1));
  707. }
  708. #[test]
  709. fn it_uses_measure_macro_on_a_self_attribute() {
  710. struct A {
  711. pub influx: InfluxWriter,
  712. }
  713. impl A {
  714. fn f(&self) {
  715. measure!(self.influx, test, t(color, "red"), i(n, 1));
  716. }
  717. }
  718. let a = A { influx: InfluxWriter::default() };
  719. a.f();
  720. }
  721. #[test]
  722. fn it_clones_an_influx_writer_to_check_both_drop() {
  723. let influx = InfluxWriter::default();
  724. measure!(influx, drop_test, i(a, 1), i(b, 2));
  725. {
  726. let influx = influx.clone();
  727. thread::spawn(move || {
  728. measure!(influx, drop_test, i(a, 3), i(b, 4));
  729. });
  730. }
  731. }
  732. #[bench]
  733. fn influx_writer_send_basic(b: &mut Bencher) {
  734. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  735. b.iter(|| {
  736. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  737. });
  738. }
  739. #[bench]
  740. fn influx_writer_send_price(b: &mut Bencher) {
  741. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  742. b.iter(|| {
  743. measure!(m, test,
  744. t(ticker, t!(xmr-btc).as_str()),
  745. t(exchange, "plnx"),
  746. d(bid, d128::zero()),
  747. d(ask, d128::zero()),
  748. );
  749. });
  750. }
  751. #[test]
  752. fn it_checks_color_tag_error_in_non_doctest() {
  753. let (tx, rx) = channel();
  754. measure!(tx, test, tag[color;"red"], int[n;1]);
  755. let meas: OwnedMeasurement = rx.recv().unwrap();
  756. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  757. }
  758. #[test]
  759. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  760. let meas = measure!(@make_meas test_measurement,
  761. tag [ one => "a" ],
  762. tag [ two => "b" ],
  763. int [ three => 2 ],
  764. float [ four => 1.2345 ],
  765. string [ five => String::from("d") ],
  766. bool [ six => true ],
  767. int [ seven => { 1 + 2 } ],
  768. time [ 1 ]
  769. );
  770. assert_eq!(meas.key, "test_measurement");
  771. assert_eq!(meas.get_tag("one"), Some("a"));
  772. assert_eq!(meas.get_tag("two"), Some("b"));
  773. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  774. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  775. assert_eq!(meas.timestamp, Some(1));
  776. }
  777. #[test]
  778. fn it_uses_the_measure_macro() {
  779. let (tx, rx) = channel();
  780. measure!(tx, test_measurement,
  781. tag [ one => "a" ],
  782. tag [ two => "b" ],
  783. int [ three => 2 ],
  784. float [ four => 1.2345 ],
  785. string [ five => String::from("d") ],
  786. bool [ six => true ],
  787. int [ seven => { 1 + 2 } ],
  788. time [ 1 ]
  789. );
  790. thread::sleep(Duration::from_millis(10));
  791. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  792. assert_eq!(meas.key, "test_measurement");
  793. assert_eq!(meas.get_tag("one"), Some("a"));
  794. assert_eq!(meas.get_tag("two"), Some("b"));
  795. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  796. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  797. assert_eq!(meas.timestamp, Some(1));
  798. }
  799. #[test]
  800. fn it_uses_measure_macro_for_d128_and_uuid() {
  801. let (tx, rx) = channel();
  802. let u = Uuid::new_v4();
  803. let d = d128::zero();
  804. let t = now();
  805. measure!(tx, test_measurement,
  806. tag[one; "a"],
  807. d128[two; d],
  808. uuid[three; u],
  809. time[t]
  810. );
  811. thread::sleep(Duration::from_millis(10));
  812. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  813. assert_eq!(meas.key, "test_measurement");
  814. assert_eq!(meas.get_tag("one"), Some("a"));
  815. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  816. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
  817. assert_eq!(meas.timestamp, Some(t));
  818. }
  819. #[test]
  820. fn it_uses_the_measure_macro_alt_syntax() {
  821. let (tx, rx) = channel();
  822. measure!(tx, test_measurement,
  823. tag[one; "a"],
  824. tag[two; "b"],
  825. int[three; 2],
  826. float[four; 1.2345],
  827. string[five; String::from("d")],
  828. bool [ six => true ],
  829. int[seven; { 1 + 2 }],
  830. time[1]
  831. );
  832. thread::sleep(Duration::from_millis(10));
  833. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  834. assert_eq!(meas.key, "test_measurement");
  835. assert_eq!(meas.get_tag("one"), Some("a"));
  836. assert_eq!(meas.get_tag("two"), Some("b"));
  837. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  838. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  839. assert_eq!(meas.timestamp, Some(1));
  840. }
  841. #[test]
  842. fn it_checks_that_fields_are_separated_correctly() {
  843. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  844. assert_eq!(m.key, "test");
  845. assert_eq!(m.get_tag("a"), Some("one"));
  846. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  847. let mut buf = String::new();
  848. serialize_owned(&m, &mut buf);
  849. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  850. }
  851. #[test]
  852. fn try_to_break_measure_macro() {
  853. let (tx, _) = channel();
  854. measure!(tx, one, tag[x=>"y"], int[n;1]);
  855. measure!(tx, one, tag[x;"y"], int[n;1],);
  856. struct A {
  857. pub one: i32,
  858. pub two: i32,
  859. }
  860. struct B {
  861. pub a: A
  862. }
  863. let b = B { a: A { one: 1, two: 2 } };
  864. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  865. assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
  866. }
  867. #[bench]
  868. fn measure_macro_small(b: &mut Bencher) {
  869. let (tx, rx) = channel();
  870. let listener = thread::spawn(move || {
  871. loop { if rx.recv().is_err() { break } }
  872. });
  873. b.iter(|| {
  874. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  875. });
  876. }
  877. #[bench]
  878. fn measure_macro_medium(b: &mut Bencher) {
  879. let (tx, rx) = channel();
  880. let listener = thread::spawn(move || {
  881. loop { if rx.recv().is_err() { break } }
  882. });
  883. b.iter(|| {
  884. measure!(tx, test,
  885. tag[color; "red"],
  886. tag[mood => "playful"],
  887. tag [ ticker => "xmr_btc" ],
  888. float[ price => 1.2345 ],
  889. float[ amount => 56.323],
  890. int[n; 1],
  891. time[now()]
  892. );
  893. });
  894. }
  895. #[cfg(feature = "zmq")]
  896. #[cfg(feature = "warnings")]
  897. #[test]
  898. #[ignore]
  899. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  900. let ctx = zmq::Context::new();
  901. let socket = push(&ctx).unwrap();
  902. let (tx, rx) = channel();
  903. let w = writer(tx.clone());
  904. let mut buf = String::with_capacity(4096);
  905. let mut meas = Measurement::new("rust_test");
  906. meas.add_tag("a", "t");
  907. meas.add_field("c", Value::Float(1.23456));
  908. let now = now();
  909. meas.set_timestamp(now);
  910. serialize(&meas, &mut buf);
  911. socket.send_str(&buf, 0).unwrap();
  912. drop(w);
  913. }
  914. #[test]
  915. fn it_serializes_a_measurement_in_place() {
  916. let mut buf = String::with_capacity(4096);
  917. let mut meas = Measurement::new("rust_test");
  918. meas.add_tag("a", "b");
  919. meas.add_field("c", Value::Float(1.0));
  920. let now = now();
  921. meas.set_timestamp(now);
  922. serialize(&meas, &mut buf);
  923. let ans = format!("rust_test,a=b c=1 {}", now);
  924. assert_eq!(buf, ans);
  925. }
  926. #[test]
  927. fn it_serializes_a_hard_to_serialize_message() {
  928. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  929. let mut buf = String::new();
  930. let mut server_resp = String::new();
  931. let mut m = Measurement::new("rust_test");
  932. m.add_field("s", Value::String(&raw));
  933. let now = now();
  934. m.set_timestamp(now);
  935. serialize(&m, &mut buf);
  936. println!("{}", buf);
  937. buf.push_str("\n");
  938. let buf_copy = buf.clone();
  939. buf.push_str(&buf_copy);
  940. println!("{}", buf);
  941. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  942. let client = Client::new();
  943. match client.post(url.clone())
  944. .body(&buf)
  945. .send() {
  946. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  947. Ok(mut resp) => {
  948. resp.read_to_string(&mut server_resp).unwrap();
  949. panic!("{}", server_resp);
  950. }
  951. Err(why) => {
  952. panic!(why)
  953. }
  954. }
  955. }
  956. #[bench]
  957. fn serialize_owned_longer(b: &mut Bencher) {
  958. let mut buf = String::with_capacity(1024);
  959. let m =
  960. OwnedMeasurement::new("test")
  961. .add_tag("one", "a")
  962. .add_tag("two", "b")
  963. .add_tag("ticker", "xmr_btc")
  964. .add_tag("exchange", "plnx")
  965. .add_tag("side", "bid")
  966. .add_field("three", OwnedValue::Float(1.2345))
  967. .add_field("four", OwnedValue::Integer(57))
  968. .add_field("five", OwnedValue::Boolean(true))
  969. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  970. .set_timestamp(now());
  971. b.iter(|| {
  972. serialize_owned(&m, &mut buf);
  973. buf.clear()
  974. });
  975. }
  976. #[bench]
  977. fn serialize_owned_simple(b: &mut Bencher) {
  978. let mut buf = String::with_capacity(1024);
  979. let m =
  980. OwnedMeasurement::new("test")
  981. .add_tag("one", "a")
  982. .add_tag("two", "b")
  983. .add_field("three", OwnedValue::Float(1.2345))
  984. .add_field("four", OwnedValue::Integer(57))
  985. .set_timestamp(now());
  986. b.iter(|| {
  987. serialize_owned(&m, &mut buf);
  988. buf.clear()
  989. });
  990. }
  991. #[test]
  992. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  993. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  994. let mut buf = String::new();
  995. let mut server_resp = String::new();
  996. let m = OwnedMeasurement::new("rust_test")
  997. .add_field("s", OwnedValue::String(raw.to_string()))
  998. .set_timestamp(now());
  999. serialize_owned(&m, &mut buf);
  1000. println!("{}", buf);
  1001. buf.push_str("\n");
  1002. let buf_copy = buf.clone();
  1003. buf.push_str(&buf_copy);
  1004. println!("{}", buf);
  1005. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  1006. let client = Client::new();
  1007. match client.post(url.clone())
  1008. .body(&buf)
  1009. .send() {
  1010. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  1011. Ok(mut resp) => {
  1012. resp.read_to_string(&mut server_resp).unwrap();
  1013. panic!("{}", server_resp);
  1014. }
  1015. Err(why) => {
  1016. panic!(why)
  1017. }
  1018. }
  1019. }
  1020. }