You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1177 lines
40KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. #[cfg(feature = "warnings")]
  8. use std::fs;
  9. use std::time::{Instant, Duration};
  10. use std::hash::BuildHasherDefault;
  11. use hyper::status::StatusCode;
  12. use hyper::client::response::Response;
  13. use hyper::Url;
  14. use hyper::client::Client;
  15. use influent::measurement::{Measurement, Value};
  16. #[cfg(feature = "zmq")]
  17. use zmq;
  18. #[allow(unused_imports)]
  19. use chrono::{DateTime, Utc};
  20. use ordermap::OrderMap;
  21. use fnv::FnvHasher;
  22. use decimal::d128;
  23. use uuid::Uuid;
  24. use smallvec::SmallVec;
  25. use slog::Logger;
  26. use super::{nanos, file_logger, LOG_LEVEL};
  27. #[cfg(feature = "warnings")]
  28. use warnings::Warning;
  29. pub use super::{dur_nanos, dt_nanos};
  30. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  31. pub const INFLUX_WRITER_MAX_BUFFER: usize = 4096;
  32. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  33. Map::with_capacity_and_hasher(capacity, Default::default())
  34. }
  35. /// Created this so I know what types can be passed through the
  36. /// `measure!` macro, which used to convert with `as i64` and
  37. /// `as f64` until I accidentally passed a function name, and it
  38. /// still compiled, but with garbage numbers.
  39. pub trait AsI64 {
  40. fn as_i64(x: Self) -> i64;
  41. }
  42. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  43. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  44. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  45. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  46. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } }
  47. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  48. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  49. impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  50. impl AsI64 for i16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  51. impl AsI64 for u8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  52. impl AsI64 for i8 { fn as_i64(x: Self) -> i64 { x as i64 } }
  53. /// Created this so I know what types can be passed through the
  54. /// `measure!` macro, which used to convert with `as i64` and
  55. /// `as f64` until I accidentally passed a function name, and it
  56. /// still compiled, but with garbage numbers.
  57. pub trait AsF64 {
  58. fn as_f64(x: Self) -> f64;
  59. }
  60. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  61. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  62. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  63. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  64. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  65. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  66. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  67. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  68. ///
  69. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  70. /// values, as well as sends it with the `Sender`.
  71. ///
  72. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  73. /// measurement (see `tests` mod).
  74. ///
  75. /// # Examples
  76. ///
  77. /// ```
  78. /// #![feature(try_from)]
  79. /// #[macro_use] extern crate logging;
  80. /// extern crate decimal;
  81. ///
  82. /// use std::sync::mpsc::channel;
  83. /// use decimal::d128;
  84. /// use logging::influx::*;
  85. ///
  86. /// fn main() {
  87. /// let (tx, rx) = channel();
  88. ///
  89. /// // "shorthand" syntax
  90. ///
  91. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  92. ///
  93. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  94. ///
  95. /// assert_eq!(meas.key, "test");
  96. /// assert_eq!(meas.get_tag("color"), Some("red"));
  97. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  98. ///
  99. /// // alternate syntax ...
  100. ///
  101. /// measure!(tx, test,
  102. /// tag [ one => "a" ],
  103. /// tag [ two => "b" ],
  104. /// int [ three => 2 ],
  105. /// float [ four => 1.2345 ],
  106. /// string [ five => String::from("d") ],
  107. /// bool [ six => true ],
  108. /// int [ seven => { 1 + 2 } ],
  109. /// time [ 1 ]
  110. /// );
  111. ///
  112. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  113. ///
  114. /// assert_eq!(meas.key, "test");
  115. /// assert_eq!(meas.get_tag("one"), Some("a"));
  116. /// assert_eq!(meas.get_tag("two"), Some("b"));
  117. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  118. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  119. /// assert_eq!(meas.timestamp, Some(1));
  120. ///
  121. /// // use the @make_meas flag to skip sending a measurement, instead merely
  122. /// // creating it.
  123. ///
  124. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  125. ///
  126. /// // each variant also has shorthand aliases
  127. ///
  128. /// let meas: OwnedMeasurement =
  129. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  130. /// }
  131. /// ```
  132. ///
  133. #[macro_export]
  134. macro_rules! measure {
  135. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  136. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  137. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  138. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  139. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  140. (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
  141. (@kv v, $meas:ident, $k:expr) => { measure!(@ea tag, $meas, "version", $k) };
  142. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  143. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  144. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  145. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  146. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  147. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  148. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  149. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  150. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  151. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  152. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  153. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  154. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  155. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  156. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  157. (@as_expr $e:expr) => {$e};
  158. (@count_tags) => {0usize};
  159. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  160. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  161. (@count_fields) => {0usize};
  162. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  163. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  164. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  165. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  166. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  167. };
  168. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  169. let n_tags = measure!(@count_tags $($t)*);
  170. let n_fields = measure!(@count_fields $($t)*);
  171. let mut meas =
  172. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  173. $(
  174. measure!(@kv $t, meas, $($tail)*);
  175. )*
  176. meas
  177. }};
  178. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  179. measure!($m, $name, $($t [ $($tail)* ] ),+)
  180. };
  181. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  182. #[allow(unused_imports)]
  183. use $crate::influx::{AsI64, AsF64};
  184. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  185. let _ = $m.send(measurement);
  186. }};
  187. }
  188. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  189. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  190. /// measurements have accumulated.
  191. ///
  192. #[derive(Debug)]
  193. pub struct InfluxWriter {
  194. host: String,
  195. db: String,
  196. tx: Sender<Option<OwnedMeasurement>>,
  197. thread: Option<Arc<thread::JoinHandle<()>>>,
  198. }
  199. impl Default for InfluxWriter {
  200. fn default() -> Self {
  201. //if cfg!(any(test, feature = "test")) {
  202. // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0)
  203. //} else {
  204. InfluxWriter::new("localhost", "test", "/tmp/influx-test.log", 4096)
  205. //}
  206. }
  207. }
  208. impl Clone for InfluxWriter {
  209. fn clone(&self) -> Self {
  210. debug_assert!(self.thread.is_some());
  211. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  212. InfluxWriter {
  213. host: self.host.to_string(),
  214. db: self.db.to_string(),
  215. tx: self.tx.clone(),
  216. thread,
  217. }
  218. }
  219. }
  220. impl InfluxWriter {
  221. pub fn host(&self) -> &str { self.host.as_str() }
  222. pub fn db(&self) -> &str { self.db.as_str() }
  223. /// Sends the `OwnedMeasurement` to the serialization thread.
  224. ///
  225. #[cfg_attr(feature = "inlines", inline)]
  226. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  227. self.tx.send(Some(m))
  228. }
  229. #[cfg_attr(feature = "inlines", inline)]
  230. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  231. #[cfg_attr(feature = "inlines", inline)]
  232. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  233. #[cfg_attr(feature = "inlines", inline)]
  234. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  235. #[cfg_attr(feature = "inlines", inline)]
  236. pub fn rsecs(&self, d: Duration) -> f64 {
  237. ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64))
  238. * 1000.0)
  239. .round()
  240. / 1000.0
  241. }
  242. #[cfg_attr(feature = "inlines", inline)]
  243. pub fn secs(&self, d: Duration) -> f64 {
  244. d.as_secs() as f64 + d.subsec_nanos() as f64 / 1_000_000_000_f64
  245. }
  246. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  247. self.tx.clone()
  248. }
  249. pub fn placeholder() -> Self {
  250. let (tx, _) = channel();
  251. Self {
  252. host: String::new(),
  253. db: String::new(),
  254. tx,
  255. thread: None,
  256. }
  257. }
  258. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  259. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  260. Self::with_logger(host, db, buffer_size, logger)
  261. }
  262. #[allow(unused_assignments)]
  263. pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self {
  264. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  265. let buffer_size = INFLUX_WRITER_MAX_BUFFER;
  266. #[cfg(feature = "no-influx-buffer")]
  267. let buffer_size = 0usize;
  268. debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);
  269. let url =
  270. Url::parse_with_params(&format!("http://{}:8086/write", host),
  271. &[("db", db), ("precision", "ns")])
  272. .expect("influx writer url should parse");
  273. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  274. const MAX_PENDING: Duration = Duration::from_secs(1);
  275. let client = Client::new();
  276. debug!(logger, "initializing buffers");
  277. let mut buf = String::with_capacity(32 * 32 * 32);
  278. let mut count = 0;
  279. let mut last = Instant::now();
  280. let mut loop_time = Instant::now();
  281. let send = |buf: &str| {
  282. let resp = client.post(url.clone())
  283. .body(buf)
  284. .send();
  285. match resp {
  286. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  287. debug!(logger, "server responded ok: 204 NoContent");
  288. }
  289. Ok(mut resp) => {
  290. let mut server_resp = String::with_capacity(32 * 1024); // need to allocate here bc will be
  291. // sent to logging thread
  292. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  293. error!(logger, "influx server error";
  294. "status" => resp.status.to_string(),
  295. "body" => server_resp);
  296. }
  297. Err(why) => {
  298. error!(logger, "http request failed: {:?}", why);
  299. }
  300. }
  301. };
  302. let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: &Instant, last: &mut Instant| -> usize {
  303. match prev {
  304. 0 if buffer_size > 0 => {
  305. serialize_owned(m, buf);
  306. 1
  307. }
  308. n if n < buffer_size && *loop_time - *last < MAX_PENDING => {
  309. buf.push_str("\n");
  310. serialize_owned(m, buf);
  311. n + 1
  312. }
  313. n => {
  314. buf.push_str("\n");
  315. serialize_owned(m, buf);
  316. debug!(logger, "sending buffer to influx"; "len" => n);
  317. send(buf);
  318. *last = *loop_time;
  319. buf.clear();
  320. 0
  321. }
  322. }
  323. };
  324. loop {
  325. loop_time = Instant::now();
  326. match rx.recv() {
  327. Ok(Some(mut meas)) => {
  328. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  329. if meas.fields.is_empty() {
  330. meas.fields.push(("n", OwnedValue::Integer(1)));
  331. }
  332. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  333. count = next(count, &meas, &mut buf, &loop_time, &mut last);
  334. }
  335. Ok(None) => {
  336. warn!(logger, "terminate signal rcvd"; "count" => count);
  337. if buf.len() > 0 {
  338. info!(logger, "sending remaining buffer to influx on terminate"; "count" => count);
  339. let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1));
  340. count = next(buffer_size, &meas, &mut buf, &loop_time, &mut last);
  341. info!(logger, "triggered send of remaining buffer"; "count" => count);
  342. if !buf.is_empty() {
  343. warn!(logger, "buffer sill isn't empty after 'wtrterm' meas";
  344. "count" => count, "buf.len()" => buf.len());
  345. send(&buf);
  346. }
  347. }
  348. info!(logger, "exiting loop"; "count" => count, "buf.len()" => buf.len());
  349. break
  350. }
  351. _ => {
  352. thread::sleep(Duration::new(0, 1))
  353. }
  354. }
  355. }
  356. }).unwrap();
  357. InfluxWriter {
  358. host: host.to_string(),
  359. db: db.to_string(),
  360. tx,
  361. thread: Some(Arc::new(thread))
  362. }
  363. }
  364. }
  365. impl Drop for InfluxWriter {
  366. fn drop(&mut self) {
  367. if let Some(arc) = self.thread.take() {
  368. if let Ok(thread) = Arc::try_unwrap(arc) {
  369. let _ = self.tx.send(None);
  370. let _ = thread.join();
  371. }
  372. }
  373. }
  374. }
  375. #[cfg(feature = "zmq")]
  376. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  377. #[cfg(feature = "zmq")]
  378. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  379. let socket = ctx.socket(zmq::PULL)?;
  380. socket.bind(WRITER_ADDR)?;
  381. socket.set_rcvhwm(0)?;
  382. Ok(socket)
  383. }
  384. #[cfg(feature = "zmq")]
  385. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  386. let socket = ctx.socket(zmq::PUSH)?;
  387. socket.connect(WRITER_ADDR)?;
  388. socket.set_sndhwm(0)?;
  389. Ok(socket)
  390. }
  391. /// This removes offending things rather than escaping them.
  392. ///
  393. fn escape_tag(s: &str) -> String {
  394. s.replace(" ", "")
  395. .replace(",", "")
  396. .replace("\"", "")
  397. }
  398. fn escape(s: &str) -> String {
  399. s.replace(" ", "\\ ")
  400. .replace(",", "\\,")
  401. }
  402. fn as_string(s: &str) -> String {
  403. // the second replace removes double escapes
  404. //
  405. format!("\"{}\"", s.replace("\"", "\\\"")
  406. .replace(r#"\\""#, r#"\""#))
  407. }
  408. #[test]
  409. fn it_checks_as_string_does_not_double_escape() {
  410. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  411. let escaped = as_string(&raw);
  412. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  413. }
  414. fn as_integer(i: &i64) -> String {
  415. format!("{}i", i)
  416. }
  417. fn as_float(f: &f64) -> String {
  418. f.to_string()
  419. }
  420. fn as_boolean(b: &bool) -> &str {
  421. if *b { "t" } else { "f" }
  422. }
  423. pub fn now() -> i64 {
  424. nanos(Utc::now()) as i64
  425. }
  426. /// Serialize the measurement into influx line protocol
  427. /// and append to the buffer.
  428. ///
  429. /// # Examples
  430. ///
  431. /// ```
  432. /// extern crate influent;
  433. /// extern crate logging;
  434. ///
  435. /// use influent::measurement::{Measurement, Value};
  436. /// use std::string::String;
  437. /// use logging::influx::serialize;
  438. ///
  439. /// fn main() {
  440. /// let mut buf = String::new();
  441. /// let mut m = Measurement::new("test");
  442. /// m.add_field("x", Value::Integer(1));
  443. /// serialize(&m, &mut buf);
  444. /// }
  445. ///
  446. /// ```
  447. ///
  448. pub fn serialize(measurement: &Measurement, line: &mut String) {
  449. line.push_str(&escape(measurement.key));
  450. for (tag, value) in measurement.tags.iter() {
  451. line.push_str(",");
  452. line.push_str(&escape(tag));
  453. line.push_str("=");
  454. line.push_str(&escape(value));
  455. }
  456. let mut was_spaced = false;
  457. for (field, value) in measurement.fields.iter() {
  458. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  459. line.push_str(&escape(field));
  460. line.push_str("=");
  461. match value {
  462. &Value::String(ref s) => line.push_str(&as_string(s)),
  463. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  464. &Value::Float(ref f) => line.push_str(&as_float(f)),
  465. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  466. };
  467. }
  468. match measurement.timestamp {
  469. Some(t) => {
  470. line.push_str(" ");
  471. line.push_str(&t.to_string());
  472. }
  473. _ => {}
  474. }
  475. }
  476. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  477. ///
  478. /// The serialized measurement is appended to the end of the string without
  479. /// any regard for what exited in it previously.
  480. ///
  481. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  482. line.push_str(&escape_tag(measurement.key));
  483. let add_tag = |line: &mut String, key: &str, value: &str| {
  484. line.push_str(",");
  485. line.push_str(&escape_tag(key));
  486. line.push_str("=");
  487. line.push_str(&escape(value));
  488. };
  489. for &(key, value) in measurement.tags.iter() {
  490. add_tag(line, key, value);
  491. }
  492. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  493. if is_first { line.push_str(" "); } else { line.push_str(","); }
  494. line.push_str(&escape_tag(key));
  495. line.push_str("=");
  496. match *value {
  497. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  498. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  499. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  500. OwnedValue::D128(ref d) => {
  501. if d.is_finite() {
  502. line.push_str(&format!("{}", d));
  503. } else {
  504. line.push_str("0.0");
  505. }
  506. }
  507. OwnedValue::Float(ref f) => {
  508. if f.is_finite() {
  509. line.push_str(&format!("{}", f));
  510. } else {
  511. line.push_str("-999.0");
  512. }
  513. }
  514. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  515. };
  516. };
  517. let mut fields = measurement.fields.iter();
  518. // first time separate from tags with space
  519. //
  520. fields.next().map(|kv| {
  521. add_field(line, &kv.0, &kv.1, true);
  522. });
  523. // then seperate the rest w/ comma
  524. //
  525. for kv in fields {
  526. add_field(line, kv.0, &kv.1, false);
  527. }
  528. if let Some(t) = measurement.timestamp {
  529. line.push_str(" ");
  530. line.push_str(&t.to_string());
  531. }
  532. }
  533. #[cfg(feature = "warnings")]
  534. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  535. #[cfg(feature = "zmq")]
  536. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  537. assert!(false);
  538. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  539. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  540. let _ = fs::create_dir("/tmp/mm");
  541. let ctx = zmq::Context::new();
  542. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  543. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  544. let client = Client::new();
  545. let mut buf = String::with_capacity(4096);
  546. let mut server_resp = String::with_capacity(4096);
  547. let mut count = 0;
  548. loop {
  549. if let Ok(bytes) = socket.recv_bytes(0) {
  550. if let Ok(msg) = String::from_utf8(bytes) {
  551. count = match count {
  552. 0 => {
  553. buf.push_str(&msg);
  554. 1
  555. }
  556. n @ 1...40 => {
  557. buf.push_str("\n");
  558. buf.push_str(&msg);
  559. n + 1
  560. }
  561. _ => {
  562. buf.push_str("\n");
  563. buf.push_str(&msg);
  564. match client.post(url.clone())
  565. .body(&buf)
  566. .send() {
  567. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  568. Ok(mut resp) => {
  569. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  570. let _ = warnings.send(
  571. Warning::Error(
  572. format!("Influx server: {}", server_resp)));
  573. server_resp.clear();
  574. }
  575. Err(why) => {
  576. let _ = warnings.send(
  577. Warning::Error(
  578. format!("Influx write error: {}", why)));
  579. }
  580. }
  581. buf.clear();
  582. 0
  583. }
  584. }
  585. }
  586. }
  587. }
  588. }).unwrap()
  589. }
  590. #[derive(Debug, Clone, PartialEq)]
  591. pub enum OwnedValue {
  592. String(String),
  593. Float(f64),
  594. Integer(i64),
  595. Boolean(bool),
  596. D128(d128),
  597. Uuid(Uuid),
  598. }
  599. /// Holds data meant for an influxdb measurement in transit to the
  600. /// writing thread.
  601. ///
  602. /// TODO: convert `Map` to `SmallVec`?
  603. ///
  604. #[derive(Clone, Debug)]
  605. pub struct OwnedMeasurement {
  606. pub key: &'static str,
  607. pub timestamp: Option<i64>,
  608. //pub fields: Map<&'static str, OwnedValue>,
  609. //pub tags: Map<&'static str, &'static str>,
  610. pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
  611. pub tags: SmallVec<[(&'static str, &'static str); 8]>,
  612. }
  613. impl OwnedMeasurement {
  614. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  615. OwnedMeasurement {
  616. key,
  617. timestamp: None,
  618. tags: SmallVec::with_capacity(n_tags),
  619. fields: SmallVec::with_capacity(n_fields),
  620. }
  621. }
  622. pub fn new(key: &'static str) -> Self {
  623. OwnedMeasurement {
  624. key,
  625. timestamp: None,
  626. tags: SmallVec::new(),
  627. fields: SmallVec::new(),
  628. }
  629. }
  630. /// Unusual consuming `self` signature because primarily used by
  631. /// the `measure!` macro.
  632. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  633. self.tags.push((key, value));
  634. self
  635. }
  636. /// Unusual consuming `self` signature because primarily used by
  637. /// the `measure!` macro.
  638. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  639. self.fields.push((key, value));
  640. self
  641. }
  642. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  643. self.timestamp = Some(timestamp);
  644. self
  645. }
  646. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  647. match self.tags.iter().position(|kv| kv.0 == key) {
  648. Some(i) => {
  649. self.tags.get_mut(i)
  650. .map(|x| {
  651. x.0 = value;
  652. });
  653. self
  654. }
  655. None => {
  656. self.add_tag(key, value)
  657. }
  658. }
  659. }
  660. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  661. self.fields.iter()
  662. .find(|kv| kv.0 == key)
  663. .map(|kv| &kv.1)
  664. }
  665. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  666. self.tags.iter()
  667. .find(|kv| kv.0 == key)
  668. .map(|kv| kv.1)
  669. }
  670. }
  671. #[allow(unused_imports, unused_variables)]
  672. #[cfg(test)]
  673. mod tests {
  674. use super::*;
  675. use test::{black_box, Bencher};
  676. #[test]
  677. fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
  678. const VERSION: &str = "0.3.90";
  679. let tag_value = "one";
  680. let color = "red";
  681. let time = Utc::now();
  682. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
  683. assert_eq!(m.get_tag("color"), Some("red"));
  684. assert_eq!(m.get_tag("version"), Some(VERSION));
  685. assert_eq!(m.timestamp, Some(nanos(time) as i64));
  686. }
  687. #[test]
  688. fn it_uses_the_v_for_version_shortcut() {
  689. const VERSION: &str = "0.3.90";
  690. let tag_value = "one";
  691. let color = "red";
  692. let time = now();
  693. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  694. assert_eq!(m.get_tag("color"), Some("red"));
  695. assert_eq!(m.get_tag("version"), Some(VERSION));
  696. assert_eq!(m.timestamp, Some(time));
  697. }
  698. #[test]
  699. fn it_uses_the_new_tag_k_only_shortcut() {
  700. let tag_value = "one";
  701. let color = "red";
  702. let time = now();
  703. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  704. assert_eq!(m.get_tag("color"), Some("red"));
  705. assert_eq!(m.get_tag("tag_value"), Some("one"));
  706. assert_eq!(m.timestamp, Some(time));
  707. }
  708. #[test]
  709. fn it_uses_measure_macro_parenthesis_syntax() {
  710. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  711. assert_eq!(m.key, "test");
  712. assert_eq!(m.get_tag("a"), Some("b"));
  713. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  714. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  715. assert_eq!(m.timestamp, Some(1));
  716. }
  717. #[test]
  718. fn it_uses_measure_macro_on_a_self_attribute() {
  719. struct A {
  720. pub influx: InfluxWriter,
  721. }
  722. impl A {
  723. fn f(&self) {
  724. measure!(self.influx, test, t(color, "red"), i(n, 1));
  725. }
  726. }
  727. let a = A { influx: InfluxWriter::default() };
  728. a.f();
  729. }
  730. #[test]
  731. fn it_clones_an_influx_writer_to_check_both_drop() {
  732. let influx = InfluxWriter::default();
  733. measure!(influx, drop_test, i(a, 1), i(b, 2));
  734. {
  735. let influx = influx.clone();
  736. thread::spawn(move || {
  737. measure!(influx, drop_test, i(a, 3), i(b, 4));
  738. });
  739. }
  740. }
  741. #[bench]
  742. fn influx_writer_send_basic(b: &mut Bencher) {
  743. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  744. b.iter(|| {
  745. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  746. });
  747. }
  748. #[bench]
  749. fn influx_writer_send_price(b: &mut Bencher) {
  750. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  751. b.iter(|| {
  752. measure!(m, test,
  753. t(ticker, t!(xmr-btc).as_str()),
  754. t(exchange, "plnx"),
  755. d(bid, d128::zero()),
  756. d(ask, d128::zero()),
  757. );
  758. });
  759. }
  760. #[test]
  761. fn it_checks_color_tag_error_in_non_doctest() {
  762. let (tx, rx) = channel();
  763. measure!(tx, test, tag[color;"red"], int[n;1]);
  764. let meas: OwnedMeasurement = rx.recv().unwrap();
  765. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  766. }
  767. #[test]
  768. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  769. let meas = measure!(@make_meas test_measurement,
  770. tag [ one => "a" ],
  771. tag [ two => "b" ],
  772. int [ three => 2 ],
  773. float [ four => 1.2345 ],
  774. string [ five => String::from("d") ],
  775. bool [ six => true ],
  776. int [ seven => { 1 + 2 } ],
  777. time [ 1 ]
  778. );
  779. assert_eq!(meas.key, "test_measurement");
  780. assert_eq!(meas.get_tag("one"), Some("a"));
  781. assert_eq!(meas.get_tag("two"), Some("b"));
  782. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  783. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  784. assert_eq!(meas.timestamp, Some(1));
  785. }
  786. #[test]
  787. fn it_uses_the_measure_macro() {
  788. let (tx, rx) = channel();
  789. measure!(tx, test_measurement,
  790. tag [ one => "a" ],
  791. tag [ two => "b" ],
  792. int [ three => 2 ],
  793. float [ four => 1.2345 ],
  794. string [ five => String::from("d") ],
  795. bool [ six => true ],
  796. int [ seven => { 1 + 2 } ],
  797. time [ 1 ]
  798. );
  799. thread::sleep(Duration::from_millis(10));
  800. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  801. assert_eq!(meas.key, "test_measurement");
  802. assert_eq!(meas.get_tag("one"), Some("a"));
  803. assert_eq!(meas.get_tag("two"), Some("b"));
  804. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  805. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  806. assert_eq!(meas.timestamp, Some(1));
  807. }
  /// Checks that `measure!` accepts `d128` and `uuid` field kinds and a
  /// timestamp taken from a local variable.
  #[test]
  fn it_uses_measure_macro_for_d128_and_uuid() {
      let (tx, rx) = channel();
      let u = Uuid::new_v4();
      let d = d128::zero();
      let t = now();
      measure!(tx, test_measurement,
          tag[one; "a"],
          d128[two; d],
          uuid[three; u],
          time[t]
      );
      // Wait (bounded) for the measurement instead of sleeping a fixed
      // interval and polling once.
      let meas: OwnedMeasurement = rx
          .recv_timeout(Duration::from_secs(1))
          .expect("measure! should have sent a measurement on the channel");
      assert_eq!(meas.key, "test_measurement");
      assert_eq!(meas.get_tag("one"), Some("a"));
      assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
      assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
      assert_eq!(meas.timestamp, Some(t));
  }
  /// Same checks as the `=>` syntax test, but using the alternative
  /// `kind[key; value]` form (with one `=>` entry mixed in) to make sure both
  /// spellings can be combined in a single `measure!` invocation.
  #[test]
  fn it_uses_the_measure_macro_alt_syntax() {
      let (tx, rx) = channel();
      measure!(tx, test_measurement,
          tag[one; "a"],
          tag[two; "b"],
          int[three; 2],
          float[four; 1.2345],
          string[five; String::from("d")],
          bool [ six => true ],
          int[seven; { 1 + 2 }],
          time[1]
      );
      // Wait (bounded) for the measurement instead of sleeping a fixed
      // interval and polling once.
      let meas: OwnedMeasurement = rx
          .recv_timeout(Duration::from_secs(1))
          .expect("measure! should have sent a measurement on the channel");
      assert_eq!(meas.key, "test_measurement");
      assert_eq!(meas.get_tag("one"), Some("a"));
      assert_eq!(meas.get_tag("two"), Some("b"));
      assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
      assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
      assert_eq!(meas.timestamp, Some(1));
  }
  /// Builds a measurement with two tags and two float fields via the
  /// `@make_meas` arm and verifies the serialized line contains the expected
  /// tag/field section (`b=two x=1.1,y=-1.1`).
  #[test]
  fn it_checks_that_fields_are_separated_correctly() {
      let meas = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);

      // The in-memory measurement holds what the macro was given.
      assert_eq!(meas.key, "test");
      assert_eq!(meas.get_tag("a"), Some("one"));
      assert_eq!(meas.get_field("x"), Some(&OwnedValue::Float(1.1)));

      // The serialized form must contain the last tag followed by both fields.
      let mut line = String::new();
      serialize_owned(&meas, &mut line);
      let expected_fragment = "b=two x=1.1,y=-1.1";
      assert!(line.contains(expected_fragment), "buf = {}", line);
  }
  /// Exercises unusual-but-valid `measure!` invocations: no whitespace around
  /// separators, a trailing comma, the parenthesised `kind(key, value)` form,
  /// and a nested field access used as the value expression.
  #[test]
  fn try_to_break_measure_macro() {
      // Helper types that provide a multi-level field path (`nested.a.one`).
      struct A {
          pub one: i32,
          pub two: i32,
      }
      struct B {
          pub a: A
      }

      // The receiving half is discarded right away; these calls only need to
      // expand and compile.
      let (tx, _) = channel();
      measure!(tx, one, tag[x=>"y"], int[n;1]);
      measure!(tx, one, tag[x;"y"], int[n;1],);

      let nested = B { a: A { one: 1, two: 2 } };
      let built = measure!(@make_meas test, t(name, "a"), i(a, nested.a.one));
      assert_eq!(built.get_field("a"), Some(&OwnedValue::Integer(1)));
  }
  876. #[bench]
  877. fn measure_macro_small(b: &mut Bencher) {
  878. let (tx, rx) = channel();
  879. let listener = thread::spawn(move || {
  880. loop { if rx.recv().is_err() { break } }
  881. });
  882. b.iter(|| {
  883. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  884. });
  885. }
  886. #[bench]
  887. fn measure_macro_medium(b: &mut Bencher) {
  888. let (tx, rx) = channel();
  889. let listener = thread::spawn(move || {
  890. loop { if rx.recv().is_err() { break } }
  891. });
  892. b.iter(|| {
  893. measure!(tx, test,
  894. tag[color; "red"],
  895. tag[mood => "playful"],
  896. tag [ ticker => "xmr_btc" ],
  897. float[ price => 1.2345 ],
  898. float[ amount => 56.323],
  899. int[n; 1],
  900. time[now()]
  901. );
  902. });
  903. }
  /// Manual integration test (ignored by default; needs the `zmq` and
  /// `warnings` features): starts a writer, serializes one dummy measurement
  /// and pushes the resulting line over a zmq socket.
  #[cfg(feature = "zmq")]
  #[cfg(feature = "warnings")]
  #[test]
  #[ignore]
  fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
      let context = zmq::Context::new();
      let sock = push(&context).unwrap();

      // `rx` is never read, but stays bound for the whole test.
      let (tx, rx) = channel();
      let writer_handle = writer(tx.clone());

      let mut line = String::with_capacity(4096);
      let mut dummy = Measurement::new("rust_test");
      dummy.add_tag("a", "t");
      dummy.add_field("c", Value::Float(1.23456));
      let ts = now();
      dummy.set_timestamp(ts);
      serialize(&dummy, &mut line);

      sock.send_str(&line, 0).unwrap();
      drop(writer_handle);
  }
  /// Serializes a borrowed `Measurement` (one tag, one float field, a
  /// timestamp) into a caller-provided buffer and compares the result with
  /// the expected line.
  #[test]
  fn it_serializes_a_measurement_in_place() {
      let ts = now();

      let mut measurement = Measurement::new("rust_test");
      measurement.add_tag("a", "b");
      measurement.add_field("c", Value::Float(1.0));
      measurement.set_timestamp(ts);

      let mut out = String::with_capacity(4096);
      serialize(&measurement, &mut out);

      // The float `1.0` is expected to be rendered as `1`.
      let expected = format!("rust_test,a=b c=1 {}", ts);
      assert_eq!(out, expected);
  }
  /// Serializes a string field containing nested quotes and parentheses,
  /// posts two copies of the resulting line to a local InfluxDB
  /// (`http://localhost:8086/write`, db `test`) and expects `204 No Content`.
  ///
  /// Requires a running InfluxDB instance; any other response or a transport
  /// error fails the test with the server's / client's message.
  #[test]
  fn it_serializes_a_hard_to_serialize_message() {
      let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
      let mut buf = String::new();
      let mut server_resp = String::new();
      let mut m = Measurement::new("rust_test");
      m.add_field("s", Value::String(raw));
      let now = now();
      m.set_timestamp(now);
      serialize(&m, &mut buf);
      println!("{}", buf);
      // Duplicate the line so the request body carries two newline-terminated
      // points.
      buf.push_str("\n");
      let buf_copy = buf.clone();
      buf.push_str(&buf_copy);
      println!("{}", buf);
      let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
      let client = Client::new();
      match client.post(url)
          .body(&buf)
          .send() {
          Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
          Ok(mut resp) => {
              resp.read_to_string(&mut server_resp).unwrap();
              panic!("{}", server_resp);
          }
          Err(why) => {
              // Format the error explicitly: a bare `panic!(why)` produces an
              // opaque `Box<Any>` payload and is rejected by newer editions.
              panic!("{}", why)
          }
      }
  }
  965. #[bench]
  966. fn serialize_owned_longer(b: &mut Bencher) {
  967. let mut buf = String::with_capacity(1024);
  968. let m =
  969. OwnedMeasurement::new("test")
  970. .add_tag("one", "a")
  971. .add_tag("two", "b")
  972. .add_tag("ticker", "xmr_btc")
  973. .add_tag("exchange", "plnx")
  974. .add_tag("side", "bid")
  975. .add_field("three", OwnedValue::Float(1.2345))
  976. .add_field("four", OwnedValue::Integer(57))
  977. .add_field("five", OwnedValue::Boolean(true))
  978. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  979. .set_timestamp(now());
  980. b.iter(|| {
  981. serialize_owned(&m, &mut buf);
  982. buf.clear()
  983. });
  984. }
  985. #[bench]
  986. fn serialize_owned_simple(b: &mut Bencher) {
  987. let mut buf = String::with_capacity(1024);
  988. let m =
  989. OwnedMeasurement::new("test")
  990. .add_tag("one", "a")
  991. .add_tag("two", "b")
  992. .add_field("three", OwnedValue::Float(1.2345))
  993. .add_field("four", OwnedValue::Integer(57))
  994. .set_timestamp(now());
  995. b.iter(|| {
  996. serialize_owned(&m, &mut buf);
  997. buf.clear()
  998. });
  999. }
  /// Owned-measurement variant of the "hard to serialize" test: serializes a
  /// string field containing nested quotes and parentheses with
  /// `serialize_owned`, posts two copies of the line to a local InfluxDB
  /// (`http://localhost:8086/write`, db `test`) and expects `204 No Content`.
  ///
  /// Requires a running InfluxDB instance; any other response or a transport
  /// error fails the test with the server's / client's message.
  #[test]
  fn it_serializes_a_hard_to_serialize_message_from_owned() {
      let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
      let mut buf = String::new();
      let mut server_resp = String::new();
      let m = OwnedMeasurement::new("rust_test")
          .add_field("s", OwnedValue::String(raw.to_string()))
          .set_timestamp(now());
      serialize_owned(&m, &mut buf);
      println!("{}", buf);
      // Duplicate the line so the request body carries two newline-terminated
      // points.
      buf.push_str("\n");
      let buf_copy = buf.clone();
      buf.push_str(&buf_copy);
      println!("{}", buf);
      let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
      let client = Client::new();
      match client.post(url)
          .body(&buf)
          .send() {
          Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
          Ok(mut resp) => {
              resp.read_to_string(&mut server_resp).unwrap();
              panic!("{}", server_resp);
          }
          Err(why) => {
              // Format the error explicitly: a bare `panic!(why)` produces an
              // opaque `Box<Any>` payload and is rejected by newer editions.
              panic!("{}", why)
          }
      }
  }
  1029. }