You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1147 lines
38KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. #[cfg(feature = "warnings")]
  8. use std::fs;
  9. use std::time::{Instant, Duration};
  10. use std::hash::BuildHasherDefault;
  11. use hyper::status::StatusCode;
  12. use hyper::client::response::Response;
  13. use hyper::Url;
  14. use hyper::client::Client;
  15. use influent::measurement::{Measurement, Value};
  16. #[cfg(feature = "zmq")]
  17. use zmq;
  18. #[allow(unused_imports)]
  19. use chrono::{DateTime, Utc};
  20. use ordermap::OrderMap;
  21. use fnv::FnvHasher;
  22. use decimal::d128;
  23. use uuid::Uuid;
  24. use smallvec::SmallVec;
  25. use slog::Logger;
  26. use super::{nanos, file_logger, LOG_LEVEL};
  27. #[cfg(feature = "warnings")]
  28. use warnings::Warning;
  29. pub use super::{dur_nanos, dt_nanos};
  30. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  31. pub const INFLUX_WRITER_MAX_BUFFER: usize = 4096;
  32. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  33. Map::with_capacity_and_hasher(capacity, Default::default())
  34. }
  35. /// Created this so I know what types can be passed through the
  36. /// `measure!` macro, which used to convert with `as i64` and
  37. /// `as f64` until I accidentally passed a function name, and it
  38. /// still compiled, but with garbage numbers.
  39. pub trait AsI64 {
  40. fn as_i64(x: Self) -> i64;
  41. }
  42. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  43. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  44. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  45. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  46. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } }
  47. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  48. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  49. impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  50. /// Created this so I know what types can be passed through the
  51. /// `measure!` macro, which used to convert with `as i64` and
  52. /// `as f64` until I accidentally passed a function name, and it
  53. /// still compiled, but with garbage numbers.
  54. pub trait AsF64 {
  55. fn as_f64(x: Self) -> f64;
  56. }
  57. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  58. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  59. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  60. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  61. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  62. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  63. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  64. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  65. ///
  66. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  67. /// values, as well as sends it with the `Sender`.
  68. ///
  69. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  70. /// measurement (see `tests` mod).
  71. ///
  72. /// # Examples
  73. ///
  74. /// ```
  75. /// #![feature(try_from)]
  76. /// #[macro_use] extern crate logging;
  77. /// extern crate decimal;
  78. ///
  79. /// use std::sync::mpsc::channel;
  80. /// use decimal::d128;
  81. /// use logging::influx::*;
  82. ///
  83. /// fn main() {
  84. /// let (tx, rx) = channel();
  85. ///
  86. /// // "shorthand" syntax
  87. ///
  88. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  89. ///
  90. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  91. ///
  92. /// assert_eq!(meas.key, "test");
  93. /// assert_eq!(meas.get_tag("color"), Some("red"));
  94. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  95. ///
  96. /// // alternate syntax ...
  97. ///
  98. /// measure!(tx, test,
  99. /// tag [ one => "a" ],
  100. /// tag [ two => "b" ],
  101. /// int [ three => 2 ],
  102. /// float [ four => 1.2345 ],
  103. /// string [ five => String::from("d") ],
  104. /// bool [ six => true ],
  105. /// int [ seven => { 1 + 2 } ],
  106. /// time [ 1 ]
  107. /// );
  108. ///
  109. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  110. ///
  111. /// assert_eq!(meas.key, "test");
  112. /// assert_eq!(meas.get_tag("one"), Some("a"));
  113. /// assert_eq!(meas.get_tag("two"), Some("b"));
  114. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  115. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  116. /// assert_eq!(meas.timestamp, Some(1));
  117. ///
  118. /// // use the @make_meas flag to skip sending a measurement, instead merely
  119. /// // creating it.
  120. ///
  121. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  122. ///
  123. /// // each variant also has shorthand aliases
  124. ///
  125. /// let meas: OwnedMeasurement =
  126. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  127. /// }
  128. /// ```
  129. ///
  130. #[macro_export]
  131. macro_rules! measure {
  132. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  133. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  134. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  135. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  136. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  137. (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
  138. (@kv v, $meas:ident, $k:expr) => { measure!(@ea tag, $meas, "version", $k) };
  139. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  140. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  141. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  142. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  143. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  144. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  145. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  146. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  147. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  148. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  149. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  150. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  151. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  152. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  153. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  154. (@as_expr $e:expr) => {$e};
  155. (@count_tags) => {0usize};
  156. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  157. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  158. (@count_fields) => {0usize};
  159. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  160. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  161. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  162. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  163. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  164. };
  165. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  166. let n_tags = measure!(@count_tags $($t)*);
  167. let n_fields = measure!(@count_fields $($t)*);
  168. let mut meas =
  169. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  170. $(
  171. measure!(@kv $t, meas, $($tail)*);
  172. )*
  173. meas
  174. }};
  175. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  176. measure!($m, $name, $($t [ $($tail)* ] ),+)
  177. };
  178. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  179. #[allow(unused_imports)]
  180. use $crate::influx::{AsI64, AsF64};
  181. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  182. let _ = $m.send(measurement);
  183. }};
  184. }
  185. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  186. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  187. /// measurements have accumulated.
  188. ///
  189. #[derive(Debug)]
  190. pub struct InfluxWriter {
  191. host: String,
  192. db: String,
  193. tx: Sender<Option<OwnedMeasurement>>,
  194. thread: Option<Arc<thread::JoinHandle<()>>>,
  195. }
  196. impl Default for InfluxWriter {
  197. fn default() -> Self {
  198. //if cfg!(any(test, feature = "test")) {
  199. // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0)
  200. //} else {
  201. InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 4000)
  202. //}
  203. }
  204. }
  205. impl Clone for InfluxWriter {
  206. fn clone(&self) -> Self {
  207. debug_assert!(self.thread.is_some());
  208. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  209. InfluxWriter {
  210. host: self.host.to_string(),
  211. db: self.db.to_string(),
  212. tx: self.tx.clone(),
  213. thread,
  214. }
  215. }
  216. }
  217. impl InfluxWriter {
  218. /// Sends the `OwnedMeasurement` to the serialization thread.
  219. ///
  220. #[cfg_attr(feature = "inlines", inline)]
  221. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  222. self.tx.send(Some(m))
  223. }
  224. #[cfg_attr(feature = "inlines", inline)]
  225. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  226. #[cfg_attr(feature = "inlines", inline)]
  227. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  228. #[cfg_attr(feature = "inlines", inline)]
  229. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  230. #[cfg_attr(feature = "inlines", inline)]
  231. pub fn secs(&self, d: Duration) -> f64 {
  232. ((d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64))
  233. * 1000.0)
  234. .round()
  235. / 1000.0
  236. }
  237. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  238. self.tx.clone()
  239. }
  240. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  241. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  242. Self::with_logger(host, db, buffer_size, logger)
  243. }
  244. #[allow(unused_assignments)]
  245. pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self {
  246. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  247. let buffer_size = INFLUX_WRITER_MAX_BUFFER;
  248. #[cfg(feature = "no-influx-buffer")]
  249. let buffer_size = 0usize;
  250. debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);
  251. let url =
  252. Url::parse_with_params(&format!("http://{}:8086/write", host),
  253. &[("db", db), ("precision", "ns")])
  254. .expect("influx writer url should parse");
  255. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  256. let client = Client::new();
  257. debug!(logger, "initializing buffers");
  258. let mut buf = String::with_capacity(32 * 32 * 32);
  259. let mut count = 0;
  260. let mut last = Instant::now();
  261. let mut loop_time = Instant::now();
  262. let send = |buf: &str| {
  263. let resp = client.post(url.clone())
  264. .body(buf)
  265. .send();
  266. match resp {
  267. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  268. debug!(logger, "server responded ok: 204 NoContent");
  269. }
  270. Ok(mut resp) => {
  271. let mut server_resp = String::with_capacity(32 * 1024); // need to allocate here bc will be
  272. // sent to logging thread
  273. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  274. error!(logger, "influx server error";
  275. "status" => resp.status.to_string(),
  276. "body" => server_resp);
  277. }
  278. Err(why) => {
  279. error!(logger, "http request failed: {:?}", why);
  280. }
  281. }
  282. };
  283. let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: &Instant, last: &mut Instant| -> usize {
  284. match prev {
  285. 0 if buffer_size > 0 => {
  286. serialize_owned(m, buf);
  287. 1
  288. }
  289. n if n < buffer_size && *loop_time - *last < Duration::from_secs(2) => {
  290. buf.push_str("\n");
  291. serialize_owned(m, buf);
  292. n + 1
  293. }
  294. n => {
  295. buf.push_str("\n");
  296. serialize_owned(m, buf);
  297. debug!(logger, "sending buffer to influx"; "len" => n);
  298. send(buf);
  299. *last = *loop_time;
  300. buf.clear();
  301. 0
  302. }
  303. }
  304. };
  305. loop {
  306. loop_time = Instant::now();
  307. match rx.recv() {
  308. Ok(Some(mut meas)) => {
  309. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  310. if meas.fields.is_empty() {
  311. meas.fields.push(("n", OwnedValue::Integer(1)));
  312. }
  313. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  314. count = next(count, &meas, &mut buf, &loop_time, &mut last);
  315. }
  316. Ok(None) => {
  317. if buf.len() > 0 {
  318. debug!(logger, "sending buffer to influx"; "len" => count);
  319. send(&buf)
  320. }
  321. break
  322. }
  323. _ => {
  324. thread::sleep(Duration::new(0, 1))
  325. }
  326. }
  327. }
  328. }).unwrap();
  329. InfluxWriter {
  330. host: host.to_string(),
  331. db: db.to_string(),
  332. tx,
  333. thread: Some(Arc::new(thread))
  334. }
  335. }
  336. }
  337. impl Drop for InfluxWriter {
  338. fn drop(&mut self) {
  339. if let Some(arc) = self.thread.take() {
  340. if let Ok(thread) = Arc::try_unwrap(arc) {
  341. let _ = self.tx.send(None);
  342. let _ = thread.join();
  343. }
  344. }
  345. }
  346. }
  347. #[cfg(feature = "zmq")]
  348. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  349. #[cfg(feature = "zmq")]
  350. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  351. let socket = ctx.socket(zmq::PULL)?;
  352. socket.bind(WRITER_ADDR)?;
  353. socket.set_rcvhwm(0)?;
  354. Ok(socket)
  355. }
  356. #[cfg(feature = "zmq")]
  357. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  358. let socket = ctx.socket(zmq::PUSH)?;
  359. socket.connect(WRITER_ADDR)?;
  360. socket.set_sndhwm(0)?;
  361. Ok(socket)
  362. }
  363. /// This removes offending things rather than escaping them.
  364. ///
  365. fn escape_tag(s: &str) -> String {
  366. s.replace(" ", "")
  367. .replace(",", "")
  368. .replace("\"", "")
  369. }
  370. fn escape(s: &str) -> String {
  371. s.replace(" ", "\\ ")
  372. .replace(",", "\\,")
  373. }
  374. fn as_string(s: &str) -> String {
  375. // the second replace removes double escapes
  376. //
  377. format!("\"{}\"", s.replace("\"", "\\\"")
  378. .replace(r#"\\""#, r#"\""#))
  379. }
  380. #[test]
  381. fn it_checks_as_string_does_not_double_escape() {
  382. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  383. let escaped = as_string(&raw);
  384. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  385. }
  386. fn as_integer(i: &i64) -> String {
  387. format!("{}i", i)
  388. }
  389. fn as_float(f: &f64) -> String {
  390. f.to_string()
  391. }
  392. fn as_boolean(b: &bool) -> &str {
  393. if *b { "t" } else { "f" }
  394. }
  395. pub fn now() -> i64 {
  396. nanos(Utc::now()) as i64
  397. }
  398. /// Serialize the measurement into influx line protocol
  399. /// and append to the buffer.
  400. ///
  401. /// # Examples
  402. ///
  403. /// ```
  404. /// extern crate influent;
  405. /// extern crate logging;
  406. ///
  407. /// use influent::measurement::{Measurement, Value};
  408. /// use std::string::String;
  409. /// use logging::influx::serialize;
  410. ///
  411. /// fn main() {
  412. /// let mut buf = String::new();
  413. /// let mut m = Measurement::new("test");
  414. /// m.add_field("x", Value::Integer(1));
  415. /// serialize(&m, &mut buf);
  416. /// }
  417. ///
  418. /// ```
  419. ///
  420. pub fn serialize(measurement: &Measurement, line: &mut String) {
  421. line.push_str(&escape(measurement.key));
  422. for (tag, value) in measurement.tags.iter() {
  423. line.push_str(",");
  424. line.push_str(&escape(tag));
  425. line.push_str("=");
  426. line.push_str(&escape(value));
  427. }
  428. let mut was_spaced = false;
  429. for (field, value) in measurement.fields.iter() {
  430. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  431. line.push_str(&escape(field));
  432. line.push_str("=");
  433. match value {
  434. &Value::String(ref s) => line.push_str(&as_string(s)),
  435. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  436. &Value::Float(ref f) => line.push_str(&as_float(f)),
  437. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  438. };
  439. }
  440. match measurement.timestamp {
  441. Some(t) => {
  442. line.push_str(" ");
  443. line.push_str(&t.to_string());
  444. }
  445. _ => {}
  446. }
  447. }
  448. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  449. ///
  450. /// The serialized measurement is appended to the end of the string without
  451. /// any regard for what exited in it previously.
  452. ///
  453. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  454. line.push_str(&escape_tag(measurement.key));
  455. let add_tag = |line: &mut String, key: &str, value: &str| {
  456. line.push_str(",");
  457. line.push_str(&escape_tag(key));
  458. line.push_str("=");
  459. line.push_str(&escape(value));
  460. };
  461. for &(key, value) in measurement.tags.iter() {
  462. add_tag(line, key, value);
  463. }
  464. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  465. if is_first { line.push_str(" "); } else { line.push_str(","); }
  466. line.push_str(&escape_tag(key));
  467. line.push_str("=");
  468. match *value {
  469. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  470. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  471. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  472. OwnedValue::D128(ref d) => {
  473. if d.is_finite() {
  474. line.push_str(&format!("{}", d));
  475. } else {
  476. line.push_str("0.0");
  477. }
  478. }
  479. OwnedValue::Float(ref f) => {
  480. if f.is_finite() {
  481. line.push_str(&format!("{}", f));
  482. } else {
  483. line.push_str("0.0");
  484. }
  485. }
  486. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  487. };
  488. };
  489. let mut fields = measurement.fields.iter();
  490. // first time separate from tags with space
  491. //
  492. fields.next().map(|kv| {
  493. add_field(line, &kv.0, &kv.1, true);
  494. });
  495. // then seperate the rest w/ comma
  496. //
  497. for kv in fields {
  498. add_field(line, kv.0, &kv.1, false);
  499. }
  500. if let Some(t) = measurement.timestamp {
  501. line.push_str(" ");
  502. line.push_str(&t.to_string());
  503. }
  504. }
  505. #[cfg(feature = "warnings")]
  506. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  507. #[cfg(feature = "zmq")]
  508. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  509. assert!(false);
  510. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  511. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  512. let _ = fs::create_dir("/tmp/mm");
  513. let ctx = zmq::Context::new();
  514. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  515. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  516. let client = Client::new();
  517. let mut buf = String::with_capacity(4096);
  518. let mut server_resp = String::with_capacity(4096);
  519. let mut count = 0;
  520. loop {
  521. if let Ok(bytes) = socket.recv_bytes(0) {
  522. if let Ok(msg) = String::from_utf8(bytes) {
  523. count = match count {
  524. 0 => {
  525. buf.push_str(&msg);
  526. 1
  527. }
  528. n @ 1...40 => {
  529. buf.push_str("\n");
  530. buf.push_str(&msg);
  531. n + 1
  532. }
  533. _ => {
  534. buf.push_str("\n");
  535. buf.push_str(&msg);
  536. match client.post(url.clone())
  537. .body(&buf)
  538. .send() {
  539. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  540. Ok(mut resp) => {
  541. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  542. let _ = warnings.send(
  543. Warning::Error(
  544. format!("Influx server: {}", server_resp)));
  545. server_resp.clear();
  546. }
  547. Err(why) => {
  548. let _ = warnings.send(
  549. Warning::Error(
  550. format!("Influx write error: {}", why)));
  551. }
  552. }
  553. buf.clear();
  554. 0
  555. }
  556. }
  557. }
  558. }
  559. }
  560. }).unwrap()
  561. }
  562. #[derive(Debug, Clone, PartialEq)]
  563. pub enum OwnedValue {
  564. String(String),
  565. Float(f64),
  566. Integer(i64),
  567. Boolean(bool),
  568. D128(d128),
  569. Uuid(Uuid),
  570. }
  571. /// Holds data meant for an influxdb measurement in transit to the
  572. /// writing thread.
  573. ///
  574. /// TODO: convert `Map` to `SmallVec`?
  575. ///
  576. #[derive(Clone, Debug)]
  577. pub struct OwnedMeasurement {
  578. pub key: &'static str,
  579. pub timestamp: Option<i64>,
  580. //pub fields: Map<&'static str, OwnedValue>,
  581. //pub tags: Map<&'static str, &'static str>,
  582. pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
  583. pub tags: SmallVec<[(&'static str, &'static str); 8]>,
  584. }
  585. impl OwnedMeasurement {
  586. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  587. OwnedMeasurement {
  588. key,
  589. timestamp: None,
  590. tags: SmallVec::with_capacity(n_tags),
  591. fields: SmallVec::with_capacity(n_fields),
  592. }
  593. }
  594. pub fn new(key: &'static str) -> Self {
  595. OwnedMeasurement {
  596. key,
  597. timestamp: None,
  598. tags: SmallVec::new(),
  599. fields: SmallVec::new(),
  600. }
  601. }
  602. /// Unusual consuming `self` signature because primarily used by
  603. /// the `measure!` macro.
  604. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  605. self.tags.push((key, value));
  606. self
  607. }
  608. /// Unusual consuming `self` signature because primarily used by
  609. /// the `measure!` macro.
  610. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  611. self.fields.push((key, value));
  612. self
  613. }
  614. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  615. self.timestamp = Some(timestamp);
  616. self
  617. }
  618. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  619. match self.tags.iter().position(|kv| kv.0 == key) {
  620. Some(i) => {
  621. self.tags.get_mut(i)
  622. .map(|x| {
  623. x.0 = value;
  624. });
  625. self
  626. }
  627. None => {
  628. self.add_tag(key, value)
  629. }
  630. }
  631. }
  632. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  633. self.fields.iter()
  634. .find(|kv| kv.0 == key)
  635. .map(|kv| &kv.1)
  636. }
  637. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  638. self.tags.iter()
  639. .find(|kv| kv.0 == key)
  640. .map(|kv| kv.1)
  641. }
  642. }
  643. #[allow(unused_imports, unused_variables)]
  644. #[cfg(test)]
  645. mod tests {
  646. use super::*;
  647. use test::{black_box, Bencher};
  648. #[test]
  649. fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
  650. const VERSION: &str = "0.3.90";
  651. let tag_value = "one";
  652. let color = "red";
  653. let time = Utc::now();
  654. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
  655. assert_eq!(m.get_tag("color"), Some("red"));
  656. assert_eq!(m.get_tag("version"), Some(VERSION));
  657. assert_eq!(m.timestamp, Some(nanos(time) as i64));
  658. }
  659. #[test]
  660. fn it_uses_the_v_for_version_shortcut() {
  661. const VERSION: &str = "0.3.90";
  662. let tag_value = "one";
  663. let color = "red";
  664. let time = now();
  665. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  666. assert_eq!(m.get_tag("color"), Some("red"));
  667. assert_eq!(m.get_tag("version"), Some(VERSION));
  668. assert_eq!(m.timestamp, Some(time));
  669. }
  670. #[test]
  671. fn it_uses_the_new_tag_k_only_shortcut() {
  672. let tag_value = "one";
  673. let color = "red";
  674. let time = now();
  675. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  676. assert_eq!(m.get_tag("color"), Some("red"));
  677. assert_eq!(m.get_tag("tag_value"), Some("one"));
  678. assert_eq!(m.timestamp, Some(time));
  679. }
  680. #[test]
  681. fn it_uses_measure_macro_parenthesis_syntax() {
  682. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  683. assert_eq!(m.key, "test");
  684. assert_eq!(m.get_tag("a"), Some("b"));
  685. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  686. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  687. assert_eq!(m.timestamp, Some(1));
  688. }
  689. #[test]
  690. fn it_uses_measure_macro_on_a_self_attribute() {
  691. struct A {
  692. pub influx: InfluxWriter,
  693. }
  694. impl A {
  695. fn f(&self) {
  696. measure!(self.influx, test, t(color, "red"), i(n, 1));
  697. }
  698. }
  699. let a = A { influx: InfluxWriter::default() };
  700. a.f();
  701. }
  702. #[test]
  703. fn it_clones_an_influx_writer_to_check_both_drop() {
  704. let influx = InfluxWriter::default();
  705. measure!(influx, drop_test, i(a, 1), i(b, 2));
  706. {
  707. let influx = influx.clone();
  708. thread::spawn(move || {
  709. measure!(influx, drop_test, i(a, 3), i(b, 4));
  710. });
  711. }
  712. }
  713. #[bench]
  714. fn influx_writer_send_basic(b: &mut Bencher) {
  715. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  716. b.iter(|| {
  717. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  718. });
  719. }
  720. #[bench]
  721. fn influx_writer_send_price(b: &mut Bencher) {
  722. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  723. b.iter(|| {
  724. measure!(m, test,
  725. t(ticker, t!(xmr-btc).as_str()),
  726. t(exchange, "plnx"),
  727. d(bid, d128::zero()),
  728. d(ask, d128::zero()),
  729. );
  730. });
  731. }
  732. #[test]
  733. fn it_checks_color_tag_error_in_non_doctest() {
  734. let (tx, rx) = channel();
  735. measure!(tx, test, tag[color;"red"], int[n;1]);
  736. let meas: OwnedMeasurement = rx.recv().unwrap();
  737. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  738. }
  739. #[test]
  740. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  741. let meas = measure!(@make_meas test_measurement,
  742. tag [ one => "a" ],
  743. tag [ two => "b" ],
  744. int [ three => 2 ],
  745. float [ four => 1.2345 ],
  746. string [ five => String::from("d") ],
  747. bool [ six => true ],
  748. int [ seven => { 1 + 2 } ],
  749. time [ 1 ]
  750. );
  751. assert_eq!(meas.key, "test_measurement");
  752. assert_eq!(meas.get_tag("one"), Some("a"));
  753. assert_eq!(meas.get_tag("two"), Some("b"));
  754. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  755. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  756. assert_eq!(meas.timestamp, Some(1));
  757. }
  758. #[test]
  759. fn it_uses_the_measure_macro() {
  760. let (tx, rx) = channel();
  761. measure!(tx, test_measurement,
  762. tag [ one => "a" ],
  763. tag [ two => "b" ],
  764. int [ three => 2 ],
  765. float [ four => 1.2345 ],
  766. string [ five => String::from("d") ],
  767. bool [ six => true ],
  768. int [ seven => { 1 + 2 } ],
  769. time [ 1 ]
  770. );
  771. thread::sleep(Duration::from_millis(10));
  772. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  773. assert_eq!(meas.key, "test_measurement");
  774. assert_eq!(meas.get_tag("one"), Some("a"));
  775. assert_eq!(meas.get_tag("two"), Some("b"));
  776. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  777. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  778. assert_eq!(meas.timestamp, Some(1));
  779. }
  780. #[test]
  781. fn it_uses_measure_macro_for_d128_and_uuid() {
  782. let (tx, rx) = channel();
  783. let u = Uuid::new_v4();
  784. let d = d128::zero();
  785. let t = now();
  786. measure!(tx, test_measurement,
  787. tag[one; "a"],
  788. d128[two; d],
  789. uuid[three; u],
  790. time[t]
  791. );
  792. thread::sleep(Duration::from_millis(10));
  793. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  794. assert_eq!(meas.key, "test_measurement");
  795. assert_eq!(meas.get_tag("one"), Some("a"));
  796. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  797. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
  798. assert_eq!(meas.timestamp, Some(t));
  799. }
  800. #[test]
  801. fn it_uses_the_measure_macro_alt_syntax() {
  802. let (tx, rx) = channel();
  803. measure!(tx, test_measurement,
  804. tag[one; "a"],
  805. tag[two; "b"],
  806. int[three; 2],
  807. float[four; 1.2345],
  808. string[five; String::from("d")],
  809. bool [ six => true ],
  810. int[seven; { 1 + 2 }],
  811. time[1]
  812. );
  813. thread::sleep(Duration::from_millis(10));
  814. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  815. assert_eq!(meas.key, "test_measurement");
  816. assert_eq!(meas.get_tag("one"), Some("a"));
  817. assert_eq!(meas.get_tag("two"), Some("b"));
  818. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  819. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  820. assert_eq!(meas.timestamp, Some(1));
  821. }
  /// Builds a measurement directly with the `@make_meas` arm of `measure!`,
  /// then serializes it and checks that the last tag is separated from the
  /// fields by a space and the fields from each other by a comma.
  #[test]
  fn it_checks_that_fields_are_separated_correctly() {
      let meas = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);

      assert_eq!(meas.key, "test");
      assert_eq!(meas.get_tag("a"), Some("one"));
      assert_eq!(meas.get_field("x"), Some(&OwnedValue::Float(1.1)));

      let mut line = String::new();
      serialize_owned(&meas, &mut line);
      assert!(line.contains("b=two x=1.1,y=-1.1"), "buf = {}", line);
  }
  /// Feeds `measure!` a few awkward inputs: the `=>` separator, a trailing
  /// comma, and a nested struct field access as a field value.
  #[test]
  fn try_to_break_measure_macro() {
      struct Inner {
          pub one: i32,
          pub two: i32,
      }

      struct Outer {
          pub a: Inner
      }

      // The receiving half is discarded immediately by the `_` pattern.
      let (tx, _) = channel();
      measure!(tx, one, tag[x=>"y"], int[n;1]);
      measure!(tx, one, tag[x;"y"], int[n;1],);

      let outer = Outer { a: Inner { one: 1, two: 2 } };
      let built = measure!(@make_meas test, t(name, "a"), i(a, outer.a.one));
      assert_eq!(built.get_field("a"), Some(&OwnedValue::Integer(1)));
  }
  848. #[bench]
  849. fn measure_macro_small(b: &mut Bencher) {
  850. let (tx, rx) = channel();
  851. let listener = thread::spawn(move || {
  852. loop { if rx.recv().is_err() { break } }
  853. });
  854. b.iter(|| {
  855. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  856. });
  857. }
  858. #[bench]
  859. fn measure_macro_medium(b: &mut Bencher) {
  860. let (tx, rx) = channel();
  861. let listener = thread::spawn(move || {
  862. loop { if rx.recv().is_err() { break } }
  863. });
  864. b.iter(|| {
  865. measure!(tx, test,
  866. tag[color; "red"],
  867. tag[mood => "playful"],
  868. tag [ ticker => "xmr_btc" ],
  869. float[ price => 1.2345 ],
  870. float[ amount => 56.323],
  871. int[n; 1],
  872. time[now()]
  873. );
  874. });
  875. }
  /// Starts a writer, serializes a dummy measurement and pushes the resulting
  /// line over a zmq socket. Only compiled with both the `zmq` and `warnings`
  /// features, and ignored by default.
  #[cfg(feature = "zmq")]
  #[cfg(feature = "warnings")]
  #[test]
  #[ignore]
  fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
      let ctx = zmq::Context::new();
      let socket = push(&ctx).unwrap();
      // The receiver is unused but is kept alive until the end of the test.
      let (tx, _rx) = channel();
      let handle = writer(tx.clone());

      let mut line = String::with_capacity(4096);
      let mut meas = Measurement::new("rust_test");
      meas.add_tag("a", "t");
      meas.add_field("c", Value::Float(1.23456));
      let ts = now();
      meas.set_timestamp(ts);
      serialize(&meas, &mut line);

      socket.send_str(&line, 0).unwrap();
      drop(handle);
  }
  /// Serializes a measurement with one tag, one float field and a timestamp
  /// into a preallocated buffer and compares the result with the expected
  /// line.
  #[test]
  fn it_serializes_a_measurement_in_place() {
      let ts = now();
      let mut meas = Measurement::new("rust_test");
      meas.add_tag("a", "b");
      meas.add_field("c", Value::Float(1.0));
      meas.set_timestamp(ts);

      let mut out = String::with_capacity(4096);
      serialize(&meas, &mut out);

      let expected = format!("rust_test,a=b c=1 {}", ts);
      assert_eq!(out, expected);
  }
  /// Serializes a string field containing nested double quotes and
  /// parentheses, duplicates the line, and POSTs the two-line body to
  /// `http://localhost:8086/write` (db `test`, nanosecond precision).
  ///
  /// Passes only when the server answers `204 No Content`; any other
  /// response or a transport error fails the test, so a server must be
  /// listening on that address.
  #[test]
  fn it_serializes_a_hard_to_serialize_message() {
      let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
      let mut buf = String::new();
      let mut server_resp = String::new();
      let mut m = Measurement::new("rust_test");
      m.add_field("s", Value::String(raw));
      let ts = now();
      m.set_timestamp(ts);
      serialize(&m, &mut buf);
      println!("{}", buf);

      // Send the same line twice, newline-terminated.
      buf.push_str("\n");
      let buf_copy = buf.clone();
      buf.push_str(&buf_copy);
      println!("{}", buf);

      let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
      let client = Client::new();
      match client.post(url)
          .body(&buf)
          .send() {
          Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
          Ok(mut resp) => {
              resp.read_to_string(&mut server_resp).unwrap();
              panic!("{}", server_resp);
          }
          Err(why) => {
              // Format the error: a bare `panic!(why)` prints only `Box<Any>`
              // and is rejected by the 2021 edition.
              panic!("{}", why)
          }
      }
  }
  937. #[bench]
  938. fn serialize_owned_longer(b: &mut Bencher) {
  939. let mut buf = String::with_capacity(1024);
  940. let m =
  941. OwnedMeasurement::new("test")
  942. .add_tag("one", "a")
  943. .add_tag("two", "b")
  944. .add_tag("ticker", "xmr_btc")
  945. .add_tag("exchange", "plnx")
  946. .add_tag("side", "bid")
  947. .add_field("three", OwnedValue::Float(1.2345))
  948. .add_field("four", OwnedValue::Integer(57))
  949. .add_field("five", OwnedValue::Boolean(true))
  950. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  951. .set_timestamp(now());
  952. b.iter(|| {
  953. serialize_owned(&m, &mut buf);
  954. buf.clear()
  955. });
  956. }
  957. #[bench]
  958. fn serialize_owned_simple(b: &mut Bencher) {
  959. let mut buf = String::with_capacity(1024);
  960. let m =
  961. OwnedMeasurement::new("test")
  962. .add_tag("one", "a")
  963. .add_tag("two", "b")
  964. .add_field("three", OwnedValue::Float(1.2345))
  965. .add_field("four", OwnedValue::Integer(57))
  966. .set_timestamp(now());
  967. b.iter(|| {
  968. serialize_owned(&m, &mut buf);
  969. buf.clear()
  970. });
  971. }
  /// Same check as the borrowed-measurement variant, but built from an
  /// `OwnedMeasurement` and serialized with `serialize_owned`: a string field
  /// containing nested double quotes is serialized, duplicated, and POSTed to
  /// `http://localhost:8086/write` (db `test`, nanosecond precision).
  ///
  /// Passes only when the server answers `204 No Content`; any other
  /// response or a transport error fails the test, so a server must be
  /// listening on that address.
  #[test]
  fn it_serializes_a_hard_to_serialize_message_from_owned() {
      let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
      let mut buf = String::new();
      let mut server_resp = String::new();
      let m = OwnedMeasurement::new("rust_test")
          .add_field("s", OwnedValue::String(raw.to_string()))
          .set_timestamp(now());
      serialize_owned(&m, &mut buf);
      println!("{}", buf);

      // Send the same line twice, newline-terminated.
      buf.push_str("\n");
      let buf_copy = buf.clone();
      buf.push_str(&buf_copy);
      println!("{}", buf);

      let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
      let client = Client::new();
      match client.post(url)
          .body(&buf)
          .send() {
          Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
          Ok(mut resp) => {
              resp.read_to_string(&mut server_resp).unwrap();
              panic!("{}", server_resp);
          }
          Err(why) => {
              // Format the error: a bare `panic!(why)` prints only `Box<Any>`
              // and is rejected by the 2021 edition.
              panic!("{}", why)
          }
      }
  }
  1001. }