You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1042 lines
34KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. #[cfg(feature = "warnings")]
  8. use std::fs;
  9. use std::time::Duration;
  10. use std::hash::BuildHasherDefault;
  11. use hyper::status::StatusCode;
  12. use hyper::client::response::Response;
  13. use hyper::Url;
  14. use hyper::client::Client;
  15. use influent::measurement::{Measurement, Value};
  16. use zmq;
  17. #[allow(unused_imports)]
  18. use chrono::{DateTime, Utc};
  19. use ordermap::OrderMap;
  20. use fnv::FnvHasher;
  21. use decimal::d128;
  22. use uuid::Uuid;
  23. use smallvec::SmallVec;
  24. use slog::Logger;
  25. use super::{nanos, file_logger, LOG_LEVEL};
  26. #[cfg(feature = "warnings")]
  27. use warnings::Warning;
  28. pub use super::{dur_nanos, dt_nanos};
  29. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  30. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  31. Map::with_capacity_and_hasher(capacity, Default::default())
  32. }
  33. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  34. ///
  35. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  36. /// values, as well as sends it with the `Sender`.
  37. ///
  38. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  39. /// measurement (see `tests` mod).
  40. ///
  41. /// # Examples
  42. ///
  43. /// ```
  44. /// #[macro_use] extern crate logging;
  45. /// extern crate decimal;
  46. ///
  47. /// use std::sync::mpsc::channel;
  48. /// use decimal::d128;
  49. /// use logging::influx::*;
  50. ///
  51. /// fn main() {
  52. /// let (tx, rx) = channel();
  53. ///
  54. /// // "shorthand" syntax
  55. ///
  56. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  57. ///
  58. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  59. ///
  60. /// assert_eq!(meas.key, "test");
  61. /// assert_eq!(meas.get_tag("color"), Some("red"));
  62. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  63. ///
  64. /// // alternate syntax ...
  65. ///
  66. /// measure!(tx, test,
  67. /// tag [ one => "a" ],
  68. /// tag [ two => "b" ],
  69. /// int [ three => 2 ],
  70. /// float [ four => 1.2345 ],
  71. /// string [ five => String::from("d") ],
  72. /// bool [ six => true ],
  73. /// int [ seven => { 1 + 2 } ],
  74. /// time [ 1 ]
  75. /// );
  76. ///
  77. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  78. ///
  79. /// assert_eq!(meas.key, "test");
  80. /// assert_eq!(meas.get_tag("one"), Some("a"));
  81. /// assert_eq!(meas.get_tag("two"), Some("b"));
  82. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  83. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  84. /// assert_eq!(meas.timestamp, Some(1));
  85. ///
  86. /// // use the @make_meas flag to skip sending a measurement, instead merely
  87. /// // creating it.
  88. ///
  89. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  90. ///
  91. /// // each variant also has shorthand aliases
  92. ///
  93. /// let meas: OwnedMeasurement =
  94. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  95. /// }
  96. /// ```
  97. ///
  98. #[macro_export]
  99. macro_rules! measure {
  100. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  101. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  102. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  103. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  104. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  105. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  106. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  107. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  108. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  109. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  110. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  111. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  112. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  113. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  114. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  115. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  116. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  117. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  118. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  119. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  120. (@as_expr $e:expr) => {$e};
  121. (@count_tags) => {0usize};
  122. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  123. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  124. (@count_fields) => {0usize};
  125. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  126. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  127. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  128. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  129. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  130. };
  131. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  132. let n_tags = measure!(@count_tags $($t)*);
  133. let n_fields = measure!(@count_fields $($t)*);
  134. let mut meas =
  135. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  136. $(
  137. measure!(@kv $t, meas, $($tail)*);
  138. )*
  139. meas
  140. }};
  141. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  142. measure!($m, $name, $($t [ $($tail)* ] ),+)
  143. };
  144. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  145. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  146. let _ = $m.send(measurement);
  147. }};
  148. }
  149. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  150. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  151. /// measurements have accumulated.
  152. ///
  153. #[derive(Debug)]
  154. pub struct InfluxWriter {
  155. host: String,
  156. db: String,
  157. tx: Sender<Option<OwnedMeasurement>>,
  158. thread: Option<Arc<thread::JoinHandle<()>>>,
  159. }
  160. impl Default for InfluxWriter {
  161. fn default() -> Self {
  162. //if cfg!(any(test, feature = "test")) {
  163. // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0)
  164. //} else {
  165. InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 4000)
  166. //}
  167. }
  168. }
  169. impl Clone for InfluxWriter {
  170. fn clone(&self) -> Self {
  171. debug_assert!(self.thread.is_some());
  172. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  173. InfluxWriter {
  174. host: self.host.to_string(),
  175. db: self.db.to_string(),
  176. tx: self.tx.clone(),
  177. thread,
  178. }
  179. }
  180. }
  181. impl InfluxWriter {
  182. /// Sends the `OwnedMeasurement` to the serialization thread.
  183. ///
  184. #[cfg_attr(feature = "inlines", inline)]
  185. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  186. self.tx.send(Some(m))
  187. }
  188. #[cfg_attr(feature = "inlines", inline)]
  189. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  190. #[cfg_attr(feature = "inlines", inline)]
  191. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  192. #[cfg_attr(feature = "inlines", inline)]
  193. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  194. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  195. self.tx.clone()
  196. }
  197. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  198. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  199. Self::with_logger(host, db, buffer_size, logger)
  200. }
  201. #[allow(unused_assignments)]
  202. pub fn with_logger(host: &str, db: &str, buffer_size: u16, logger: Logger) -> Self {
  203. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  204. #[cfg(feature = "no-influx-buffer")]
  205. let buffer_size = 0u16;
  206. debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);
  207. let url =
  208. Url::parse_with_params(&format!("http://{}:8086/write", host),
  209. &[("db", db), ("precision", "ns")])
  210. .expect("influx writer url should parse");
  211. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  212. let client = Client::new();
  213. debug!(logger, "initializing buffers");
  214. let mut buf = String::with_capacity(32 * 32 * 32);
  215. let mut count = 0;
  216. let send = |buf: &str| {
  217. let resp = client.post(url.clone())
  218. .body(buf)
  219. .send();
  220. match resp {
  221. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  222. debug!(logger, "server responded ok: 204 NoContent");
  223. }
  224. Ok(mut resp) => {
  225. let mut server_resp = String::with_capacity(1024);
  226. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  227. error!(logger, "influx server error";
  228. "status" => resp.status.to_string(),
  229. "body" => server_resp);
  230. }
  231. Err(why) => {
  232. error!(logger, "http request failed: {:?}", why);
  233. }
  234. }
  235. };
  236. let next = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 {
  237. match prev {
  238. 0 if buffer_size > 0 => {
  239. serialize_owned(m, buf);
  240. 1
  241. }
  242. n if n < buffer_size => {
  243. buf.push_str("\n");
  244. serialize_owned(m, buf);
  245. n + 1
  246. }
  247. n => {
  248. buf.push_str("\n");
  249. serialize_owned(m, buf);
  250. debug!(logger, "sending buffer to influx"; "len" => n);
  251. send(buf);
  252. buf.clear();
  253. 0
  254. }
  255. }
  256. };
  257. loop {
  258. match rx.recv() {
  259. Ok(Some(mut meas)) => {
  260. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  261. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  262. count = next(count, &meas, &mut buf);
  263. }
  264. Ok(None) => {
  265. if buf.len() > 0 {
  266. debug!(logger, "sending buffer to influx"; "len" => count);
  267. send(&buf)
  268. }
  269. break
  270. }
  271. _ => {
  272. thread::sleep(Duration::new(0, 1))
  273. }
  274. }
  275. }
  276. }).unwrap();
  277. InfluxWriter {
  278. host: host.to_string(),
  279. db: db.to_string(),
  280. tx,
  281. thread: Some(Arc::new(thread))
  282. }
  283. }
  284. }
  285. impl Drop for InfluxWriter {
  286. fn drop(&mut self) {
  287. if let Some(arc) = self.thread.take() {
  288. if let Ok(thread) = Arc::try_unwrap(arc) {
  289. let _ = self.tx.send(None);
  290. let _ = thread.join();
  291. }
  292. }
  293. }
  294. }
  295. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  296. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  297. let socket = ctx.socket(zmq::PULL)?;
  298. socket.bind(WRITER_ADDR)?;
  299. socket.set_rcvhwm(0)?;
  300. Ok(socket)
  301. }
  302. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  303. let socket = ctx.socket(zmq::PUSH)?;
  304. socket.connect(WRITER_ADDR)?;
  305. socket.set_sndhwm(0)?;
  306. Ok(socket)
  307. }
  308. /// This removes offending things rather than escaping them.
  309. ///
  310. fn escape_tag(s: &str) -> String {
  311. s.replace(" ", "")
  312. .replace(",", "")
  313. .replace("\"", "")
  314. }
  315. fn escape(s: &str) -> String {
  316. s.replace(" ", "\\ ")
  317. .replace(",", "\\,")
  318. }
  319. fn as_string(s: &str) -> String {
  320. // the second replace removes double escapes
  321. //
  322. format!("\"{}\"", s.replace("\"", "\\\"")
  323. .replace(r#"\\""#, r#"\""#))
  324. }
  325. #[test]
  326. fn it_checks_as_string_does_not_double_escape() {
  327. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  328. let escaped = as_string(&raw);
  329. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  330. }
  331. fn as_integer(i: &i64) -> String {
  332. format!("{}i", i)
  333. }
  334. fn as_float(f: &f64) -> String {
  335. f.to_string()
  336. }
  337. fn as_boolean(b: &bool) -> &str {
  338. if *b { "t" } else { "f" }
  339. }
  340. pub fn now() -> i64 {
  341. nanos(Utc::now()) as i64
  342. }
  343. /// Serialize the measurement into influx line protocol
  344. /// and append to the buffer.
  345. ///
  346. /// # Examples
  347. ///
  348. /// ```
  349. /// extern crate influent;
  350. /// extern crate logging;
  351. ///
  352. /// use influent::measurement::{Measurement, Value};
  353. /// use std::string::String;
  354. /// use logging::influx::serialize;
  355. ///
  356. /// fn main() {
  357. /// let mut buf = String::new();
  358. /// let mut m = Measurement::new("test");
  359. /// m.add_field("x", Value::Integer(1));
  360. /// serialize(&m, &mut buf);
  361. /// }
  362. ///
  363. /// ```
  364. ///
  365. pub fn serialize(measurement: &Measurement, line: &mut String) {
  366. line.push_str(&escape(measurement.key));
  367. for (tag, value) in measurement.tags.iter() {
  368. line.push_str(",");
  369. line.push_str(&escape(tag));
  370. line.push_str("=");
  371. line.push_str(&escape(value));
  372. }
  373. let mut was_spaced = false;
  374. for (field, value) in measurement.fields.iter() {
  375. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  376. line.push_str(&escape(field));
  377. line.push_str("=");
  378. match value {
  379. &Value::String(ref s) => line.push_str(&as_string(s)),
  380. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  381. &Value::Float(ref f) => line.push_str(&as_float(f)),
  382. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  383. };
  384. }
  385. match measurement.timestamp {
  386. Some(t) => {
  387. line.push_str(" ");
  388. line.push_str(&t.to_string());
  389. }
  390. _ => {}
  391. }
  392. }
  393. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  394. ///
  395. /// The serialized measurement is appended to the end of the string without
  396. /// any regard for what exited in it previously.
  397. ///
  398. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  399. line.push_str(&escape_tag(measurement.key));
  400. let add_tag = |line: &mut String, key: &str, value: &str| {
  401. line.push_str(",");
  402. line.push_str(&escape_tag(key));
  403. line.push_str("=");
  404. line.push_str(&escape(value));
  405. };
  406. for &(key, value) in measurement.tags.iter() {
  407. add_tag(line, key, value);
  408. }
  409. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  410. if is_first { line.push_str(" "); } else { line.push_str(","); }
  411. line.push_str(&escape_tag(key));
  412. line.push_str("=");
  413. match *value {
  414. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  415. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  416. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  417. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  418. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  419. #[cfg(not(feature = "disable-short-uuid"))]
  420. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  421. #[cfg(feature = "disable-short-uuid")]
  422. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  423. };
  424. };
  425. let mut fields = measurement.fields.iter();
  426. // first time separate from tags with space
  427. //
  428. fields.next().map(|kv| {
  429. add_field(line, &kv.0, &kv.1, true);
  430. });
  431. // then seperate the rest w/ comma
  432. //
  433. for kv in fields {
  434. add_field(line, kv.0, &kv.1, false);
  435. }
  436. if let Some(t) = measurement.timestamp {
  437. line.push_str(" ");
  438. line.push_str(&t.to_string());
  439. }
  440. }
  441. #[cfg(feature = "warnings")]
  442. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  443. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  444. assert!(false);
  445. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  446. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  447. let _ = fs::create_dir("/tmp/mm");
  448. let ctx = zmq::Context::new();
  449. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  450. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  451. let client = Client::new();
  452. let mut buf = String::with_capacity(4096);
  453. let mut server_resp = String::with_capacity(4096);
  454. let mut count = 0;
  455. loop {
  456. if let Ok(bytes) = socket.recv_bytes(0) {
  457. if let Ok(msg) = String::from_utf8(bytes) {
  458. count = match count {
  459. 0 => {
  460. buf.push_str(&msg);
  461. 1
  462. }
  463. n @ 1...40 => {
  464. buf.push_str("\n");
  465. buf.push_str(&msg);
  466. n + 1
  467. }
  468. _ => {
  469. buf.push_str("\n");
  470. buf.push_str(&msg);
  471. match client.post(url.clone())
  472. .body(&buf)
  473. .send() {
  474. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  475. Ok(mut resp) => {
  476. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  477. let _ = warnings.send(
  478. Warning::Error(
  479. format!("Influx server: {}", server_resp)));
  480. server_resp.clear();
  481. }
  482. Err(why) => {
  483. let _ = warnings.send(
  484. Warning::Error(
  485. format!("Influx write error: {}", why)));
  486. }
  487. }
  488. buf.clear();
  489. 0
  490. }
  491. }
  492. }
  493. }
  494. }
  495. }).unwrap()
  496. }
  497. #[derive(Debug, Clone, PartialEq)]
  498. pub enum OwnedValue {
  499. String(String),
  500. Float(f64),
  501. Integer(i64),
  502. Boolean(bool),
  503. D128(d128),
  504. Uuid(Uuid),
  505. }
  506. /// Holds data meant for an influxdb measurement in transit to the
  507. /// writing thread.
  508. ///
  509. /// TODO: convert `Map` to `SmallVec`?
  510. ///
  511. #[derive(Clone, Debug)]
  512. pub struct OwnedMeasurement {
  513. pub key: &'static str,
  514. pub timestamp: Option<i64>,
  515. //pub fields: Map<&'static str, OwnedValue>,
  516. //pub tags: Map<&'static str, &'static str>,
  517. pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
  518. pub tags: SmallVec<[(&'static str, &'static str); 8]>,
  519. }
  520. impl OwnedMeasurement {
  521. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  522. OwnedMeasurement {
  523. key,
  524. timestamp: None,
  525. tags: SmallVec::with_capacity(n_tags),
  526. fields: SmallVec::with_capacity(n_fields),
  527. }
  528. }
  529. pub fn new(key: &'static str) -> Self {
  530. OwnedMeasurement {
  531. key,
  532. timestamp: None,
  533. tags: SmallVec::new(),
  534. fields: SmallVec::new(),
  535. }
  536. }
  537. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  538. self.tags.push((key, value));
  539. self
  540. }
  541. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  542. self.fields.push((key, value));
  543. self
  544. }
  545. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  546. self.timestamp = Some(timestamp);
  547. self
  548. }
  549. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  550. match self.tags.iter().position(|kv| kv.0 == key) {
  551. Some(i) => {
  552. self.tags.get_mut(i)
  553. .map(|x| {
  554. x.0 = value;
  555. });
  556. self
  557. }
  558. None => {
  559. self.add_tag(key, value)
  560. }
  561. }
  562. }
  563. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  564. self.fields.iter()
  565. .find(|kv| kv.0 == key)
  566. .map(|kv| &kv.1)
  567. }
  568. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  569. self.tags.iter()
  570. .find(|kv| kv.0 == key)
  571. .map(|kv| kv.1)
  572. }
  573. }
  574. #[allow(unused_imports, unused_variables)]
  575. #[cfg(test)]
  576. mod tests {
  577. use super::*;
  578. use test::{black_box, Bencher};
  579. #[test]
  580. fn it_uses_the_new_tag_k_only_shortcut() {
  581. let tag_value = "one";
  582. let color = "red";
  583. let time = now();
  584. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  585. assert_eq!(m.get_tag("color"), Some("red"));
  586. assert_eq!(m.get_tag("tag_value"), Some("one"));
  587. assert_eq!(m.timestamp, Some(time));
  588. }
  589. #[test]
  590. fn it_uses_measure_macro_parenthesis_syntax() {
  591. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  592. assert_eq!(m.key, "test");
  593. assert_eq!(m.get_tag("a"), Some("b"));
  594. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  595. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  596. assert_eq!(m.timestamp, Some(1));
  597. }
  598. #[test]
  599. fn it_uses_measure_macro_on_a_self_attribute() {
  600. struct A {
  601. pub influx: InfluxWriter,
  602. }
  603. impl A {
  604. fn f(&self) {
  605. measure!(self.influx, test, t(color, "red"), i(n, 1));
  606. }
  607. }
  608. let a = A { influx: InfluxWriter::default() };
  609. a.f();
  610. }
  611. #[test]
  612. fn it_clones_an_influx_writer_to_check_both_drop() {
  613. let influx = InfluxWriter::default();
  614. measure!(influx, drop_test, i(a, 1), i(b, 2));
  615. {
  616. let influx = influx.clone();
  617. thread::spawn(move || {
  618. measure!(influx, drop_test, i(a, 3), i(b, 4));
  619. });
  620. }
  621. }
  622. #[bench]
  623. fn influx_writer_send_basic(b: &mut Bencher) {
  624. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  625. b.iter(|| {
  626. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  627. });
  628. }
  629. #[bench]
  630. fn influx_writer_send_price(b: &mut Bencher) {
  631. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  632. b.iter(|| {
  633. measure!(m, test,
  634. t(ticker, t!(xmr-btc).as_str()),
  635. t(exchange, "plnx"),
  636. d(bid, d128::zero()),
  637. d(ask, d128::zero()),
  638. );
  639. });
  640. }
  641. #[test]
  642. fn it_checks_color_tag_error_in_non_doctest() {
  643. let (tx, rx) = channel();
  644. measure!(tx, test, tag[color;"red"], int[n;1]);
  645. let meas: OwnedMeasurement = rx.recv().unwrap();
  646. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  647. }
  648. #[test]
  649. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  650. let meas = measure!(@make_meas test_measurement,
  651. tag [ one => "a" ],
  652. tag [ two => "b" ],
  653. int [ three => 2 ],
  654. float [ four => 1.2345 ],
  655. string [ five => String::from("d") ],
  656. bool [ six => true ],
  657. int [ seven => { 1 + 2 } ],
  658. time [ 1 ]
  659. );
  660. assert_eq!(meas.key, "test_measurement");
  661. assert_eq!(meas.get_tag("one"), Some("a"));
  662. assert_eq!(meas.get_tag("two"), Some("b"));
  663. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  664. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  665. assert_eq!(meas.timestamp, Some(1));
  666. }
  667. #[test]
  668. fn it_uses_the_measure_macro() {
  669. let (tx, rx) = channel();
  670. measure!(tx, test_measurement,
  671. tag [ one => "a" ],
  672. tag [ two => "b" ],
  673. int [ three => 2 ],
  674. float [ four => 1.2345 ],
  675. string [ five => String::from("d") ],
  676. bool [ six => true ],
  677. int [ seven => { 1 + 2 } ],
  678. time [ 1 ]
  679. );
  680. thread::sleep(Duration::from_millis(10));
  681. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  682. assert_eq!(meas.key, "test_measurement");
  683. assert_eq!(meas.get_tag("one"), Some("a"));
  684. assert_eq!(meas.get_tag("two"), Some("b"));
  685. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  686. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  687. assert_eq!(meas.timestamp, Some(1));
  688. }
  689. #[test]
  690. fn it_uses_measure_macro_for_d128_and_uuid() {
  691. let (tx, rx) = channel();
  692. let u = Uuid::new_v4();
  693. let d = d128::zero();
  694. let t = now();
  695. measure!(tx, test_measurement,
  696. tag[one; "a"],
  697. d128[two; d],
  698. uuid[three; u],
  699. time[t]
  700. );
  701. thread::sleep(Duration::from_millis(10));
  702. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  703. assert_eq!(meas.key, "test_measurement");
  704. assert_eq!(meas.get_tag("one"), Some("a"));
  705. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  706. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
  707. assert_eq!(meas.timestamp, Some(t));
  708. }
  709. #[test]
  710. fn it_uses_the_measure_macro_alt_syntax() {
  711. let (tx, rx) = channel();
  712. measure!(tx, test_measurement,
  713. tag[one; "a"],
  714. tag[two; "b"],
  715. int[three; 2],
  716. float[four; 1.2345],
  717. string[five; String::from("d")],
  718. bool [ six => true ],
  719. int[seven; { 1 + 2 }],
  720. time[1]
  721. );
  722. thread::sleep(Duration::from_millis(10));
  723. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  724. assert_eq!(meas.key, "test_measurement");
  725. assert_eq!(meas.get_tag("one"), Some("a"));
  726. assert_eq!(meas.get_tag("two"), Some("b"));
  727. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  728. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  729. assert_eq!(meas.timestamp, Some(1));
  730. }
  731. #[test]
  732. fn it_checks_that_fields_are_separated_correctly() {
  733. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  734. assert_eq!(m.key, "test");
  735. assert_eq!(m.get_tag("a"), Some("one"));
  736. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  737. let mut buf = String::new();
  738. serialize_owned(&m, &mut buf);
  739. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  740. }
  741. #[test]
  742. fn try_to_break_measure_macro() {
  743. let (tx, _) = channel();
  744. measure!(tx, one, tag[x=>"y"], int[n;1]);
  745. measure!(tx, one, tag[x;"y"], int[n;1],);
  746. struct A {
  747. pub one: i32,
  748. pub two: i32,
  749. }
  750. struct B {
  751. pub a: A
  752. }
  753. let b = B { a: A { one: 1, two: 2 } };
  754. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  755. assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
  756. }
  757. #[bench]
  758. fn measure_macro_small(b: &mut Bencher) {
  759. let (tx, rx) = channel();
  760. let listener = thread::spawn(move || {
  761. loop { if rx.recv().is_err() { break } }
  762. });
  763. b.iter(|| {
  764. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  765. });
  766. }
  767. #[bench]
  768. fn measure_macro_medium(b: &mut Bencher) {
  769. let (tx, rx) = channel();
  770. let listener = thread::spawn(move || {
  771. loop { if rx.recv().is_err() { break } }
  772. });
  773. b.iter(|| {
  774. measure!(tx, test,
  775. tag[color; "red"],
  776. tag[mood => "playful"],
  777. tag [ ticker => "xmr_btc" ],
  778. float[ price => 1.2345 ],
  779. float[ amount => 56.323],
  780. int[n; 1],
  781. time[now()]
  782. );
  783. });
  784. }
  785. #[cfg(feature = "warnings")]
  786. #[test]
  787. #[ignore]
  788. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  789. let ctx = zmq::Context::new();
  790. let socket = push(&ctx).unwrap();
  791. let (tx, rx) = channel();
  792. let w = writer(tx.clone());
  793. let mut buf = String::with_capacity(4096);
  794. let mut meas = Measurement::new("rust_test");
  795. meas.add_tag("a", "t");
  796. meas.add_field("c", Value::Float(1.23456));
  797. let now = now();
  798. meas.set_timestamp(now);
  799. serialize(&meas, &mut buf);
  800. socket.send_str(&buf, 0).unwrap();
  801. drop(w);
  802. }
  803. #[test]
  804. fn it_serializes_a_measurement_in_place() {
  805. let mut buf = String::with_capacity(4096);
  806. let mut meas = Measurement::new("rust_test");
  807. meas.add_tag("a", "b");
  808. meas.add_field("c", Value::Float(1.0));
  809. let now = now();
  810. meas.set_timestamp(now);
  811. serialize(&meas, &mut buf);
  812. let ans = format!("rust_test,a=b c=1 {}", now);
  813. assert_eq!(buf, ans);
  814. }
  815. #[test]
  816. fn it_serializes_a_hard_to_serialize_message() {
  817. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  818. let mut buf = String::new();
  819. let mut server_resp = String::new();
  820. let mut m = Measurement::new("rust_test");
  821. m.add_field("s", Value::String(&raw));
  822. let now = now();
  823. m.set_timestamp(now);
  824. serialize(&m, &mut buf);
  825. println!("{}", buf);
  826. buf.push_str("\n");
  827. let buf_copy = buf.clone();
  828. buf.push_str(&buf_copy);
  829. println!("{}", buf);
  830. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  831. let client = Client::new();
  832. match client.post(url.clone())
  833. .body(&buf)
  834. .send() {
  835. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  836. Ok(mut resp) => {
  837. resp.read_to_string(&mut server_resp).unwrap();
  838. panic!("{}", server_resp);
  839. }
  840. Err(why) => {
  841. panic!(why)
  842. }
  843. }
  844. }
  845. #[bench]
  846. fn serialize_owned_longer(b: &mut Bencher) {
  847. let mut buf = String::with_capacity(1024);
  848. let m =
  849. OwnedMeasurement::new("test")
  850. .add_tag("one", "a")
  851. .add_tag("two", "b")
  852. .add_tag("ticker", "xmr_btc")
  853. .add_tag("exchange", "plnx")
  854. .add_tag("side", "bid")
  855. .add_field("three", OwnedValue::Float(1.2345))
  856. .add_field("four", OwnedValue::Integer(57))
  857. .add_field("five", OwnedValue::Boolean(true))
  858. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  859. .set_timestamp(now());
  860. b.iter(|| {
  861. serialize_owned(&m, &mut buf);
  862. buf.clear()
  863. });
  864. }
  865. #[bench]
  866. fn serialize_owned_simple(b: &mut Bencher) {
  867. let mut buf = String::with_capacity(1024);
  868. let m =
  869. OwnedMeasurement::new("test")
  870. .add_tag("one", "a")
  871. .add_tag("two", "b")
  872. .add_field("three", OwnedValue::Float(1.2345))
  873. .add_field("four", OwnedValue::Integer(57))
  874. .set_timestamp(now());
  875. b.iter(|| {
  876. serialize_owned(&m, &mut buf);
  877. buf.clear()
  878. });
  879. }
  880. #[test]
  881. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  882. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  883. let mut buf = String::new();
  884. let mut server_resp = String::new();
  885. let m = OwnedMeasurement::new("rust_test")
  886. .add_field("s", OwnedValue::String(raw.to_string()))
  887. .set_timestamp(now());
  888. serialize_owned(&m, &mut buf);
  889. println!("{}", buf);
  890. buf.push_str("\n");
  891. let buf_copy = buf.clone();
  892. buf.push_str(&buf_copy);
  893. println!("{}", buf);
  894. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  895. let client = Client::new();
  896. match client.post(url.clone())
  897. .body(&buf)
  898. .send() {
  899. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  900. Ok(mut resp) => {
  901. resp.read_to_string(&mut server_resp).unwrap();
  902. panic!("{}", server_resp);
  903. }
  904. Err(why) => {
  905. panic!(why)
  906. }
  907. }
  908. }
  909. }