You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1059 lines
36KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. use std::fs;
  8. use std::time::Duration;
  9. use std::hash::BuildHasherDefault;
  10. use hyper::status::StatusCode;
  11. use hyper::client::response::Response;
  12. use hyper::Url;
  13. use hyper::client::Client;
  14. use influent::measurement::{Measurement, Value};
  15. use zmq;
  16. #[allow(unused_imports)]
  17. use chrono::{DateTime, Utc};
  18. use sloggers::types::Severity;
  19. use ordermap::OrderMap;
  20. use fnv::FnvHasher;
  21. use decimal::d128;
  22. use uuid::Uuid;
  23. use super::{nanos, file_logger, LOG_LEVEL};
  24. use warnings::Warning;
  25. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  26. const ZMQ_RCV_HWM: i32 = 0;
  27. const ZMQ_SND_HWM: i32 = 0;
  28. const BUFFER_SIZE: u16 = 80;
  29. #[cfg(not(any(test, feature = "test")))]
  30. const DB_NAME: &'static str = "mm2";
  31. #[cfg(any(test, feature = "test"))]
  32. const DB_NAME: &'static str = "mm2_test";
  33. #[cfg(all(not(feature = "harrison"), any(test, feature = "test", feature = "localhost")))]
  34. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  35. #[cfg(feature = "harrison")]
  36. const DB_HOST: &'static str = "http://harrison.0ptimus.internal:8086/write";
  37. #[cfg(not(any(feature = "harrison", feature = "localhost", test, feature = "test")))]
  38. const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write";
  39. pub use super::{dur_nanos, dt_nanos};
  40. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  41. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  42. Map::with_capacity_and_hasher(capacity, Default::default())
  43. }
  44. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  45. ///
  46. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  47. /// values, as well as sends it with the `Sender`.
  48. ///
/// Benchmarks show around 600ns for a small measurement and 1us for a medium-sized
  50. /// measurement (see `tests` mod).
  51. ///
  52. /// # Examples
  53. ///
  54. /// ```
  55. /// #[macro_use] extern crate logging;
  56. /// extern crate decimal;
  57. ///
  58. /// use std::sync::mpsc::channel;
  59. /// use decimal::d128;
  60. /// use logging::influx::*;
  61. ///
  62. /// fn main() {
  63. /// let (tx, rx) = channel();
  64. ///
  65. /// // "shorthand" syntax
  66. ///
  67. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  68. ///
  69. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  70. ///
  71. /// assert_eq!(meas.key, "test");
  72. /// assert_eq!(meas.tags.get("color"), Some(&"red"));
  73. /// assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1)));
  74. ///
  75. /// // alternate syntax ...
  76. ///
  77. /// measure!(tx, test,
  78. /// tag [ one => "a" ],
  79. /// tag [ two => "b" ],
  80. /// int [ three => 2 ],
  81. /// float [ four => 1.2345 ],
  82. /// string [ five => String::from("d") ],
  83. /// bool [ six => true ],
  84. /// int [ seven => { 1 + 2 } ],
  85. /// time [ 1 ]
  86. /// );
  87. ///
  88. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  89. ///
  90. /// assert_eq!(meas.key, "test");
  91. /// assert_eq!(meas.tags.get("one"), Some(&"a"));
  92. /// assert_eq!(meas.tags.get("two"), Some(&"b"));
  93. /// assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  94. /// assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  95. /// assert_eq!(meas.timestamp, Some(1));
  96. ///
  97. /// // use the @make_meas flag to skip sending a measurement, instead merely
  98. /// // creating it.
  99. ///
  100. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  101. ///
  102. /// // each variant also has shorthand aliases
  103. ///
  104. /// let meas: OwnedMeasurement =
  105. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  106. /// }
  107. /// ```
  108. ///
  109. #[macro_export]
  110. macro_rules! measure {
  111. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  112. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  113. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  114. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  115. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  116. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  117. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  118. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  119. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  120. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  121. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  122. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  123. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  124. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  125. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  126. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  127. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  128. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  129. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  130. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  131. (@as_expr $e:expr) => {$e};
  132. (@count_tags) => {0usize};
  133. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  134. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  135. (@count_fields) => {0usize};
  136. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  137. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  138. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  139. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  140. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  141. };
  142. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  143. let n_tags = measure!(@count_tags $($t)*);
  144. let n_fields = measure!(@count_fields $($t)*);
  145. let mut meas =
  146. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  147. $(
  148. measure!(@kv $t, meas, $($tail)*);
  149. )*
  150. meas
  151. }};
  152. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  153. measure!($m, $name, $($t [ $($tail)* ] ),+)
  154. };
  155. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  156. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  157. let _ = $m.send(measurement);
  158. }};
  159. }
  160. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  161. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  162. /// measurements have accumulated.
  163. ///
  164. #[derive(Debug)]
  165. pub struct InfluxWriter {
  166. host: &'static str,
  167. db: &'static str,
  168. tx: Sender<Option<OwnedMeasurement>>,
  169. thread: Option<Arc<thread::JoinHandle<()>>>,
  170. }
  171. impl Default for InfluxWriter {
  172. fn default() -> Self {
  173. if cfg!(any(test, feature = "test")) {
  174. InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 1)
  175. } else if cfg!(feature = "localhost") {
  176. InfluxWriter::new("localhost", "mm2", "/home/jstrong/src/logging/var/log/influx-default.log", BUFFER_SIZE)
  177. } else {
  178. InfluxWriter::new("washington.0ptimus.internal", "mm2", "var/influx-default.log", BUFFER_SIZE)
  179. }
  180. }
  181. }
  182. impl Clone for InfluxWriter {
  183. fn clone(&self) -> Self {
  184. debug_assert!(self.thread.is_some());
  185. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  186. InfluxWriter {
  187. host: self.host,
  188. db: self.db,
  189. tx: self.tx.clone(),
  190. thread,
  191. }
  192. }
  193. }
  194. impl InfluxWriter {
  195. /// Sends the `OwnedMeasurement` to the serialization thread.
  196. ///
  197. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  198. self.tx.send(Some(m))
  199. }
  200. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  201. self.tx.clone()
  202. }
  203. #[allow(unused_assignments)]
  204. pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u16) -> Self {
  205. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  206. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  207. let buffer_size = if cfg!(feature = "trace") { 0u16 } else { buffer_size };
  208. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  209. debug!(logger, "initializing url";
  210. "DB_HOST" => host,
  211. "DB_NAME" => db);
  212. let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse");
  213. let client = Client::new();
  214. debug!(logger, "initializing buffers");
  215. //let mut meas_buf = String::with_capacity(32 * 32 * 32);
  216. let mut buf = String::with_capacity(32 * 32 * 32);
  217. let mut count = 0;
  218. let send = |buf: &str| {
  219. let resp = client.post(url.clone())
  220. .body(buf)
  221. .send();
  222. match resp {
  223. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  224. debug!(logger, "server responded ok: 204 NoContent");
  225. }
  226. Ok(mut resp) => {
  227. let mut server_resp = String::with_capacity(1024);
  228. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  229. error!(logger, "influx server error";
  230. "status" => resp.status.to_string(),
  231. "body" => server_resp);
  232. }
  233. Err(why) => {
  234. error!(logger, "http request failed: {:?}", why);
  235. }
  236. }
  237. };
  238. let next2 = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 {
  239. // trace!(logger, "appending serialized measurement to buffer";
  240. // "prev" => prev,
  241. // "buf.len()" => buf.len());
  242. match prev {
  243. 0 if buffer_size > 0 => {
  244. serialize_owned(m, buf);
  245. 1
  246. }
  247. n if n < buffer_size => {
  248. buf.push_str("\n");
  249. serialize_owned(m, buf);
  250. n + 1
  251. }
  252. n => {
  253. buf.push_str("\n");
  254. serialize_owned(m, buf);
  255. //if s.len() > 0 { buf.push_str(s); }
  256. debug!(logger, "sending buffer to influx"; "len" => n);
  257. send(buf);
  258. buf.clear();
  259. 0
  260. }
  261. }
  262. };
  263. loop {
  264. //match rx.try_recv() {
  265. match rx.recv() {
  266. Ok(Some(mut meas)) => {
  267. if meas.timestamp.is_none() {
  268. meas.timestamp = Some(now());
  269. }
  270. trace!(logger, "rcvd new OwnedMeasurement"; "count" => count);
  271. count = next2(count, &meas, &mut buf);
  272. }
  273. Ok(None) => {
  274. if buf.len() > 0 {
  275. debug!(logger, "sending buffer to influx"; "len" => count);
  276. send(&buf)
  277. }
  278. break
  279. }
  280. _ => {
  281. //#[cfg(feature = "no-thrash")]
  282. thread::sleep(Duration::new(0, 1))
  283. }
  284. }
  285. }
  286. }).unwrap();
  287. InfluxWriter {
  288. host,
  289. db,
  290. tx,
  291. thread: Some(Arc::new(thread))
  292. }
  293. }
  294. }
  295. impl Drop for InfluxWriter {
  296. fn drop(&mut self) {
  297. if let Some(arc) = self.thread.take() {
  298. if let Ok(thread) = Arc::try_unwrap(arc) {
  299. let _ = self.tx.send(None);
  300. let _ = thread.join();
  301. }
  302. }
  303. }
  304. }
  305. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  306. let socket = ctx.socket(zmq::PULL)?;
  307. socket.bind(WRITER_ADDR)?;
  308. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  309. Ok(socket)
  310. }
  311. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  312. let socket = ctx.socket(zmq::PUSH)?;
  313. socket.connect(WRITER_ADDR)?;
  314. socket.set_sndhwm(ZMQ_SND_HWM)?;
  315. Ok(socket)
  316. }
  317. /// This removes offending things rather than escaping them.
  318. ///
  319. fn escape_tag(s: &str) -> String {
  320. s.replace(" ", "")
  321. .replace(",", "")
  322. .replace("\"", "")
  323. }
  324. fn escape(s: &str) -> String {
  325. s.replace(" ", "\\ ")
  326. .replace(",", "\\,")
  327. }
  328. fn as_string(s: &str) -> String {
  329. // the second replace removes double escapes
  330. //
  331. format!("\"{}\"", s.replace("\"", "\\\"")
  332. .replace(r#"\\""#, r#"\""#))
  333. }
  334. #[test]
  335. fn it_checks_as_string_does_not_double_escape() {
  336. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  337. let escaped = as_string(&raw);
  338. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  339. }
  340. fn as_integer(i: &i64) -> String {
  341. format!("{}i", i)
  342. }
  343. fn as_float(f: &f64) -> String {
  344. f.to_string()
  345. }
  346. fn as_boolean(b: &bool) -> &str {
  347. if *b { "t" } else { "f" }
  348. }
  349. pub fn now() -> i64 {
  350. nanos(Utc::now()) as i64
  351. }
  352. /// Serialize the measurement into influx line protocol
  353. /// and append to the buffer.
  354. ///
  355. /// # Examples
  356. ///
  357. /// ```
  358. /// extern crate influent;
  359. /// extern crate logging;
  360. ///
  361. /// use influent::measurement::{Measurement, Value};
  362. /// use std::string::String;
  363. /// use logging::influx::serialize;
  364. ///
  365. /// fn main() {
  366. /// let mut buf = String::new();
  367. /// let mut m = Measurement::new("test");
  368. /// m.add_field("x", Value::Integer(1));
  369. /// serialize(&m, &mut buf);
  370. /// }
  371. ///
  372. /// ```
  373. ///
  374. pub fn serialize(measurement: &Measurement, line: &mut String) {
  375. line.push_str(&escape(measurement.key));
  376. for (tag, value) in measurement.tags.iter() {
  377. line.push_str(",");
  378. line.push_str(&escape(tag));
  379. line.push_str("=");
  380. line.push_str(&escape(value));
  381. }
  382. let mut was_spaced = false;
  383. for (field, value) in measurement.fields.iter() {
  384. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  385. line.push_str(&escape(field));
  386. line.push_str("=");
  387. match value {
  388. &Value::String(ref s) => line.push_str(&as_string(s)),
  389. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  390. &Value::Float(ref f) => line.push_str(&as_float(f)),
  391. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  392. };
  393. }
  394. match measurement.timestamp {
  395. Some(t) => {
  396. line.push_str(" ");
  397. line.push_str(&t.to_string());
  398. }
  399. _ => {}
  400. }
  401. }
  402. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  403. ///
  404. /// The serialized measurement is appended to the end of the string without
  405. /// any regard for what exited in it previously.
  406. ///
  407. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  408. line.push_str(&escape_tag(measurement.key));
  409. let add_tag = |line: &mut String, key: &str, value: &str| {
  410. line.push_str(",");
  411. line.push_str(&escape_tag(key));
  412. line.push_str("=");
  413. line.push_str(&escape(value));
  414. };
  415. for (key, value) in measurement.tags.iter() {
  416. add_tag(line, key, value);
  417. }
  418. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  419. if is_first { line.push_str(" "); } else { line.push_str(","); }
  420. line.push_str(&escape_tag(key));
  421. line.push_str("=");
  422. match *value {
  423. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  424. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  425. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  426. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  427. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  428. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  429. };
  430. };
  431. let mut fields = measurement.fields.iter();
  432. // first time separate from tags with space
  433. //
  434. fields.next().map(|kv| {
  435. add_field(line, kv.0, kv.1, true);
  436. });
  437. // then seperate the rest w/ comma
  438. //
  439. for kv in fields {
  440. add_field(line, kv.0, kv.1, false);
  441. }
  442. if let Some(t) = measurement.timestamp {
  443. line.push_str(" ");
  444. line.push_str(&t.to_string());
  445. }
  446. }
  447. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  448. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  449. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  450. let _ = fs::create_dir("/tmp/mm");
  451. let ctx = zmq::Context::new();
  452. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  453. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  454. let client = Client::new();
  455. let mut buf = String::with_capacity(4096);
  456. let mut server_resp = String::with_capacity(4096);
  457. let mut count = 0;
  458. loop {
  459. if let Ok(bytes) = socket.recv_bytes(0) {
  460. if let Ok(msg) = String::from_utf8(bytes) {
  461. count = match count {
  462. 0 => {
  463. buf.push_str(&msg);
  464. 1
  465. }
  466. n @ 1...40 => {
  467. buf.push_str("\n");
  468. buf.push_str(&msg);
  469. n + 1
  470. }
  471. _ => {
  472. buf.push_str("\n");
  473. buf.push_str(&msg);
  474. match client.post(url.clone())
  475. .body(&buf)
  476. .send() {
  477. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  478. Ok(mut resp) => {
  479. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  480. let _ = warnings.send(
  481. Warning::Error(
  482. format!("Influx server: {}", server_resp)));
  483. server_resp.clear();
  484. }
  485. Err(why) => {
  486. let _ = warnings.send(
  487. Warning::Error(
  488. format!("Influx write error: {}", why)));
  489. }
  490. }
  491. buf.clear();
  492. 0
  493. }
  494. }
  495. }
  496. }
  497. }
  498. }).unwrap()
  499. }
  500. #[derive(Debug, Clone, PartialEq)]
  501. pub enum OwnedValue {
  502. String(String),
  503. Float(f64),
  504. Integer(i64),
  505. Boolean(bool),
  506. D128(d128),
  507. Uuid(Uuid),
  508. }
  509. #[derive(Clone, Debug)]
  510. pub struct OwnedMeasurement {
  511. pub key: &'static str,
  512. pub timestamp: Option<i64>,
  513. pub fields: Map<&'static str, OwnedValue>,
  514. pub tags: Map<&'static str, &'static str>,
  515. }
  516. impl OwnedMeasurement {
  517. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  518. OwnedMeasurement {
  519. key,
  520. timestamp: None,
  521. tags: new_map(n_tags),
  522. fields: new_map(n_fields),
  523. }
  524. }
  525. pub fn new(key: &'static str) -> Self {
  526. OwnedMeasurement::with_capacity(key, 4, 4)
  527. }
  528. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  529. self.tags.insert(key, value);
  530. self
  531. }
  532. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  533. self.fields.insert(key, value);
  534. self
  535. }
  536. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  537. self.timestamp = Some(timestamp);
  538. self
  539. }
  540. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  541. *self.tags.entry(key).or_insert(value) = value;
  542. self
  543. }
  544. }
  545. #[allow(unused_imports, unused_variables)]
  546. #[cfg(test)]
  547. mod tests {
  548. use super::*;
  549. use test::{black_box, Bencher};
  550. #[test]
  551. fn it_uses_the_new_tag_k_only_shortcut() {
  552. let tag_value = "one";
  553. let color = "red";
  554. let time = now();
  555. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  556. assert_eq!(m.tags.get("color"), Some(&"red"));
  557. assert_eq!(m.tags.get("tag_value"), Some(&"one"));
  558. assert_eq!(m.timestamp, Some(time));
  559. }
  560. #[test]
  561. fn it_uses_measure_macro_parenthesis_syntax() {
  562. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  563. assert_eq!(m.key, "test");
  564. assert_eq!(m.tags.get("a"), Some(&"b"));
  565. assert_eq!(m.fields.get("n"), Some(&OwnedValue::Integer(1)));
  566. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  567. assert_eq!(m.timestamp, Some(1));
  568. }
  569. #[test]
  570. fn it_uses_measure_macro_on_a_self_attribute() {
  571. struct A {
  572. pub influx: InfluxWriter,
  573. }
  574. impl A {
  575. fn f(&self) {
  576. measure!(self.influx, test, t(color, "red"), i(n, 1));
  577. }
  578. }
  579. let a = A { influx: InfluxWriter::default() };
  580. a.f();
  581. }
  582. #[test]
  583. fn it_clones_an_influx_writer_to_check_both_drop() {
  584. let influx = InfluxWriter::default();
  585. measure!(influx, drop_test, i(a, 1), i(b, 2));
  586. {
  587. let influx = influx.clone();
  588. thread::spawn(move || {
  589. measure!(influx, drop_test, i(a, 3), i(b, 4));
  590. });
  591. }
  592. }
  593. #[bench]
  594. fn influx_writer_send_basic(b: &mut Bencher) {
  595. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  596. b.iter(|| {
  597. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  598. });
  599. }
  600. #[bench]
  601. fn influx_writer_send_price(b: &mut Bencher) {
  602. let m = InfluxWriter::default();
  603. b.iter(|| {
  604. measure!(m, test,
  605. tag[ticker; t!(xmr-btc).to_str()],
  606. tag[exchange; "plnx"],
  607. d128[bid; d128::zero()],
  608. d128[ask; d128::zero()],
  609. );
  610. });
  611. }
  612. #[test]
  613. fn it_checks_color_tag_error_in_non_doctest() {
  614. let (tx, rx) = channel();
  615. measure!(tx, test, tag[color;"red"], int[n;1]);
  616. let meas: OwnedMeasurement = rx.recv().unwrap();
  617. assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
  618. }
  619. #[test]
  620. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  621. let meas = measure!(@make_meas test_measurement,
  622. tag [ one => "a" ],
  623. tag [ two => "b" ],
  624. int [ three => 2 ],
  625. float [ four => 1.2345 ],
  626. string [ five => String::from("d") ],
  627. bool [ six => true ],
  628. int [ seven => { 1 + 2 } ],
  629. time [ 1 ]
  630. );
  631. assert_eq!(meas.key, "test_measurement");
  632. assert_eq!(meas.tags.get("one"), Some(&"a"));
  633. assert_eq!(meas.tags.get("two"), Some(&"b"));
  634. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  635. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  636. assert_eq!(meas.timestamp, Some(1));
  637. }
  638. #[test]
  639. fn it_uses_the_measure_macro() {
  640. let (tx, rx) = channel();
  641. measure!(tx, test_measurement,
  642. tag [ one => "a" ],
  643. tag [ two => "b" ],
  644. int [ three => 2 ],
  645. float [ four => 1.2345 ],
  646. string [ five => String::from("d") ],
  647. bool [ six => true ],
  648. int [ seven => { 1 + 2 } ],
  649. time [ 1 ]
  650. );
  651. thread::sleep(Duration::from_millis(10));
  652. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  653. assert_eq!(meas.key, "test_measurement");
  654. assert_eq!(meas.tags.get("one"), Some(&"a"));
  655. assert_eq!(meas.tags.get("two"), Some(&"b"));
  656. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  657. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  658. assert_eq!(meas.timestamp, Some(1));
  659. }
  660. #[test]
  661. fn it_uses_measure_macro_for_d128_and_uuid() {
  662. let (tx, rx) = channel();
  663. let u = Uuid::new_v4();
  664. let d = d128::zero();
  665. let t = now();
  666. measure!(tx, test_measurement,
  667. tag[one; "a"],
  668. d128[two; d],
  669. uuid[three; u],
  670. time[t]
  671. );
  672. thread::sleep(Duration::from_millis(10));
  673. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  674. assert_eq!(meas.key, "test_measurement");
  675. assert_eq!(meas.tags.get("one"), Some(&"a"));
  676. assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero())));
  677. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u)));
  678. assert_eq!(meas.timestamp, Some(t));
  679. }
  680. #[test]
  681. fn it_uses_the_measure_macro_alt_syntax() {
  682. let (tx, rx) = channel();
  683. measure!(tx, test_measurement,
  684. tag[one; "a"],
  685. tag[two; "b"],
  686. int[three; 2],
  687. float[four; 1.2345],
  688. string[five; String::from("d")],
  689. bool [ six => true ],
  690. int[seven; { 1 + 2 }],
  691. time[1]
  692. );
  693. thread::sleep(Duration::from_millis(10));
  694. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  695. assert_eq!(meas.key, "test_measurement");
  696. assert_eq!(meas.tags.get("one"), Some(&"a"));
  697. assert_eq!(meas.tags.get("two"), Some(&"b"));
  698. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  699. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  700. assert_eq!(meas.timestamp, Some(1));
  701. }
  702. #[test]
  703. fn it_checks_that_fields_are_separated_correctly() {
  704. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  705. assert_eq!(m.key, "test");
  706. assert_eq!(m.tags.get("a"), Some(&"one"));
  707. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  708. let mut buf = String::new();
  709. serialize_owned(&m, &mut buf);
  710. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  711. }
  712. #[test]
  713. fn try_to_break_measure_macro() {
  714. let (tx, _) = channel();
  715. measure!(tx, one, tag[x=>"y"], int[n;1]);
  716. measure!(tx, one, tag[x;"y"], int[n;1],);
  717. struct A {
  718. pub one: i32,
  719. pub two: i32,
  720. }
  721. struct B {
  722. pub a: A
  723. }
  724. let b = B { a: A { one: 1, two: 2 } };
  725. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  726. assert_eq!(m.fields.get("a"), Some(&OwnedValue::Integer(1)));
  727. }
  728. #[bench]
  729. fn measure_macro_small(b: &mut Bencher) {
  730. let (tx, rx) = channel();
  731. let listener = thread::spawn(move || {
  732. loop { if rx.recv().is_err() { break } }
  733. });
  734. b.iter(|| {
  735. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  736. });
  737. }
  738. #[bench]
  739. fn measure_macro_medium(b: &mut Bencher) {
  740. let (tx, rx) = channel();
  741. let listener = thread::spawn(move || {
  742. loop { if rx.recv().is_err() { break } }
  743. });
  744. b.iter(|| {
  745. measure!(tx, test,
  746. tag[color; "red"],
  747. tag[mood => "playful"],
  748. tag [ ticker => "xmr_btc" ],
  749. float[ price => 1.2345 ],
  750. float[ amount => 56.323],
  751. int[n; 1],
  752. time[now()]
  753. );
  754. });
  755. }
  756. #[test]
  757. #[ignore]
  758. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  759. let ctx = zmq::Context::new();
  760. let socket = push(&ctx).unwrap();
  761. let (tx, rx) = channel();
  762. let w = writer(tx.clone());
  763. let mut buf = String::with_capacity(4096);
  764. let mut meas = Measurement::new("rust_test");
  765. meas.add_tag("a", "t");
  766. meas.add_field("c", Value::Float(1.23456));
  767. let now = now();
  768. meas.set_timestamp(now);
  769. serialize(&meas, &mut buf);
  770. socket.send_str(&buf, 0).unwrap();
  771. drop(w);
  772. }
  773. #[test]
  774. fn it_serializes_a_measurement_in_place() {
  775. let mut buf = String::with_capacity(4096);
  776. let mut meas = Measurement::new("rust_test");
  777. meas.add_tag("a", "b");
  778. meas.add_field("c", Value::Float(1.0));
  779. let now = now();
  780. meas.set_timestamp(now);
  781. serialize(&meas, &mut buf);
  782. let ans = format!("rust_test,a=b c=1 {}", now);
  783. assert_eq!(buf, ans);
  784. }
  785. #[test]
  786. fn it_serializes_a_hard_to_serialize_message() {
  787. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  788. let mut buf = String::new();
  789. let mut server_resp = String::new();
  790. let mut m = Measurement::new("rust_test");
  791. m.add_field("s", Value::String(&raw));
  792. let now = now();
  793. m.set_timestamp(now);
  794. serialize(&m, &mut buf);
  795. println!("{}", buf);
  796. buf.push_str("\n");
  797. let buf_copy = buf.clone();
  798. buf.push_str(&buf_copy);
  799. println!("{}", buf);
  800. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  801. let client = Client::new();
  802. match client.post(url.clone())
  803. .body(&buf)
  804. .send() {
  805. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  806. Ok(mut resp) => {
  807. resp.read_to_string(&mut server_resp).unwrap();
  808. panic!("{}", server_resp);
  809. }
  810. Err(why) => {
  811. panic!(why)
  812. }
  813. }
  814. }
  815. #[bench]
  816. fn serialize_owned_longer(b: &mut Bencher) {
  817. let mut buf = String::with_capacity(1024);
  818. let m =
  819. OwnedMeasurement::new("test")
  820. .add_tag("one", "a")
  821. .add_tag("two", "b")
  822. .add_tag("ticker", "xmr_btc")
  823. .add_tag("exchange", "plnx")
  824. .add_tag("side", "bid")
  825. .add_field("three", OwnedValue::Float(1.2345))
  826. .add_field("four", OwnedValue::Integer(57))
  827. .add_field("five", OwnedValue::Boolean(true))
  828. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  829. .set_timestamp(now());
  830. b.iter(|| {
  831. serialize_owned(&m, &mut buf);
  832. buf.clear()
  833. });
  834. }
  835. #[bench]
  836. fn serialize_owned_simple(b: &mut Bencher) {
  837. let mut buf = String::with_capacity(1024);
  838. let m =
  839. OwnedMeasurement::new("test")
  840. .add_tag("one", "a")
  841. .add_tag("two", "b")
  842. .add_field("three", OwnedValue::Float(1.2345))
  843. .add_field("four", OwnedValue::Integer(57))
  844. .set_timestamp(now());
  845. b.iter(|| {
  846. serialize_owned(&m, &mut buf);
  847. buf.clear()
  848. });
  849. }
  850. #[test]
  851. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  852. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  853. let mut buf = String::new();
  854. let mut server_resp = String::new();
  855. let m = OwnedMeasurement::new("rust_test")
  856. .add_field("s", OwnedValue::String(raw.to_string()))
  857. .set_timestamp(now());
  858. serialize_owned(&m, &mut buf);
  859. println!("{}", buf);
  860. buf.push_str("\n");
  861. let buf_copy = buf.clone();
  862. buf.push_str(&buf_copy);
  863. println!("{}", buf);
  864. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  865. let client = Client::new();
  866. match client.post(url.clone())
  867. .body(&buf)
  868. .send() {
  869. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  870. Ok(mut resp) => {
  871. resp.read_to_string(&mut server_resp).unwrap();
  872. panic!("{}", server_resp);
  873. }
  874. Err(why) => {
  875. panic!(why)
  876. }
  877. }
  878. }
  879. // macro_rules! make_measurement {
  880. // (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  881. // (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  882. // (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  883. // (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  884. // (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) };
  885. // (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) };
  886. // (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  887. // (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) };
  888. //
  889. // (@count_tags) => {0usize};
  890. // (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  891. // (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  892. //
  893. // (@count_fields) => {0usize};
  894. // (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  895. // (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  896. // (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  897. //
  898. // ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  899. // let n_tags = measure!(@count_tags $($t)*);
  900. // let n_fields = measure!(@count_fields $($t)*);
  901. // let mut meas =
  902. // $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  903. // $(
  904. // measure!(@kv $t, meas, $($tail)*);
  905. // )*
  906. // //let _ = $m.send(meas);
  907. // meas
  908. // }};
  909. // }
  910. //
  911. // #[test]
  912. // fn it_checks_n_tags_is_correct() {
  913. // let (tx, _): (Sender<OwnedMeasurement>, Receiver<OwnedMeasurement>) = channel();
  914. // assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1);
  915. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2);
  916. // assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0);
  917. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0);
  918. //
  919. // let m4 =
  920. // make_measurement!(tx, test,
  921. // tag[a;"b"],
  922. // tag[c;"d"],
  923. // int[n; 1],
  924. // tag[e;"f"],
  925. // float[x; 1.234],
  926. // tag[g;"h"],
  927. // time[1],
  928. // );
  929. // assert_eq!(m4.n_tags, 4);
  930. // assert_eq!(m4.n_fields, 2);
  931. // }
  932. }