You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1062 lines
36KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. use std::fs;
  8. use std::time::Duration;
  9. use std::hash::BuildHasherDefault;
  10. use hyper::status::StatusCode;
  11. use hyper::client::response::Response;
  12. use hyper::Url;
  13. use hyper::client::Client;
  14. use influent::measurement::{Measurement, Value};
  15. use zmq;
  16. #[allow(unused_imports)]
  17. use chrono::{DateTime, Utc};
  18. use sloggers::types::Severity;
  19. use ordermap::OrderMap;
  20. use fnv::FnvHasher;
  21. use decimal::d128;
  22. use uuid::Uuid;
  23. use super::{nanos, file_logger, LOG_LEVEL};
  24. #[cfg(feature = "warnings")]
  25. use warnings::Warning;
/// ZeroMQ endpoint that `pull` binds to and `push` connects to.
const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
/// Receive high-water mark applied to the socket created by `pull`.
// NOTE(review): a HWM of 0 presumably means "no limit" in ZeroMQ — confirm against the zmq docs.
const ZMQ_RCV_HWM: i32 = 0;
/// Send high-water mark applied to the socket created by `push`.
const ZMQ_SND_HWM: i32 = 0;
/// Buffer size handed to `InfluxWriter::new` by the non-test `Default` impl.
const BUFFER_SIZE: u16 = 80;
/// Database name used by the deprecated `writer` thread and by the tests that
/// post directly to influx (regular builds).
#[cfg(not(any(test, feature = "test")))]
const DB_NAME: &'static str = "mm2";
/// Database name used by the same code paths in test builds.
#[cfg(any(test, feature = "test"))]
const DB_NAME: &'static str = "mm2_test";
/// Influx write endpoint used with `DB_NAME` (local instance unless the
/// `scholes` feature is enabled).
#[cfg(not(any(feature = "scholes")))]
const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
/// Influx write endpoint selected by the `scholes` feature.
#[cfg(feature = "scholes")]
const DB_HOST: &'static str = "http://159.203.81.249:8086/write";
  38. pub use super::{dur_nanos, dt_nanos};
  39. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  40. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  41. Map::with_capacity_and_hasher(capacity, Default::default())
  42. }
  43. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  44. ///
  45. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  46. /// values, as well as sends it with the `Sender`.
  47. ///
  48. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  49. /// measurement (see `tests` mod).
  50. ///
  51. /// # Examples
  52. ///
  53. /// ```
  54. /// #[macro_use] extern crate logging;
  55. /// extern crate decimal;
  56. ///
  57. /// use std::sync::mpsc::channel;
  58. /// use decimal::d128;
  59. /// use logging::influx::*;
  60. ///
  61. /// fn main() {
  62. /// let (tx, rx) = channel();
  63. ///
  64. /// // "shorthand" syntax
  65. ///
  66. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  67. ///
  68. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  69. ///
  70. /// assert_eq!(meas.key, "test");
  71. /// assert_eq!(meas.tags.get("color"), Some(&"red"));
  72. /// assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1)));
  73. ///
  74. /// // alternate syntax ...
  75. ///
  76. /// measure!(tx, test,
  77. /// tag [ one => "a" ],
  78. /// tag [ two => "b" ],
  79. /// int [ three => 2 ],
  80. /// float [ four => 1.2345 ],
  81. /// string [ five => String::from("d") ],
  82. /// bool [ six => true ],
  83. /// int [ seven => { 1 + 2 } ],
  84. /// time [ 1 ]
  85. /// );
  86. ///
  87. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  88. ///
  89. /// assert_eq!(meas.key, "test");
  90. /// assert_eq!(meas.tags.get("one"), Some(&"a"));
  91. /// assert_eq!(meas.tags.get("two"), Some(&"b"));
  92. /// assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  93. /// assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  94. /// assert_eq!(meas.timestamp, Some(1));
  95. ///
  96. /// // use the @make_meas flag to skip sending a measurement, instead merely
  97. /// // creating it.
  98. ///
  99. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  100. ///
  101. /// // each variant also has shorthand aliases
  102. ///
  103. /// let meas: OwnedMeasurement =
  104. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  105. /// }
  106. /// ```
  107. ///
  108. #[macro_export]
  109. macro_rules! measure {
  110. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  111. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  112. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  113. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  114. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  115. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  116. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  117. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  118. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  119. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  120. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  121. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  122. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  123. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  124. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  125. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  126. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  127. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  128. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  129. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  130. (@as_expr $e:expr) => {$e};
  131. (@count_tags) => {0usize};
  132. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  133. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  134. (@count_fields) => {0usize};
  135. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  136. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  137. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  138. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  139. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  140. };
  141. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  142. let n_tags = measure!(@count_tags $($t)*);
  143. let n_fields = measure!(@count_fields $($t)*);
  144. let mut meas =
  145. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  146. $(
  147. measure!(@kv $t, meas, $($tail)*);
  148. )*
  149. meas
  150. }};
  151. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  152. measure!($m, $name, $($t [ $($tail)* ] ),+)
  153. };
  154. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  155. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  156. let _ = $m.send(measurement);
  157. }};
  158. }
/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
/// it receives over a `std::sync::mpsc` channel and inserts them to influxdb via http
/// in batches whose size is governed by the `buffer_size` passed to `new`.
///
/// The sender side is cloned by `Clone` and handed out by `tx()`, so there may be
/// several producers feeding the single writer thread.
///
#[derive(Debug)]
pub struct InfluxWriter {
    // host name the write url was built from
    host: String,
    // influx database name
    db: String,
    // `Some(m)` queues a measurement; `None` tells the thread to flush and exit
    tx: Sender<Option<OwnedMeasurement>>,
    // handle of the writer thread, shared between clones; `Drop` joins it only
    // when it can take sole ownership of the handle
    thread: Option<Arc<thread::JoinHandle<()>>>,
}
  170. impl Default for InfluxWriter {
  171. fn default() -> Self {
  172. if cfg!(any(test, feature = "test")) {
  173. InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 1)
  174. } else {
  175. InfluxWriter::new("localhost", "mm2", "/home/jstrong/src/logging/var/log/influx-default.log", BUFFER_SIZE)
  176. }
  177. }
  178. }
  179. impl Clone for InfluxWriter {
  180. fn clone(&self) -> Self {
  181. debug_assert!(self.thread.is_some());
  182. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  183. InfluxWriter {
  184. host: self.host.to_string(),
  185. db: self.db.to_string(),
  186. tx: self.tx.clone(),
  187. thread,
  188. }
  189. }
  190. }
  191. impl InfluxWriter {
  192. /// Sends the `OwnedMeasurement` to the serialization thread.
  193. ///
  194. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  195. self.tx.send(Some(m))
  196. }
  197. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  198. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  199. self.tx.clone()
  200. }
  201. #[allow(unused_assignments)]
  202. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  203. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  204. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  205. #[cfg(feature = "no-influx-buffer")]
  206. let buffer_size = 0u16;
  207. debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);
  208. let url =
  209. Url::parse_with_params(&format!("http://{}:8086/write", host),
  210. &[("db", db), ("precision", "ns")])
  211. .expect("influx writer url should parse");
  212. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  213. let client = Client::new();
  214. debug!(logger, "initializing buffers");
  215. let mut buf = String::with_capacity(32 * 32 * 32);
  216. let mut count = 0;
  217. let send = |buf: &str| {
  218. let resp = client.post(url.clone())
  219. .body(buf)
  220. .send();
  221. match resp {
  222. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  223. debug!(logger, "server responded ok: 204 NoContent");
  224. }
  225. Ok(mut resp) => {
  226. let mut server_resp = String::with_capacity(1024);
  227. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  228. error!(logger, "influx server error";
  229. "status" => resp.status.to_string(),
  230. "body" => server_resp);
  231. }
  232. Err(why) => {
  233. error!(logger, "http request failed: {:?}", why);
  234. }
  235. }
  236. };
  237. let next = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 {
  238. match prev {
  239. 0 if buffer_size > 0 => {
  240. serialize_owned(m, buf);
  241. 1
  242. }
  243. n if n < buffer_size => {
  244. buf.push_str("\n");
  245. serialize_owned(m, buf);
  246. n + 1
  247. }
  248. n => {
  249. buf.push_str("\n");
  250. serialize_owned(m, buf);
  251. debug!(logger, "sending buffer to influx"; "len" => n);
  252. send(buf);
  253. buf.clear();
  254. 0
  255. }
  256. }
  257. };
  258. loop {
  259. match rx.recv() {
  260. Ok(Some(mut meas)) => {
  261. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  262. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  263. count = next(count, &meas, &mut buf);
  264. }
  265. Ok(None) => {
  266. if buf.len() > 0 {
  267. debug!(logger, "sending buffer to influx"; "len" => count);
  268. send(&buf)
  269. }
  270. break
  271. }
  272. _ => {
  273. thread::sleep(Duration::new(0, 1))
  274. }
  275. }
  276. }
  277. }).unwrap();
  278. InfluxWriter {
  279. host: host.to_string(),
  280. db: db.to_string(),
  281. tx,
  282. thread: Some(Arc::new(thread))
  283. }
  284. }
  285. }
  286. impl Drop for InfluxWriter {
  287. fn drop(&mut self) {
  288. if let Some(arc) = self.thread.take() {
  289. if let Ok(thread) = Arc::try_unwrap(arc) {
  290. let _ = self.tx.send(None);
  291. let _ = thread.join();
  292. }
  293. }
  294. }
  295. }
  296. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  297. let socket = ctx.socket(zmq::PULL)?;
  298. socket.bind(WRITER_ADDR)?;
  299. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  300. Ok(socket)
  301. }
  302. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  303. let socket = ctx.socket(zmq::PUSH)?;
  304. socket.connect(WRITER_ADDR)?;
  305. socket.set_sndhwm(ZMQ_SND_HWM)?;
  306. Ok(socket)
  307. }
  308. /// This removes offending things rather than escaping them.
  309. ///
  310. fn escape_tag(s: &str) -> String {
  311. s.replace(" ", "")
  312. .replace(",", "")
  313. .replace("\"", "")
  314. }
  315. fn escape(s: &str) -> String {
  316. s.replace(" ", "\\ ")
  317. .replace(",", "\\,")
  318. }
  319. fn as_string(s: &str) -> String {
  320. // the second replace removes double escapes
  321. //
  322. format!("\"{}\"", s.replace("\"", "\\\"")
  323. .replace(r#"\\""#, r#"\""#))
  324. }
  325. #[test]
  326. fn it_checks_as_string_does_not_double_escape() {
  327. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  328. let escaped = as_string(&raw);
  329. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  330. }
  331. fn as_integer(i: &i64) -> String {
  332. format!("{}i", i)
  333. }
  334. fn as_float(f: &f64) -> String {
  335. f.to_string()
  336. }
  337. fn as_boolean(b: &bool) -> &str {
  338. if *b { "t" } else { "f" }
  339. }
  340. pub fn now() -> i64 {
  341. nanos(Utc::now()) as i64
  342. }
  343. /// Serialize the measurement into influx line protocol
  344. /// and append to the buffer.
  345. ///
  346. /// # Examples
  347. ///
  348. /// ```
  349. /// extern crate influent;
  350. /// extern crate logging;
  351. ///
  352. /// use influent::measurement::{Measurement, Value};
  353. /// use std::string::String;
  354. /// use logging::influx::serialize;
  355. ///
  356. /// fn main() {
  357. /// let mut buf = String::new();
  358. /// let mut m = Measurement::new("test");
  359. /// m.add_field("x", Value::Integer(1));
  360. /// serialize(&m, &mut buf);
  361. /// }
  362. ///
  363. /// ```
  364. ///
  365. pub fn serialize(measurement: &Measurement, line: &mut String) {
  366. line.push_str(&escape(measurement.key));
  367. for (tag, value) in measurement.tags.iter() {
  368. line.push_str(",");
  369. line.push_str(&escape(tag));
  370. line.push_str("=");
  371. line.push_str(&escape(value));
  372. }
  373. let mut was_spaced = false;
  374. for (field, value) in measurement.fields.iter() {
  375. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  376. line.push_str(&escape(field));
  377. line.push_str("=");
  378. match value {
  379. &Value::String(ref s) => line.push_str(&as_string(s)),
  380. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  381. &Value::Float(ref f) => line.push_str(&as_float(f)),
  382. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  383. };
  384. }
  385. match measurement.timestamp {
  386. Some(t) => {
  387. line.push_str(" ");
  388. line.push_str(&t.to_string());
  389. }
  390. _ => {}
  391. }
  392. }
  393. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  394. ///
  395. /// The serialized measurement is appended to the end of the string without
  396. /// any regard for what exited in it previously.
  397. ///
  398. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  399. line.push_str(&escape_tag(measurement.key));
  400. let add_tag = |line: &mut String, key: &str, value: &str| {
  401. line.push_str(",");
  402. line.push_str(&escape_tag(key));
  403. line.push_str("=");
  404. line.push_str(&escape(value));
  405. };
  406. for (key, value) in measurement.tags.iter() {
  407. add_tag(line, key, value);
  408. }
  409. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  410. if is_first { line.push_str(" "); } else { line.push_str(","); }
  411. line.push_str(&escape_tag(key));
  412. line.push_str("=");
  413. match *value {
  414. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  415. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  416. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  417. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  418. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  419. #[cfg(not(feature = "disable-short-uuid"))]
  420. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  421. #[cfg(feature = "disable-short-uuid")]
  422. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  423. };
  424. };
  425. let mut fields = measurement.fields.iter();
  426. // first time separate from tags with space
  427. //
  428. fields.next().map(|kv| {
  429. add_field(line, kv.0, kv.1, true);
  430. });
  431. // then seperate the rest w/ comma
  432. //
  433. for kv in fields {
  434. add_field(line, kv.0, kv.1, false);
  435. }
  436. if let Some(t) = measurement.timestamp {
  437. line.push_str(" ");
  438. line.push_str(&t.to_string());
  439. }
  440. }
/// Spawns the legacy writer thread (deprecated in favour of `InfluxWriter`).
///
/// The thread creates `/tmp/mm`, binds the `pull` socket, and then loops
/// forever: each message received as valid UTF-8 is appended to a buffer, and
/// the buffer is posted to `DB_HOST`/`DB_NAME` once the counter has passed 40.
/// Server and transport errors are reported on `warnings` as `Warning::Error`.
/// The loop has no exit condition, so the returned handle never completes on
/// its own.
///
/// # Panics
///
/// Panics if the thread cannot be spawned; inside the thread, if the pull
/// socket cannot be created or the url does not parse.
#[cfg(feature = "warnings")]
#[deprecated(since="0.4", note="Replace with InfluxWriter")]
pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
    thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
        // best effort: the directory may already exist
        let _ = fs::create_dir("/tmp/mm");
        let ctx = zmq::Context::new();
        let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
        let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
        let client = Client::new();
        let mut buf = String::with_capacity(4096);
        let mut server_resp = String::with_capacity(4096);
        let mut count = 0;
        loop {
            // receive errors and non-UTF-8 payloads are silently skipped
            if let Ok(bytes) = socket.recv_bytes(0) {
                if let Ok(msg) = String::from_utf8(bytes) {
                    count = match count {
                        // first line of a batch: no separator needed
                        0 => {
                            buf.push_str(&msg);
                            1
                        }
                        n @ 1...40 => {
                            buf.push_str("\n");
                            buf.push_str(&msg);
                            n + 1
                        }
                        // counter is past 40: append, post the batch, start over
                        _ => {
                            buf.push_str("\n");
                            buf.push_str(&msg);
                            match client.post(url.clone())
                                        .body(&buf)
                                        .send() {
                                Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
                                Ok(mut resp) => {
                                    let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
                                    let _ = warnings.send(
                                        Warning::Error(
                                            format!("Influx server: {}", server_resp)));
                                    server_resp.clear();
                                }
                                Err(why) => {
                                    let _ = warnings.send(
                                        Warning::Error(
                                            format!("Influx write error: {}", why)));
                                }
                            }
                            buf.clear();
                            0
                        }
                    }
                }
            }
        }
    }).unwrap()
}
/// A field value owned by an `OwnedMeasurement`.
#[derive(Debug, Clone, PartialEq)]
pub enum OwnedValue {
    /// Text; `serialize_owned` writes it quoted via `as_string`.
    String(String),
    /// Floating point number, written with its `Display` form.
    Float(f64),
    /// Integer, written with a trailing `i`.
    Integer(i64),
    /// Boolean, written as `t` or `f`.
    Boolean(bool),
    /// Decimal value, written with its `Display` form.
    D128(d128),
    /// Uuid, written quoted; truncated to its first eight characters unless
    /// the `disable-short-uuid` feature is enabled.
    Uuid(Uuid),
}
/// A measurement that owns its field values, so it can be sent across a
/// channel to the writer thread. Keys and tag values are `&'static str`.
#[derive(Clone, Debug)]
pub struct OwnedMeasurement {
    /// Measurement name.
    pub key: &'static str,
    /// Timestamp; when `None`, the `InfluxWriter` thread fills in `now()` on receipt.
    pub timestamp: Option<i64>,
    /// Field name to value.
    pub fields: Map<&'static str, OwnedValue>,
    /// Tag name to tag value.
    pub tags: Map<&'static str, &'static str>,
}
  511. impl OwnedMeasurement {
  512. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  513. OwnedMeasurement {
  514. key,
  515. timestamp: None,
  516. tags: new_map(n_tags),
  517. fields: new_map(n_fields),
  518. }
  519. }
  520. pub fn new(key: &'static str) -> Self {
  521. OwnedMeasurement::with_capacity(key, 4, 4)
  522. }
  523. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  524. self.tags.insert(key, value);
  525. self
  526. }
  527. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  528. self.fields.insert(key, value);
  529. self
  530. }
  531. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  532. self.timestamp = Some(timestamp);
  533. self
  534. }
  535. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  536. *self.tags.entry(key).or_insert(value) = value;
  537. self
  538. }
  539. }
  540. #[allow(unused_imports, unused_variables)]
  541. #[cfg(test)]
  542. mod tests {
  543. use super::*;
  544. use test::{black_box, Bencher};
/// Key-only entries (`t(color)`, `tm(time)`) take their value from the local
/// variable of the same name.
#[test]
fn it_uses_the_new_tag_k_only_shortcut() {
    let tag_value = "one";
    let color = "red";
    let time = now();
    let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
    assert_eq!(m.tags.get("color"), Some(&"red"));
    assert_eq!(m.tags.get("tag_value"), Some(&"one"));
    assert_eq!(m.timestamp, Some(time));
}
/// The `kind(key, value)` parenthesis form builds the same measurement as the
/// bracket form.
#[test]
fn it_uses_measure_macro_parenthesis_syntax() {
    let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
    assert_eq!(m.key, "test");
    assert_eq!(m.tags.get("a"), Some(&"b"));
    assert_eq!(m.fields.get("n"), Some(&OwnedValue::Integer(1)));
    assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
    assert_eq!(m.timestamp, Some(1));
}
/// The sender argument may be a field-access expression such as `self.influx`.
/// Only checks that this compiles and runs; nothing is asserted.
#[test]
fn it_uses_measure_macro_on_a_self_attribute() {
    struct A {
        pub influx: InfluxWriter,
    }
    impl A {
        fn f(&self) {
            measure!(self.influx, test, t(color, "red"), i(n, 1));
        }
    }
    let a = A { influx: InfluxWriter::default() };
    a.f();
}
/// Sends from a writer and from a clone moved into another thread, then lets
/// both go out of scope. The spawned thread is not joined and nothing is
/// asserted.
#[test]
fn it_clones_an_influx_writer_to_check_both_drop() {
    let influx = InfluxWriter::default();
    measure!(influx, drop_test, i(a, 1), i(b, 2));
    {
        let influx = influx.clone();
        thread::spawn(move || {
            measure!(influx, drop_test, i(a, 3), i(b, 4));
        });
    }
}
/// Benchmarks building and sending a one-tag, one-field measurement through an
/// `InfluxWriter` created with a buffer size of 4000.
#[bench]
fn influx_writer_send_basic(b: &mut Bencher) {
    let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
    b.iter(|| {
        measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
    });
}
/// Benchmarks sending a measurement with two tags and two `d128` fields
/// through the default `InfluxWriter`.
// NOTE(review): `t!` is not defined in this file — presumably a ticker macro from elsewhere in the project.
#[bench]
fn influx_writer_send_price(b: &mut Bencher) {
    let m = InfluxWriter::default();
    b.iter(|| {
        measure!(m, test,
            tag[ticker; t!(xmr-btc).to_str()],
            tag[exchange; "plnx"],
            d128[bid; d128::zero()],
            d128[ask; d128::zero()],
        );
    });
}
/// Mirrors the first doc example of `measure!` as a regular unit test.
#[test]
fn it_checks_color_tag_error_in_non_doctest() {
    let (tx, rx) = channel();
    measure!(tx, test, tag[color;"red"], int[n;1]);
    let meas: OwnedMeasurement = rx.recv().unwrap();
    assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
}
/// `@make_meas` with the `key => value` syntax returns the measurement instead
/// of sending it.
#[test]
fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
    let meas = measure!(@make_meas test_measurement,
        tag [ one => "a" ],
        tag [ two => "b" ],
        int [ three => 2 ],
        float [ four => 1.2345 ],
        string [ five => String::from("d") ],
        bool [ six => true ],
        int [ seven => { 1 + 2 } ],
        time [ 1 ]
    );
    assert_eq!(meas.key, "test_measurement");
    assert_eq!(meas.tags.get("one"), Some(&"a"));
    assert_eq!(meas.tags.get("two"), Some(&"b"));
    assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
    assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
    assert_eq!(meas.timestamp, Some(1));
}
/// Sends a measurement built with the `key => value` syntax over a plain
/// channel and checks what arrives on the other end.
#[test]
fn it_uses_the_measure_macro() {
    let (tx, rx) = channel();
    measure!(tx, test_measurement,
        tag [ one => "a" ],
        tag [ two => "b" ],
        int [ three => 2 ],
        float [ four => 1.2345 ],
        string [ five => String::from("d") ],
        bool [ six => true ],
        int [ seven => { 1 + 2 } ],
        time [ 1 ]
    );
    thread::sleep(Duration::from_millis(10));
    let meas: OwnedMeasurement = rx.try_recv().unwrap();
    assert_eq!(meas.key, "test_measurement");
    assert_eq!(meas.tags.get("one"), Some(&"a"));
    assert_eq!(meas.tags.get("two"), Some(&"b"));
    assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
    assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
    assert_eq!(meas.timestamp, Some(1));
}
/// Checks that `d128` and `uuid` fields and a `time` entry survive the trip
/// through the macro and the channel.
#[test]
fn it_uses_measure_macro_for_d128_and_uuid() {
    let (tx, rx) = channel();
    let u = Uuid::new_v4();
    let d = d128::zero();
    let t = now();
    measure!(tx, test_measurement,
        tag[one; "a"],
        d128[two; d],
        uuid[three; u],
        time[t]
    );
    thread::sleep(Duration::from_millis(10));
    let meas: OwnedMeasurement = rx.try_recv().unwrap();
    assert_eq!(meas.key, "test_measurement");
    assert_eq!(meas.tags.get("one"), Some(&"a"));
    assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero())));
    assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u)));
    assert_eq!(meas.timestamp, Some(t));
}
/// Same as `it_uses_the_measure_macro`, but mostly with the `key; value`
/// syntax (one entry deliberately keeps `=>` to show the forms can be mixed).
#[test]
fn it_uses_the_measure_macro_alt_syntax() {
    let (tx, rx) = channel();
    measure!(tx, test_measurement,
        tag[one; "a"],
        tag[two; "b"],
        int[three; 2],
        float[four; 1.2345],
        string[five; String::from("d")],
        bool [ six => true ],
        int[seven; { 1 + 2 }],
        time[1]
    );
    thread::sleep(Duration::from_millis(10));
    let meas: OwnedMeasurement = rx.try_recv().unwrap();
    assert_eq!(meas.key, "test_measurement");
    assert_eq!(meas.tags.get("one"), Some(&"a"));
    assert_eq!(meas.tags.get("two"), Some(&"b"));
    assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
    assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
    assert_eq!(meas.timestamp, Some(1));
}
/// `serialize_owned` must put a space between the last tag and the first
/// field, and commas between subsequent fields.
#[test]
fn it_checks_that_fields_are_separated_correctly() {
    let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
    assert_eq!(m.key, "test");
    assert_eq!(m.tags.get("a"), Some(&"one"));
    assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
    let mut buf = String::new();
    serialize_owned(&m, &mut buf);
    assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
}
/// Pokes at corner cases: mixed separators, a trailing comma, and a nested
/// field-access expression as the value.
#[test]
fn try_to_break_measure_macro() {
    // the receiver is dropped right away; the macro ignores the send result
    let (tx, _) = channel();
    measure!(tx, one, tag[x=>"y"], int[n;1]);
    measure!(tx, one, tag[x;"y"], int[n;1],);
    struct A {
        pub one: i32,
        pub two: i32,
    }
    struct B {
        pub a: A
    }
    let b = B { a: A { one: 1, two: 2 } };
    let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
    assert_eq!(m.fields.get("a"), Some(&OwnedValue::Integer(1)));
}
/// Benchmarks `measure!` with one tag, one field and a timestamp, sending over
/// a plain channel that a background thread drains.
#[bench]
fn measure_macro_small(b: &mut Bencher) {
    let (tx, rx) = channel();
    // drains the channel until every sender has been dropped
    let listener = thread::spawn(move || {
        loop { if rx.recv().is_err() { break } }
    });
    b.iter(|| {
        measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
    });
}
/// Benchmarks `measure!` with three tags, three fields and a timestamp,
/// sending over a plain channel that a background thread drains.
#[bench]
fn measure_macro_medium(b: &mut Bencher) {
    let (tx, rx) = channel();
    // drains the channel until every sender has been dropped
    let listener = thread::spawn(move || {
        loop { if rx.recv().is_err() { break } }
    });
    b.iter(|| {
        measure!(tx, test,
            tag[color; "red"],
            tag[mood => "playful"],
            tag [ ticker => "xmr_btc" ],
            float[ price => 1.2345 ],
            float[ amount => 56.323],
            int[n; 1],
            time[now()]
        );
    });
}
/// Starts the deprecated `writer` thread and pushes one serialized measurement
/// to it over zmq. Only compiled with the `warnings` feature and ignored by
/// default; nothing is asserted beyond the calls succeeding.
#[cfg(feature = "warnings")]
#[test]
#[ignore]
fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
    let ctx = zmq::Context::new();
    let socket = push(&ctx).unwrap();
    let (tx, rx) = channel();
    let w = writer(tx.clone());
    let mut buf = String::with_capacity(4096);
    let mut meas = Measurement::new("rust_test");
    meas.add_tag("a", "t");
    meas.add_field("c", Value::Float(1.23456));
    let now = now();
    meas.set_timestamp(now);
    serialize(&meas, &mut buf);
    socket.send_str(&buf, 0).unwrap();
    // dropping the handle does not join the thread
    drop(w);
}
/// `serialize` appends key, tag, field and timestamp in line-protocol order;
/// the float `1.0` is rendered as `1`.
#[test]
fn it_serializes_a_measurement_in_place() {
    let mut buf = String::with_capacity(4096);
    let mut meas = Measurement::new("rust_test");
    meas.add_tag("a", "b");
    meas.add_field("c", Value::Float(1.0));
    let now = now();
    meas.set_timestamp(now);
    serialize(&meas, &mut buf);
    let ans = format!("rust_test,a=b c=1 {}", now);
    assert_eq!(buf, ans);
}
  781. #[test]
  782. fn it_serializes_a_hard_to_serialize_message() {
  783. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  784. let mut buf = String::new();
  785. let mut server_resp = String::new();
  786. let mut m = Measurement::new("rust_test");
  787. m.add_field("s", Value::String(&raw));
  788. let now = now();
  789. m.set_timestamp(now);
  790. serialize(&m, &mut buf);
  791. println!("{}", buf);
  792. buf.push_str("\n");
  793. let buf_copy = buf.clone();
  794. buf.push_str(&buf_copy);
  795. println!("{}", buf);
  796. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  797. let client = Client::new();
  798. match client.post(url.clone())
  799. .body(&buf)
  800. .send() {
  801. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  802. Ok(mut resp) => {
  803. resp.read_to_string(&mut server_resp).unwrap();
  804. panic!("{}", server_resp);
  805. }
  806. Err(why) => {
  807. panic!(why)
  808. }
  809. }
  810. }
/// Benchmarks `serialize_owned` on a measurement with five tags and four
/// fields, reusing (and clearing) one buffer.
#[bench]
fn serialize_owned_longer(b: &mut Bencher) {
    let mut buf = String::with_capacity(1024);
    let m =
        OwnedMeasurement::new("test")
            .add_tag("one", "a")
            .add_tag("two", "b")
            .add_tag("ticker", "xmr_btc")
            .add_tag("exchange", "plnx")
            .add_tag("side", "bid")
            .add_field("three", OwnedValue::Float(1.2345))
            .add_field("four", OwnedValue::Integer(57))
            .add_field("five", OwnedValue::Boolean(true))
            .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
            .set_timestamp(now());
    b.iter(|| {
        serialize_owned(&m, &mut buf);
        buf.clear()
    });
}
/// Benchmarks `serialize_owned` on a measurement with two tags and two fields,
/// reusing (and clearing) one buffer.
#[bench]
fn serialize_owned_simple(b: &mut Bencher) {
    let mut buf = String::with_capacity(1024);
    let m =
        OwnedMeasurement::new("test")
            .add_tag("one", "a")
            .add_tag("two", "b")
            .add_field("three", OwnedValue::Float(1.2345))
            .add_field("four", OwnedValue::Integer(57))
            .set_timestamp(now());
    b.iter(|| {
        serialize_owned(&m, &mut buf);
        buf.clear()
    });
}
  846. #[test]
  847. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  848. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  849. let mut buf = String::new();
  850. let mut server_resp = String::new();
  851. let m = OwnedMeasurement::new("rust_test")
  852. .add_field("s", OwnedValue::String(raw.to_string()))
  853. .set_timestamp(now());
  854. serialize_owned(&m, &mut buf);
  855. println!("{}", buf);
  856. buf.push_str("\n");
  857. let buf_copy = buf.clone();
  858. buf.push_str(&buf_copy);
  859. println!("{}", buf);
  860. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  861. let client = Client::new();
  862. match client.post(url.clone())
  863. .body(&buf)
  864. .send() {
  865. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  866. Ok(mut resp) => {
  867. resp.read_to_string(&mut server_resp).unwrap();
  868. panic!("{}", server_resp);
  869. }
  870. Err(why) => {
  871. panic!(why)
  872. }
  873. }
  874. }
  875. // macro_rules! make_measurement {
  876. // (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  877. // (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  878. // (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  879. // (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  880. // (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) };
  881. // (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) };
  882. // (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  883. // (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) };
  884. //
  885. // (@count_tags) => {0usize};
  886. // (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  887. // (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  888. //
  889. // (@count_fields) => {0usize};
  890. // (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  891. // (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  892. // (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  893. //
  894. // ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  895. // let n_tags = measure!(@count_tags $($t)*);
  896. // let n_fields = measure!(@count_fields $($t)*);
  897. // let mut meas =
  898. // $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  899. // $(
  900. // measure!(@kv $t, meas, $($tail)*);
  901. // )*
  902. // //let _ = $m.send(meas);
  903. // meas
  904. // }};
  905. // }
  906. //
  907. // #[test]
  908. // fn it_checks_n_tags_is_correct() {
  909. // let (tx, _): (Sender<OwnedMeasurement>, Receiver<OwnedMeasurement>) = channel();
  910. // assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1);
  911. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2);
  912. // assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0);
  913. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0);
  914. //
  915. // let m4 =
  916. // make_measurement!(tx, test,
  917. // tag[a;"b"],
  918. // tag[c;"d"],
  919. // int[n; 1],
  920. // tag[e;"f"],
  921. // float[x; 1.234],
  922. // tag[g;"h"],
  923. // time[1],
  924. // );
  925. // assert_eq!(m4.n_tags, 4);
  926. // assert_eq!(m4.n_fields, 2);
  927. // }
  928. }