You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1047 lines
36KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::mpsc::{Sender, channel, SendError};
  5. use std::thread;
  6. use std::fs;
  7. use std::time::Duration;
  8. use std::hash::BuildHasherDefault;
  9. use hyper::status::StatusCode;
  10. use hyper::client::response::Response;
  11. use hyper::Url;
  12. use hyper::client::Client;
  13. use influent::measurement::{Measurement, Value};
  14. use zmq;
  15. #[allow(unused_imports)]
  16. use chrono::{DateTime, Utc};
  17. use sloggers::types::Severity;
  18. use ordermap::OrderMap;
  19. use fnv::FnvHasher;
  20. use decimal::d128;
  21. use uuid::Uuid;
  22. use super::{nanos, file_logger};
  23. use warnings::Warning;
  24. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  25. //const WRITER_ADDR: &'static str = "tcp://127.0.0.1:17853";
  26. const DB_NAME: &'static str = "mm";
  27. const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write";
  28. //const DB_HOST: &'static str = "http://harrison.0ptimus.internal:8086/write";
  29. const ZMQ_RCV_HWM: i32 = 0;
  30. const ZMQ_SND_HWM: i32 = 0;
  31. const BUFFER_SIZE: u8 = 80;
  32. pub use super::{dur_nanos, dt_nanos};
  33. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  34. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  35. Map::with_capacity_and_hasher(capacity, Default::default())
  36. }
  37. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  38. ///
  39. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  40. /// values, as well as sends it with the `Sender`.
  41. ///
  42. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  43. /// measurement (see `tests` mod).
  44. ///
  45. /// # Examples
  46. ///
  47. /// ```
  48. /// #[macro_use] extern crate logging;
  49. /// extern crate decimal;
  50. ///
  51. /// use std::sync::mpsc::channel;
  52. /// use decimal::d128;
  53. /// use logging::influx::*;
  54. ///
  55. /// fn main() {
  56. /// let (tx, rx) = channel();
  57. ///
  58. /// // "shorthand" syntax
  59. ///
  60. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  61. ///
  62. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  63. ///
  64. /// assert_eq!(meas.key, "test");
  65. /// assert_eq!(meas.tags.get("color"), Some(&"red"));
  66. /// assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1)));
  67. ///
  68. /// // alternate syntax ...
  69. ///
  70. /// measure!(tx, test,
  71. /// tag [ one => "a" ],
  72. /// tag [ two => "b" ],
  73. /// int [ three => 2 ],
  74. /// float [ four => 1.2345 ],
  75. /// string [ five => String::from("d") ],
  76. /// bool [ six => true ],
  77. /// int [ seven => { 1 + 2 } ],
  78. /// time [ 1 ]
  79. /// );
  80. ///
  81. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  82. ///
  83. /// assert_eq!(meas.key, "test");
  84. /// assert_eq!(meas.tags.get("one"), Some(&"a"));
  85. /// assert_eq!(meas.tags.get("two"), Some(&"b"));
  86. /// assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  87. /// assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  88. /// assert_eq!(meas.timestamp, Some(1));
  89. ///
  90. /// // use the @make_meas flag to skip sending a measurement, instead merely
  91. /// // creating it.
  92. ///
  93. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  94. ///
  95. /// // each variant also has shorthand aliases
  96. ///
  97. /// let meas: OwnedMeasurement =
  98. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  99. /// }
  100. /// ```
  101. ///
  102. #[macro_export]
  103. macro_rules! measure {
  104. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  105. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  106. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  107. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  108. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  109. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  110. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  111. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  112. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  113. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  114. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  115. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  116. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  117. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  118. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  119. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  120. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  121. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  122. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  123. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  124. (@as_expr $e:expr) => {$e};
  125. (@count_tags) => {0usize};
  126. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  127. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  128. (@count_fields) => {0usize};
  129. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  130. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  131. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  132. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  133. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  134. };
  135. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  136. let n_tags = measure!(@count_tags $($t)*);
  137. let n_fields = measure!(@count_fields $($t)*);
  138. let mut meas =
  139. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  140. $(
  141. measure!(@kv $t, meas, $($tail)*);
  142. )*
  143. meas
  144. }};
  145. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  146. measure!($m, $name, $($t [ $($tail)* ] ),+)
  147. };
  148. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  149. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  150. let _ = $m.send(measurement);
  151. }};
  152. }
  153. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  154. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  155. /// measurements have accumulated.
  156. ///
  157. pub struct InfluxWriter {
  158. host: &'static str,
  159. db: &'static str,
  160. tx: Sender<OwnedMeasurement>,
  161. kill_switch: Sender<()>,
  162. thread: Option<thread::JoinHandle<()>>,
  163. }
  164. impl Default for InfluxWriter {
  165. fn default() -> Self {
  166. InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log", BUFFER_SIZE)
  167. }
  168. }
  169. impl Clone for InfluxWriter {
  170. fn clone(&self) -> Self {
  171. let (tx, _) = channel();
  172. InfluxWriter {
  173. host: self.host,
  174. db: self.db,
  175. tx: self.tx.clone(),
  176. kill_switch: tx,
  177. thread: None,
  178. }
  179. }
  180. }
  181. impl InfluxWriter {
  182. /// Sends the `OwnedMeasurement` to the serialization thread.
  183. ///
  184. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<OwnedMeasurement>> {
  185. self.tx.send(m)
  186. }
  187. pub fn tx(&self) -> Sender<OwnedMeasurement> {
  188. self.tx.clone()
  189. }
  190. #[allow(unused_assignments)]
  191. pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u8) -> Self {
  192. let (kill_switch, terminate) = channel();
  193. let (tx, rx) = channel();
  194. let logger = file_logger(log_path, Severity::Info);
  195. let thread = thread::spawn(move || {
  196. debug!(logger, "initializing url";
  197. "DB_HOST" => host,
  198. "DB_NAME" => db);
  199. #[cfg(not(test))]
  200. let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse");
  201. #[cfg(not(test))]
  202. let client = Client::new();
  203. debug!(logger, "initializing buffers");
  204. let mut meas_buf = String::with_capacity(32 * 32 * 32);
  205. let mut buf = String::with_capacity(32 * 32 * 32);
  206. let mut count = 0;
  207. let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
  208. trace!(logger, "appending serialized measurement to buffer";
  209. "prev" => prev,
  210. "buf.len()" => buf.len());
  211. match prev {
  212. 0 if buffer_size > 0 => {
  213. buf.push_str(s);
  214. 1
  215. }
  216. n if n < buffer_size => {
  217. buf.push_str("\n");
  218. buf.push_str(s);
  219. n + 1
  220. }
  221. _ => {
  222. buf.push_str("\n");
  223. if s.len() > 0 {
  224. buf.push_str(s);
  225. }
  226. trace!(logger, "sending buffer to influx";
  227. "buf.len()" => buf.len());
  228. #[cfg(not(test))]
  229. {
  230. let resp = client.post(url.clone())
  231. .body(buf.as_str())
  232. .send();
  233. match resp {
  234. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  235. trace!(logger, "server responded ok: 204 NoContent");
  236. }
  237. Ok(mut resp) => {
  238. let mut server_resp = String::with_capacity(1024);
  239. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  240. error!(logger, "influx server error";
  241. "status" => resp.status.to_string(),
  242. "body" => server_resp);
  243. }
  244. Err(why) => {
  245. error!(logger, "http request failed: {:?}", why);
  246. }
  247. }
  248. }
  249. buf.clear();
  250. 0
  251. }
  252. }
  253. };
  254. let mut rcvd_msg = false;
  255. loop {
  256. rcvd_msg = false;
  257. let _ = rx.recv_timeout(Duration::from_millis(10))
  258. .map(|mut meas: OwnedMeasurement| {
  259. // if we didn't set the timestamp, it would end up
  260. // being whenever we accumulated `BUFFER_SIZE` messages,
  261. // which might be some period of time after we received
  262. // the message.
  263. //
  264. if meas.timestamp.is_none() {
  265. meas.timestamp = Some(now());
  266. }
  267. trace!(logger, "rcvd new OwnedMeasurement"; "count" => count);
  268. serialize_owned(&meas, &mut meas_buf);
  269. count = next(count, &meas_buf, &mut buf);
  270. meas_buf.clear();
  271. rcvd_msg = true;
  272. });
  273. let end = terminate.try_recv()
  274. .map(|_| {
  275. let _ = next(::std::u8::MAX, "", &mut buf);
  276. true
  277. }).unwrap_or(false);
  278. if end { break }
  279. #[cfg(feature = "no-thrash")]
  280. {
  281. if !rcvd_msg {
  282. thread::sleep(Duration::new(0, 5000));
  283. }
  284. }
  285. }
  286. debug!(logger, "goodbye");
  287. });
  288. InfluxWriter {
  289. host,
  290. db,
  291. tx,
  292. kill_switch,
  293. thread: Some(thread)
  294. }
  295. }
  296. }
  297. impl Drop for InfluxWriter {
  298. fn drop(&mut self) {
  299. let _ = self.kill_switch.send(()).unwrap();
  300. if let Some(thread) = self.thread.take() {
  301. let _ = thread.join();
  302. }
  303. }
  304. }
  305. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  306. let socket = ctx.socket(zmq::PULL)?;
  307. socket.bind(WRITER_ADDR)?;
  308. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  309. Ok(socket)
  310. }
  311. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  312. let socket = ctx.socket(zmq::PUSH)?;
  313. socket.connect(WRITER_ADDR)?;
  314. socket.set_sndhwm(ZMQ_SND_HWM)?;
  315. Ok(socket)
  316. }
  317. /// This removes offending things rather than escaping them.
  318. ///
  319. fn escape_tag(s: &str) -> String {
  320. s.replace(" ", "")
  321. .replace(",", "")
  322. .replace("\"", "")
  323. }
  324. fn escape(s: &str) -> String {
  325. s.replace(" ", "\\ ")
  326. .replace(",", "\\,")
  327. }
  328. fn as_string(s: &str) -> String {
  329. // the second replace removes double escapes
  330. //
  331. format!("\"{}\"", s.replace("\"", "\\\"")
  332. .replace(r#"\\""#, r#"\""#))
  333. }
  334. #[test]
  335. fn it_checks_as_string_does_not_double_escape() {
  336. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  337. let escaped = as_string(&raw);
  338. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  339. }
  340. fn as_integer(i: &i64) -> String {
  341. format!("{}i", i)
  342. }
  343. fn as_float(f: &f64) -> String {
  344. f.to_string()
  345. }
  346. fn as_boolean(b: &bool) -> &str {
  347. if *b { "t" } else { "f" }
  348. }
  349. pub fn now() -> i64 {
  350. nanos(Utc::now()) as i64
  351. }
  352. /// Serialize the measurement into influx line protocol
  353. /// and append to the buffer.
  354. ///
  355. /// # Examples
  356. ///
  357. /// ```
  358. /// extern crate influent;
  359. /// extern crate logging;
  360. ///
  361. /// use influent::measurement::{Measurement, Value};
  362. /// use std::string::String;
  363. /// use logging::influx::serialize;
  364. ///
  365. /// fn main() {
  366. /// let mut buf = String::new();
  367. /// let mut m = Measurement::new("test");
  368. /// m.add_field("x", Value::Integer(1));
  369. /// serialize(&m, &mut buf);
  370. /// }
  371. ///
  372. /// ```
  373. ///
  374. pub fn serialize(measurement: &Measurement, line: &mut String) {
  375. line.push_str(&escape(measurement.key));
  376. for (tag, value) in measurement.tags.iter() {
  377. line.push_str(",");
  378. line.push_str(&escape(tag));
  379. line.push_str("=");
  380. line.push_str(&escape(value));
  381. }
  382. let mut was_spaced = false;
  383. for (field, value) in measurement.fields.iter() {
  384. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  385. line.push_str(&escape(field));
  386. line.push_str("=");
  387. match value {
  388. &Value::String(ref s) => line.push_str(&as_string(s)),
  389. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  390. &Value::Float(ref f) => line.push_str(&as_float(f)),
  391. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  392. };
  393. }
  394. match measurement.timestamp {
  395. Some(t) => {
  396. line.push_str(" ");
  397. line.push_str(&t.to_string());
  398. }
  399. _ => {}
  400. }
  401. }
  402. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  403. ///
  404. /// The serialized measurement is appended to the end of the string without
  405. /// any regard for what exited in it previously.
  406. ///
  407. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  408. line.push_str(&escape_tag(measurement.key));
  409. let add_tag = |line: &mut String, key: &str, value: &str| {
  410. line.push_str(",");
  411. line.push_str(&escape_tag(key));
  412. line.push_str("=");
  413. line.push_str(&escape(value));
  414. };
  415. for (key, value) in measurement.tags.iter() {
  416. add_tag(line, key, value);
  417. }
  418. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  419. if is_first { line.push_str(" "); } else { line.push_str(","); }
  420. line.push_str(&escape_tag(key));
  421. line.push_str("=");
  422. match *value {
  423. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  424. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  425. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  426. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  427. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  428. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  429. };
  430. };
  431. let mut fields = measurement.fields.iter();
  432. // first time separate from tags with space
  433. //
  434. fields.next().map(|kv| {
  435. add_field(line, kv.0, kv.1, true);
  436. });
  437. // then seperate the rest w/ comma
  438. //
  439. for kv in fields {
  440. add_field(line, kv.0, kv.1, false);
  441. }
  442. if let Some(t) = measurement.timestamp {
  443. line.push_str(" ");
  444. line.push_str(&t.to_string());
  445. }
  446. }
  447. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  448. thread::spawn(move || {
  449. let _ = fs::create_dir("/tmp/mm");
  450. let ctx = zmq::Context::new();
  451. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  452. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  453. let client = Client::new();
  454. let mut buf = String::with_capacity(4096);
  455. let mut server_resp = String::with_capacity(4096);
  456. let mut count = 0;
  457. loop {
  458. if let Ok(bytes) = socket.recv_bytes(0) {
  459. if let Ok(msg) = String::from_utf8(bytes) {
  460. count = match count {
  461. 0 => {
  462. buf.push_str(&msg);
  463. 1
  464. }
  465. n @ 1...40 => {
  466. buf.push_str("\n");
  467. buf.push_str(&msg);
  468. n + 1
  469. }
  470. _ => {
  471. buf.push_str("\n");
  472. buf.push_str(&msg);
  473. match client.post(url.clone())
  474. .body(&buf)
  475. .send() {
  476. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  477. Ok(mut resp) => {
  478. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  479. let _ = warnings.send(
  480. Warning::Error(
  481. format!("Influx server: {}", server_resp)));
  482. server_resp.clear();
  483. }
  484. Err(why) => {
  485. let _ = warnings.send(
  486. Warning::Error(
  487. format!("Influx write error: {}", why)));
  488. }
  489. }
  490. buf.clear();
  491. 0
  492. }
  493. }
  494. }
  495. }
  496. }
  497. })
  498. }
  499. #[derive(Debug, Clone, PartialEq)]
  500. pub enum OwnedValue {
  501. String(String),
  502. Float(f64),
  503. Integer(i64),
  504. Boolean(bool),
  505. D128(d128),
  506. Uuid(Uuid),
  507. }
  508. #[derive(Clone, Debug)]
  509. pub struct OwnedMeasurement {
  510. pub key: &'static str,
  511. pub timestamp: Option<i64>,
  512. pub fields: Map<&'static str, OwnedValue>,
  513. pub tags: Map<&'static str, &'static str>,
  514. }
  515. impl OwnedMeasurement {
  516. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  517. OwnedMeasurement {
  518. key,
  519. timestamp: None,
  520. tags: new_map(n_tags),
  521. fields: new_map(n_fields),
  522. }
  523. }
  524. pub fn new(key: &'static str) -> Self {
  525. OwnedMeasurement::with_capacity(key, 4, 4)
  526. }
  527. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  528. self.tags.insert(key, value);
  529. self
  530. }
  531. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  532. self.fields.insert(key, value);
  533. self
  534. }
  535. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  536. self.timestamp = Some(timestamp);
  537. self
  538. }
  539. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  540. *self.tags.entry(key).or_insert(value) = value;
  541. self
  542. }
  543. }
  544. #[allow(unused_imports, unused_variables)]
  545. #[cfg(test)]
  546. mod tests {
  547. use super::*;
  548. use test::{black_box, Bencher};
  549. #[test]
  550. fn it_uses_the_new_tag_k_only_shortcut() {
  551. let tag_value = "one";
  552. let color = "red";
  553. let time = now();
  554. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  555. assert_eq!(m.tags.get("color"), Some(&"red"));
  556. assert_eq!(m.tags.get("tag_value"), Some(&"one"));
  557. assert_eq!(m.timestamp, Some(time));
  558. }
  559. #[test]
  560. fn it_uses_measure_macro_parenthesis_syntax() {
  561. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  562. assert_eq!(m.key, "test");
  563. assert_eq!(m.tags.get("a"), Some(&"b"));
  564. assert_eq!(m.fields.get("n"), Some(&OwnedValue::Integer(1)));
  565. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  566. assert_eq!(m.timestamp, Some(1));
  567. }
  568. #[test]
  569. fn it_uses_measure_macro_on_a_self_attribute() {
  570. struct A {
  571. pub influx: InfluxWriter,
  572. }
  573. impl A {
  574. fn f(&self) {
  575. measure!(self.influx, test, t(color, "red"), i(n, 1));
  576. }
  577. }
  578. let a = A { influx: InfluxWriter::default() };
  579. a.f();
  580. }
  581. #[bench]
  582. fn influx_writer_send_basic(b: &mut Bencher) {
  583. let m = InfluxWriter::default();
  584. b.iter(|| {
  585. measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]);
  586. });
  587. }
  588. #[bench]
  589. fn influx_writer_send_price(b: &mut Bencher) {
  590. let m = InfluxWriter::default();
  591. b.iter(|| {
  592. measure!(m, test,
  593. tag[ticker; t!(xmr-btc).to_str()],
  594. tag[exchange; "plnx"],
  595. d128[bid; d128::zero()],
  596. d128[ask; d128::zero()],
  597. );
  598. });
  599. }
  600. #[test]
  601. fn it_checks_color_tag_error_in_non_doctest() {
  602. let (tx, rx) = channel();
  603. measure!(tx, test, tag[color;"red"], int[n;1]);
  604. let meas: OwnedMeasurement = rx.recv().unwrap();
  605. assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
  606. }
  607. #[test]
  608. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  609. let meas = measure!(@make_meas test_measurement,
  610. tag [ one => "a" ],
  611. tag [ two => "b" ],
  612. int [ three => 2 ],
  613. float [ four => 1.2345 ],
  614. string [ five => String::from("d") ],
  615. bool [ six => true ],
  616. int [ seven => { 1 + 2 } ],
  617. time [ 1 ]
  618. );
  619. assert_eq!(meas.key, "test_measurement");
  620. assert_eq!(meas.tags.get("one"), Some(&"a"));
  621. assert_eq!(meas.tags.get("two"), Some(&"b"));
  622. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  623. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  624. assert_eq!(meas.timestamp, Some(1));
  625. }
  626. #[test]
  627. fn it_uses_the_measure_macro() {
  628. let (tx, rx) = channel();
  629. measure!(tx, test_measurement,
  630. tag [ one => "a" ],
  631. tag [ two => "b" ],
  632. int [ three => 2 ],
  633. float [ four => 1.2345 ],
  634. string [ five => String::from("d") ],
  635. bool [ six => true ],
  636. int [ seven => { 1 + 2 } ],
  637. time [ 1 ]
  638. );
  639. thread::sleep(Duration::from_millis(10));
  640. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  641. assert_eq!(meas.key, "test_measurement");
  642. assert_eq!(meas.tags.get("one"), Some(&"a"));
  643. assert_eq!(meas.tags.get("two"), Some(&"b"));
  644. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  645. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  646. assert_eq!(meas.timestamp, Some(1));
  647. }
  648. #[test]
  649. fn it_uses_measure_macro_for_d128_and_uuid() {
  650. let (tx, rx) = channel();
  651. let u = Uuid::new_v4();
  652. let d = d128::zero();
  653. let t = now();
  654. measure!(tx, test_measurement,
  655. tag[one; "a"],
  656. d128[two; d],
  657. uuid[three; u],
  658. time[t]
  659. );
  660. thread::sleep(Duration::from_millis(10));
  661. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  662. assert_eq!(meas.key, "test_measurement");
  663. assert_eq!(meas.tags.get("one"), Some(&"a"));
  664. assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero())));
  665. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u)));
  666. assert_eq!(meas.timestamp, Some(t));
  667. }
  668. #[test]
  669. fn it_uses_the_measure_macro_alt_syntax() {
  670. let (tx, rx) = channel();
  671. measure!(tx, test_measurement,
  672. tag[one; "a"],
  673. tag[two; "b"],
  674. int[three; 2],
  675. float[four; 1.2345],
  676. string[five; String::from("d")],
  677. bool [ six => true ],
  678. int[seven; { 1 + 2 }],
  679. time[1]
  680. );
  681. thread::sleep(Duration::from_millis(10));
  682. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  683. assert_eq!(meas.key, "test_measurement");
  684. assert_eq!(meas.tags.get("one"), Some(&"a"));
  685. assert_eq!(meas.tags.get("two"), Some(&"b"));
  686. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  687. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  688. assert_eq!(meas.timestamp, Some(1));
  689. }
  690. #[test]
  691. fn it_checks_that_fields_are_separated_correctly() {
  692. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  693. assert_eq!(m.key, "test");
  694. assert_eq!(m.tags.get("a"), Some(&"one"));
  695. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  696. let mut buf = String::new();
  697. serialize_owned(&m, &mut buf);
  698. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  699. }
  700. #[test]
  701. fn try_to_break_measure_macro() {
  702. let (tx, _) = channel();
  703. measure!(tx, one, tag[x=>"y"], int[n;1]);
  704. measure!(tx, one, tag[x;"y"], int[n;1],);
  705. struct A {
  706. pub one: i32,
  707. pub two: i32,
  708. }
  709. struct B {
  710. pub a: A
  711. }
  712. let b = B { a: A { one: 1, two: 2 } };
  713. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  714. assert_eq!(m.fields.get("a"), Some(&OwnedValue::Integer(1)));
  715. }
  716. #[bench]
  717. fn measure_macro_small(b: &mut Bencher) {
  718. let (tx, rx) = channel();
  719. let listener = thread::spawn(move || {
  720. loop { if rx.recv().is_err() { break } }
  721. });
  722. b.iter(|| {
  723. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  724. });
  725. }
  726. #[bench]
  727. fn measure_macro_medium(b: &mut Bencher) {
  728. let (tx, rx) = channel();
  729. let listener = thread::spawn(move || {
  730. loop { if rx.recv().is_err() { break } }
  731. });
  732. b.iter(|| {
  733. measure!(tx, test,
  734. tag[color; "red"],
  735. tag[mood => "playful"],
  736. tag [ ticker => "xmr_btc" ],
  737. float[ price => 1.2345 ],
  738. float[ amount => 56.323],
  739. int[n; 1],
  740. time[now()]
  741. );
  742. });
  743. }
  744. #[test]
  745. #[ignore]
  746. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  747. let ctx = zmq::Context::new();
  748. let socket = push(&ctx).unwrap();
  749. let (tx, rx) = channel();
  750. let w = writer(tx.clone());
  751. let mut buf = String::with_capacity(4096);
  752. let mut meas = Measurement::new("rust_test");
  753. meas.add_tag("a", "t");
  754. meas.add_field("c", Value::Float(1.23456));
  755. let now = now();
  756. meas.set_timestamp(now);
  757. serialize(&meas, &mut buf);
  758. socket.send_str(&buf, 0).unwrap();
  759. drop(w);
  760. }
  761. #[test]
  762. fn it_serializes_a_measurement_in_place() {
  763. let mut buf = String::with_capacity(4096);
  764. let mut meas = Measurement::new("rust_test");
  765. meas.add_tag("a", "b");
  766. meas.add_field("c", Value::Float(1.0));
  767. let now = now();
  768. meas.set_timestamp(now);
  769. serialize(&meas, &mut buf);
  770. let ans = format!("rust_test,a=b c=1 {}", now);
  771. assert_eq!(buf, ans);
  772. }
  773. #[test]
  774. fn it_serializes_a_hard_to_serialize_message() {
  775. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  776. let mut buf = String::new();
  777. let mut server_resp = String::new();
  778. let mut m = Measurement::new("rust_test");
  779. m.add_field("s", Value::String(&raw));
  780. let now = now();
  781. m.set_timestamp(now);
  782. serialize(&m, &mut buf);
  783. println!("{}", buf);
  784. buf.push_str("\n");
  785. let buf_copy = buf.clone();
  786. buf.push_str(&buf_copy);
  787. println!("{}", buf);
  788. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  789. let client = Client::new();
  790. match client.post(url.clone())
  791. .body(&buf)
  792. .send() {
  793. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  794. Ok(mut resp) => {
  795. resp.read_to_string(&mut server_resp).unwrap();
  796. panic!("{}", server_resp);
  797. }
  798. Err(why) => {
  799. panic!(why)
  800. }
  801. }
  802. }
  803. #[bench]
  804. fn serialize_owned_longer(b: &mut Bencher) {
  805. let mut buf = String::with_capacity(1024);
  806. let m =
  807. OwnedMeasurement::new("test")
  808. .add_tag("one", "a")
  809. .add_tag("two", "b")
  810. .add_tag("ticker", "xmr_btc")
  811. .add_tag("exchange", "plnx")
  812. .add_tag("side", "bid")
  813. .add_field("three", OwnedValue::Float(1.2345))
  814. .add_field("four", OwnedValue::Integer(57))
  815. .add_field("five", OwnedValue::Boolean(true))
  816. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  817. .set_timestamp(now());
  818. b.iter(|| {
  819. serialize_owned(&m, &mut buf);
  820. buf.clear()
  821. });
  822. }
  823. #[bench]
  824. fn serialize_owned_simple(b: &mut Bencher) {
  825. let mut buf = String::with_capacity(1024);
  826. let m =
  827. OwnedMeasurement::new("test")
  828. .add_tag("one", "a")
  829. .add_tag("two", "b")
  830. .add_field("three", OwnedValue::Float(1.2345))
  831. .add_field("four", OwnedValue::Integer(57))
  832. .set_timestamp(now());
  833. b.iter(|| {
  834. serialize_owned(&m, &mut buf);
  835. buf.clear()
  836. });
  837. }
  838. #[test]
  839. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  840. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  841. let mut buf = String::new();
  842. let mut server_resp = String::new();
  843. let m = OwnedMeasurement::new("rust_test")
  844. .add_field("s", OwnedValue::String(raw.to_string()))
  845. .set_timestamp(now());
  846. serialize_owned(&m, &mut buf);
  847. println!("{}", buf);
  848. buf.push_str("\n");
  849. let buf_copy = buf.clone();
  850. buf.push_str(&buf_copy);
  851. println!("{}", buf);
  852. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  853. let client = Client::new();
  854. match client.post(url.clone())
  855. .body(&buf)
  856. .send() {
  857. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  858. Ok(mut resp) => {
  859. resp.read_to_string(&mut server_resp).unwrap();
  860. panic!("{}", server_resp);
  861. }
  862. Err(why) => {
  863. panic!(why)
  864. }
  865. }
  866. }
  867. // macro_rules! make_measurement {
  868. // (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  869. // (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  870. // (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  871. // (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  872. // (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) };
  873. // (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) };
  874. // (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  875. // (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) };
  876. //
  877. // (@count_tags) => {0usize};
  878. // (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  879. // (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  880. //
  881. // (@count_fields) => {0usize};
  882. // (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  883. // (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  884. // (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  885. //
  886. // ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  887. // let n_tags = measure!(@count_tags $($t)*);
  888. // let n_fields = measure!(@count_fields $($t)*);
  889. // let mut meas =
  890. // $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  891. // $(
  892. // measure!(@kv $t, meas, $($tail)*);
  893. // )*
  894. // //let _ = $m.send(meas);
  895. // meas
  896. // }};
  897. // }
  898. //
  899. // #[test]
  900. // fn it_checks_n_tags_is_correct() {
  901. // let (tx, _): (Sender<OwnedMeasurement>, Receiver<OwnedMeasurement>) = channel();
  902. // assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1);
  903. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2);
  904. // assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0);
  905. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0);
  906. //
  907. // let m4 =
  908. // make_measurement!(tx, test,
  909. // tag[a;"b"],
  910. // tag[c;"d"],
  911. // int[n; 1],
  912. // tag[e;"f"],
  913. // float[x; 1.234],
  914. // tag[g;"h"],
  915. // time[1],
  916. // );
  917. // assert_eq!(m4.n_tags, 4);
  918. // assert_eq!(m4.n_fields, 2);
  919. // }
  920. }