You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1055 lines
36KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. use std::fs;
  8. use std::time::Duration;
  9. use std::hash::BuildHasherDefault;
  10. use hyper::status::StatusCode;
  11. use hyper::client::response::Response;
  12. use hyper::Url;
  13. use hyper::client::Client;
  14. use influent::measurement::{Measurement, Value};
  15. use zmq;
  16. #[allow(unused_imports)]
  17. use chrono::{DateTime, Utc};
  18. use sloggers::types::Severity;
  19. use ordermap::OrderMap;
  20. use fnv::FnvHasher;
  21. use decimal::d128;
  22. use uuid::Uuid;
  23. use super::{nanos, file_logger, LOG_LEVEL};
  24. use warnings::Warning;
  25. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  26. const ZMQ_RCV_HWM: i32 = 0;
  27. const ZMQ_SND_HWM: i32 = 0;
  28. const BUFFER_SIZE: u16 = 80;
  29. #[cfg(not(any(test, feature = "test")))]
  30. const DB_NAME: &'static str = "mm2";
  31. #[cfg(any(test, feature = "test"))]
  32. const DB_NAME: &'static str = "mm2_test";
  33. #[cfg(all(not(feature = "harrison"), any(test, feature = "test", feature = "localhost")))]
  34. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  35. #[cfg(feature = "harrison")]
  36. const DB_HOST: &'static str = "http://harrison.0ptimus.internal:8086/write";
  37. #[cfg(not(any(feature = "harrison", feature = "localhost", test, feature = "test")))]
  38. const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write";
  39. pub use super::{dur_nanos, dt_nanos};
  40. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  41. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  42. Map::with_capacity_and_hasher(capacity, Default::default())
  43. }
  44. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  45. ///
  46. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  47. /// values, as well as sends it with the `Sender`.
  48. ///
  49. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  50. /// measurement (see `tests` mod).
  51. ///
  52. /// # Examples
  53. ///
  54. /// ```
  55. /// #[macro_use] extern crate logging;
  56. /// extern crate decimal;
  57. ///
  58. /// use std::sync::mpsc::channel;
  59. /// use decimal::d128;
  60. /// use logging::influx::*;
  61. ///
  62. /// fn main() {
  63. /// let (tx, rx) = channel();
  64. ///
  65. /// // "shorthand" syntax
  66. ///
  67. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  68. ///
  69. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  70. ///
  71. /// assert_eq!(meas.key, "test");
  72. /// assert_eq!(meas.tags.get("color"), Some(&"red"));
  73. /// assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1)));
  74. ///
  75. /// // alternate syntax ...
  76. ///
  77. /// measure!(tx, test,
  78. /// tag [ one => "a" ],
  79. /// tag [ two => "b" ],
  80. /// int [ three => 2 ],
  81. /// float [ four => 1.2345 ],
  82. /// string [ five => String::from("d") ],
  83. /// bool [ six => true ],
  84. /// int [ seven => { 1 + 2 } ],
  85. /// time [ 1 ]
  86. /// );
  87. ///
  88. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  89. ///
  90. /// assert_eq!(meas.key, "test");
  91. /// assert_eq!(meas.tags.get("one"), Some(&"a"));
  92. /// assert_eq!(meas.tags.get("two"), Some(&"b"));
  93. /// assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  94. /// assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  95. /// assert_eq!(meas.timestamp, Some(1));
  96. ///
  97. /// // use the @make_meas flag to skip sending a measurement, instead merely
  98. /// // creating it.
  99. ///
  100. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  101. ///
  102. /// // each variant also has shorthand aliases
  103. ///
  104. /// let meas: OwnedMeasurement =
  105. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  106. /// }
  107. /// ```
  108. ///
  109. #[macro_export]
  110. macro_rules! measure {
  111. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  112. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  113. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  114. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  115. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  116. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  117. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  118. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  119. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  120. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  121. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  122. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  123. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  124. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  125. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  126. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  127. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  128. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  129. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  130. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  131. (@as_expr $e:expr) => {$e};
  132. (@count_tags) => {0usize};
  133. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  134. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  135. (@count_fields) => {0usize};
  136. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  137. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  138. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  139. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  140. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  141. };
  142. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  143. let n_tags = measure!(@count_tags $($t)*);
  144. let n_fields = measure!(@count_fields $($t)*);
  145. let mut meas =
  146. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  147. $(
  148. measure!(@kv $t, meas, $($tail)*);
  149. )*
  150. meas
  151. }};
  152. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  153. measure!($m, $name, $($t [ $($tail)* ] ),+)
  154. };
  155. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  156. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  157. let _ = $m.send(measurement);
  158. }};
  159. }
  160. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  161. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  162. /// measurements have accumulated.
  163. ///
  164. #[derive(Debug)]
  165. pub struct InfluxWriter {
  166. host: &'static str,
  167. db: &'static str,
  168. tx: Sender<Option<OwnedMeasurement>>,
  169. thread: Option<Arc<thread::JoinHandle<()>>>,
  170. }
  171. impl Default for InfluxWriter {
  172. fn default() -> Self {
  173. if cfg!(any(test, feature = "test")) {
  174. InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 1)
  175. } else if cfg!(feature = "localhost") {
  176. InfluxWriter::new("localhost", "mm2", "/home/jstrong/src/logging/var/log/influx-default.log", BUFFER_SIZE)
  177. } else {
  178. InfluxWriter::new("washington.0ptimus.internal", "mm2", "var/influx-default.log", BUFFER_SIZE)
  179. }
  180. }
  181. }
  182. impl Clone for InfluxWriter {
  183. fn clone(&self) -> Self {
  184. debug_assert!(self.thread.is_some());
  185. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  186. InfluxWriter {
  187. host: self.host,
  188. db: self.db,
  189. tx: self.tx.clone(),
  190. thread,
  191. }
  192. }
  193. }
  194. impl InfluxWriter {
  195. /// Sends the `OwnedMeasurement` to the serialization thread.
  196. ///
  197. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  198. self.tx.send(Some(m))
  199. }
  200. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  201. self.tx.clone()
  202. }
  203. #[allow(unused_assignments)]
  204. pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u16) -> Self {
  205. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  206. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  207. let buffer_size = if cfg!(feature = "trace") { 0u16 } else { buffer_size };
  208. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  209. debug!(logger, "initializing url";
  210. "DB_HOST" => host,
  211. "DB_NAME" => db);
  212. let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse");
  213. let client = Client::new();
  214. debug!(logger, "initializing buffers");
  215. //let mut meas_buf = String::with_capacity(32 * 32 * 32);
  216. let mut buf = String::with_capacity(32 * 32 * 32);
  217. let mut count = 0;
  218. let send = |buf: &str| {
  219. let resp = client.post(url.clone())
  220. .body(buf)
  221. .send();
  222. match resp {
  223. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  224. debug!(logger, "server responded ok: 204 NoContent");
  225. }
  226. Ok(mut resp) => {
  227. let mut server_resp = String::with_capacity(1024);
  228. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  229. error!(logger, "influx server error";
  230. "status" => resp.status.to_string(),
  231. "body" => server_resp);
  232. }
  233. Err(why) => {
  234. error!(logger, "http request failed: {:?}", why);
  235. }
  236. }
  237. };
  238. let next2 = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 {
  239. // trace!(logger, "appending serialized measurement to buffer";
  240. // "prev" => prev,
  241. // "buf.len()" => buf.len());
  242. match prev {
  243. 0 if buffer_size > 0 => {
  244. serialize_owned(m, buf);
  245. 1
  246. }
  247. n if n < buffer_size => {
  248. buf.push_str("\n");
  249. serialize_owned(m, buf);
  250. n + 1
  251. }
  252. n => {
  253. buf.push_str("\n");
  254. serialize_owned(m, buf);
  255. //if s.len() > 0 { buf.push_str(s); }
  256. debug!(logger, "sending buffer to influx"; "len" => n);
  257. send(buf);
  258. buf.clear();
  259. 0
  260. }
  261. }
  262. };
  263. loop {
  264. match rx.try_recv() {
  265. Ok(Some(mut meas)) => {
  266. if meas.timestamp.is_none() {
  267. meas.timestamp = Some(now());
  268. }
  269. trace!(logger, "rcvd new OwnedMeasurement"; "count" => count);
  270. count = next2(count, &meas, &mut buf);
  271. }
  272. Ok(None) => {
  273. if buf.len() > 0 { send(&buf) }
  274. break
  275. }
  276. _ => {
  277. #[cfg(feature = "no-thrash")]
  278. thread::sleep(Duration::new(0, 1))
  279. }
  280. }
  281. }
  282. }).unwrap();
  283. InfluxWriter {
  284. host,
  285. db,
  286. tx,
  287. thread: Some(Arc::new(thread))
  288. }
  289. }
  290. }
  291. impl Drop for InfluxWriter {
  292. fn drop(&mut self) {
  293. if let Some(arc) = self.thread.take() {
  294. if let Ok(thread) = Arc::try_unwrap(arc) {
  295. let _ = self.tx.send(None);
  296. let _ = thread.join();
  297. }
  298. }
  299. }
  300. }
  301. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  302. let socket = ctx.socket(zmq::PULL)?;
  303. socket.bind(WRITER_ADDR)?;
  304. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  305. Ok(socket)
  306. }
  307. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  308. let socket = ctx.socket(zmq::PUSH)?;
  309. socket.connect(WRITER_ADDR)?;
  310. socket.set_sndhwm(ZMQ_SND_HWM)?;
  311. Ok(socket)
  312. }
  313. /// This removes offending things rather than escaping them.
  314. ///
  315. fn escape_tag(s: &str) -> String {
  316. s.replace(" ", "")
  317. .replace(",", "")
  318. .replace("\"", "")
  319. }
  320. fn escape(s: &str) -> String {
  321. s.replace(" ", "\\ ")
  322. .replace(",", "\\,")
  323. }
  324. fn as_string(s: &str) -> String {
  325. // the second replace removes double escapes
  326. //
  327. format!("\"{}\"", s.replace("\"", "\\\"")
  328. .replace(r#"\\""#, r#"\""#))
  329. }
  330. #[test]
  331. fn it_checks_as_string_does_not_double_escape() {
  332. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  333. let escaped = as_string(&raw);
  334. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  335. }
  336. fn as_integer(i: &i64) -> String {
  337. format!("{}i", i)
  338. }
  339. fn as_float(f: &f64) -> String {
  340. f.to_string()
  341. }
  342. fn as_boolean(b: &bool) -> &str {
  343. if *b { "t" } else { "f" }
  344. }
  345. pub fn now() -> i64 {
  346. nanos(Utc::now()) as i64
  347. }
  348. /// Serialize the measurement into influx line protocol
  349. /// and append to the buffer.
  350. ///
  351. /// # Examples
  352. ///
  353. /// ```
  354. /// extern crate influent;
  355. /// extern crate logging;
  356. ///
  357. /// use influent::measurement::{Measurement, Value};
  358. /// use std::string::String;
  359. /// use logging::influx::serialize;
  360. ///
  361. /// fn main() {
  362. /// let mut buf = String::new();
  363. /// let mut m = Measurement::new("test");
  364. /// m.add_field("x", Value::Integer(1));
  365. /// serialize(&m, &mut buf);
  366. /// }
  367. ///
  368. /// ```
  369. ///
  370. pub fn serialize(measurement: &Measurement, line: &mut String) {
  371. line.push_str(&escape(measurement.key));
  372. for (tag, value) in measurement.tags.iter() {
  373. line.push_str(",");
  374. line.push_str(&escape(tag));
  375. line.push_str("=");
  376. line.push_str(&escape(value));
  377. }
  378. let mut was_spaced = false;
  379. for (field, value) in measurement.fields.iter() {
  380. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  381. line.push_str(&escape(field));
  382. line.push_str("=");
  383. match value {
  384. &Value::String(ref s) => line.push_str(&as_string(s)),
  385. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  386. &Value::Float(ref f) => line.push_str(&as_float(f)),
  387. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  388. };
  389. }
  390. match measurement.timestamp {
  391. Some(t) => {
  392. line.push_str(" ");
  393. line.push_str(&t.to_string());
  394. }
  395. _ => {}
  396. }
  397. }
  398. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  399. ///
  400. /// The serialized measurement is appended to the end of the string without
  401. /// any regard for what exited in it previously.
  402. ///
  403. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  404. line.push_str(&escape_tag(measurement.key));
  405. let add_tag = |line: &mut String, key: &str, value: &str| {
  406. line.push_str(",");
  407. line.push_str(&escape_tag(key));
  408. line.push_str("=");
  409. line.push_str(&escape(value));
  410. };
  411. for (key, value) in measurement.tags.iter() {
  412. add_tag(line, key, value);
  413. }
  414. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  415. if is_first { line.push_str(" "); } else { line.push_str(","); }
  416. line.push_str(&escape_tag(key));
  417. line.push_str("=");
  418. match *value {
  419. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  420. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  421. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  422. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  423. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  424. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  425. };
  426. };
  427. let mut fields = measurement.fields.iter();
  428. // first time separate from tags with space
  429. //
  430. fields.next().map(|kv| {
  431. add_field(line, kv.0, kv.1, true);
  432. });
  433. // then seperate the rest w/ comma
  434. //
  435. for kv in fields {
  436. add_field(line, kv.0, kv.1, false);
  437. }
  438. if let Some(t) = measurement.timestamp {
  439. line.push_str(" ");
  440. line.push_str(&t.to_string());
  441. }
  442. }
  443. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  444. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  445. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  446. let _ = fs::create_dir("/tmp/mm");
  447. let ctx = zmq::Context::new();
  448. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  449. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  450. let client = Client::new();
  451. let mut buf = String::with_capacity(4096);
  452. let mut server_resp = String::with_capacity(4096);
  453. let mut count = 0;
  454. loop {
  455. if let Ok(bytes) = socket.recv_bytes(0) {
  456. if let Ok(msg) = String::from_utf8(bytes) {
  457. count = match count {
  458. 0 => {
  459. buf.push_str(&msg);
  460. 1
  461. }
  462. n @ 1...40 => {
  463. buf.push_str("\n");
  464. buf.push_str(&msg);
  465. n + 1
  466. }
  467. _ => {
  468. buf.push_str("\n");
  469. buf.push_str(&msg);
  470. match client.post(url.clone())
  471. .body(&buf)
  472. .send() {
  473. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  474. Ok(mut resp) => {
  475. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  476. let _ = warnings.send(
  477. Warning::Error(
  478. format!("Influx server: {}", server_resp)));
  479. server_resp.clear();
  480. }
  481. Err(why) => {
  482. let _ = warnings.send(
  483. Warning::Error(
  484. format!("Influx write error: {}", why)));
  485. }
  486. }
  487. buf.clear();
  488. 0
  489. }
  490. }
  491. }
  492. }
  493. }
  494. }).unwrap()
  495. }
  496. #[derive(Debug, Clone, PartialEq)]
  497. pub enum OwnedValue {
  498. String(String),
  499. Float(f64),
  500. Integer(i64),
  501. Boolean(bool),
  502. D128(d128),
  503. Uuid(Uuid),
  504. }
  505. #[derive(Clone, Debug)]
  506. pub struct OwnedMeasurement {
  507. pub key: &'static str,
  508. pub timestamp: Option<i64>,
  509. pub fields: Map<&'static str, OwnedValue>,
  510. pub tags: Map<&'static str, &'static str>,
  511. }
  512. impl OwnedMeasurement {
  513. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  514. OwnedMeasurement {
  515. key,
  516. timestamp: None,
  517. tags: new_map(n_tags),
  518. fields: new_map(n_fields),
  519. }
  520. }
  521. pub fn new(key: &'static str) -> Self {
  522. OwnedMeasurement::with_capacity(key, 4, 4)
  523. }
  524. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  525. self.tags.insert(key, value);
  526. self
  527. }
  528. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  529. self.fields.insert(key, value);
  530. self
  531. }
  532. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  533. self.timestamp = Some(timestamp);
  534. self
  535. }
  536. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  537. *self.tags.entry(key).or_insert(value) = value;
  538. self
  539. }
  540. }
  541. #[allow(unused_imports, unused_variables)]
  542. #[cfg(test)]
  543. mod tests {
  544. use super::*;
  545. use test::{black_box, Bencher};
  546. #[test]
  547. fn it_uses_the_new_tag_k_only_shortcut() {
  548. let tag_value = "one";
  549. let color = "red";
  550. let time = now();
  551. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  552. assert_eq!(m.tags.get("color"), Some(&"red"));
  553. assert_eq!(m.tags.get("tag_value"), Some(&"one"));
  554. assert_eq!(m.timestamp, Some(time));
  555. }
  556. #[test]
  557. fn it_uses_measure_macro_parenthesis_syntax() {
  558. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  559. assert_eq!(m.key, "test");
  560. assert_eq!(m.tags.get("a"), Some(&"b"));
  561. assert_eq!(m.fields.get("n"), Some(&OwnedValue::Integer(1)));
  562. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  563. assert_eq!(m.timestamp, Some(1));
  564. }
  565. #[test]
  566. fn it_uses_measure_macro_on_a_self_attribute() {
  567. struct A {
  568. pub influx: InfluxWriter,
  569. }
  570. impl A {
  571. fn f(&self) {
  572. measure!(self.influx, test, t(color, "red"), i(n, 1));
  573. }
  574. }
  575. let a = A { influx: InfluxWriter::default() };
  576. a.f();
  577. }
  578. #[test]
  579. fn it_clones_an_influx_writer_to_check_both_drop() {
  580. let influx = InfluxWriter::default();
  581. measure!(influx, drop_test, i(a, 1), i(b, 2));
  582. {
  583. let influx = influx.clone();
  584. thread::spawn(move || {
  585. measure!(influx, drop_test, i(a, 3), i(b, 4));
  586. });
  587. }
  588. }
  589. #[bench]
  590. fn influx_writer_send_basic(b: &mut Bencher) {
  591. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  592. b.iter(|| {
  593. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  594. });
  595. }
  596. #[bench]
  597. fn influx_writer_send_price(b: &mut Bencher) {
  598. let m = InfluxWriter::default();
  599. b.iter(|| {
  600. measure!(m, test,
  601. tag[ticker; t!(xmr-btc).to_str()],
  602. tag[exchange; "plnx"],
  603. d128[bid; d128::zero()],
  604. d128[ask; d128::zero()],
  605. );
  606. });
  607. }
  608. #[test]
  609. fn it_checks_color_tag_error_in_non_doctest() {
  610. let (tx, rx) = channel();
  611. measure!(tx, test, tag[color;"red"], int[n;1]);
  612. let meas: OwnedMeasurement = rx.recv().unwrap();
  613. assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
  614. }
  615. #[test]
  616. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  617. let meas = measure!(@make_meas test_measurement,
  618. tag [ one => "a" ],
  619. tag [ two => "b" ],
  620. int [ three => 2 ],
  621. float [ four => 1.2345 ],
  622. string [ five => String::from("d") ],
  623. bool [ six => true ],
  624. int [ seven => { 1 + 2 } ],
  625. time [ 1 ]
  626. );
  627. assert_eq!(meas.key, "test_measurement");
  628. assert_eq!(meas.tags.get("one"), Some(&"a"));
  629. assert_eq!(meas.tags.get("two"), Some(&"b"));
  630. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  631. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  632. assert_eq!(meas.timestamp, Some(1));
  633. }
  634. #[test]
  635. fn it_uses_the_measure_macro() {
  636. let (tx, rx) = channel();
  637. measure!(tx, test_measurement,
  638. tag [ one => "a" ],
  639. tag [ two => "b" ],
  640. int [ three => 2 ],
  641. float [ four => 1.2345 ],
  642. string [ five => String::from("d") ],
  643. bool [ six => true ],
  644. int [ seven => { 1 + 2 } ],
  645. time [ 1 ]
  646. );
  647. thread::sleep(Duration::from_millis(10));
  648. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  649. assert_eq!(meas.key, "test_measurement");
  650. assert_eq!(meas.tags.get("one"), Some(&"a"));
  651. assert_eq!(meas.tags.get("two"), Some(&"b"));
  652. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  653. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  654. assert_eq!(meas.timestamp, Some(1));
  655. }
  656. #[test]
  657. fn it_uses_measure_macro_for_d128_and_uuid() {
  658. let (tx, rx) = channel();
  659. let u = Uuid::new_v4();
  660. let d = d128::zero();
  661. let t = now();
  662. measure!(tx, test_measurement,
  663. tag[one; "a"],
  664. d128[two; d],
  665. uuid[three; u],
  666. time[t]
  667. );
  668. thread::sleep(Duration::from_millis(10));
  669. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  670. assert_eq!(meas.key, "test_measurement");
  671. assert_eq!(meas.tags.get("one"), Some(&"a"));
  672. assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero())));
  673. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u)));
  674. assert_eq!(meas.timestamp, Some(t));
  675. }
  676. #[test]
  677. fn it_uses_the_measure_macro_alt_syntax() {
  678. let (tx, rx) = channel();
  679. measure!(tx, test_measurement,
  680. tag[one; "a"],
  681. tag[two; "b"],
  682. int[three; 2],
  683. float[four; 1.2345],
  684. string[five; String::from("d")],
  685. bool [ six => true ],
  686. int[seven; { 1 + 2 }],
  687. time[1]
  688. );
  689. thread::sleep(Duration::from_millis(10));
  690. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  691. assert_eq!(meas.key, "test_measurement");
  692. assert_eq!(meas.tags.get("one"), Some(&"a"));
  693. assert_eq!(meas.tags.get("two"), Some(&"b"));
  694. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  695. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  696. assert_eq!(meas.timestamp, Some(1));
  697. }
  698. #[test]
  699. fn it_checks_that_fields_are_separated_correctly() {
  700. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  701. assert_eq!(m.key, "test");
  702. assert_eq!(m.tags.get("a"), Some(&"one"));
  703. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  704. let mut buf = String::new();
  705. serialize_owned(&m, &mut buf);
  706. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  707. }
  708. #[test]
  709. fn try_to_break_measure_macro() {
  710. let (tx, _) = channel();
  711. measure!(tx, one, tag[x=>"y"], int[n;1]);
  712. measure!(tx, one, tag[x;"y"], int[n;1],);
  713. struct A {
  714. pub one: i32,
  715. pub two: i32,
  716. }
  717. struct B {
  718. pub a: A
  719. }
  720. let b = B { a: A { one: 1, two: 2 } };
  721. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  722. assert_eq!(m.fields.get("a"), Some(&OwnedValue::Integer(1)));
  723. }
  724. #[bench]
  725. fn measure_macro_small(b: &mut Bencher) {
  726. let (tx, rx) = channel();
  727. let listener = thread::spawn(move || {
  728. loop { if rx.recv().is_err() { break } }
  729. });
  730. b.iter(|| {
  731. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  732. });
  733. }
  734. #[bench]
  735. fn measure_macro_medium(b: &mut Bencher) {
  736. let (tx, rx) = channel();
  737. let listener = thread::spawn(move || {
  738. loop { if rx.recv().is_err() { break } }
  739. });
  740. b.iter(|| {
  741. measure!(tx, test,
  742. tag[color; "red"],
  743. tag[mood => "playful"],
  744. tag [ ticker => "xmr_btc" ],
  745. float[ price => 1.2345 ],
  746. float[ amount => 56.323],
  747. int[n; 1],
  748. time[now()]
  749. );
  750. });
  751. }
  752. #[test]
  753. #[ignore]
  754. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  755. let ctx = zmq::Context::new();
  756. let socket = push(&ctx).unwrap();
  757. let (tx, rx) = channel();
  758. let w = writer(tx.clone());
  759. let mut buf = String::with_capacity(4096);
  760. let mut meas = Measurement::new("rust_test");
  761. meas.add_tag("a", "t");
  762. meas.add_field("c", Value::Float(1.23456));
  763. let now = now();
  764. meas.set_timestamp(now);
  765. serialize(&meas, &mut buf);
  766. socket.send_str(&buf, 0).unwrap();
  767. drop(w);
  768. }
  769. #[test]
  770. fn it_serializes_a_measurement_in_place() {
  771. let mut buf = String::with_capacity(4096);
  772. let mut meas = Measurement::new("rust_test");
  773. meas.add_tag("a", "b");
  774. meas.add_field("c", Value::Float(1.0));
  775. let now = now();
  776. meas.set_timestamp(now);
  777. serialize(&meas, &mut buf);
  778. let ans = format!("rust_test,a=b c=1 {}", now);
  779. assert_eq!(buf, ans);
  780. }
  781. #[test]
  782. fn it_serializes_a_hard_to_serialize_message() {
  783. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  784. let mut buf = String::new();
  785. let mut server_resp = String::new();
  786. let mut m = Measurement::new("rust_test");
  787. m.add_field("s", Value::String(&raw));
  788. let now = now();
  789. m.set_timestamp(now);
  790. serialize(&m, &mut buf);
  791. println!("{}", buf);
  792. buf.push_str("\n");
  793. let buf_copy = buf.clone();
  794. buf.push_str(&buf_copy);
  795. println!("{}", buf);
  796. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  797. let client = Client::new();
  798. match client.post(url.clone())
  799. .body(&buf)
  800. .send() {
  801. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  802. Ok(mut resp) => {
  803. resp.read_to_string(&mut server_resp).unwrap();
  804. panic!("{}", server_resp);
  805. }
  806. Err(why) => {
  807. panic!(why)
  808. }
  809. }
  810. }
  811. #[bench]
  812. fn serialize_owned_longer(b: &mut Bencher) {
  813. let mut buf = String::with_capacity(1024);
  814. let m =
  815. OwnedMeasurement::new("test")
  816. .add_tag("one", "a")
  817. .add_tag("two", "b")
  818. .add_tag("ticker", "xmr_btc")
  819. .add_tag("exchange", "plnx")
  820. .add_tag("side", "bid")
  821. .add_field("three", OwnedValue::Float(1.2345))
  822. .add_field("four", OwnedValue::Integer(57))
  823. .add_field("five", OwnedValue::Boolean(true))
  824. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  825. .set_timestamp(now());
  826. b.iter(|| {
  827. serialize_owned(&m, &mut buf);
  828. buf.clear()
  829. });
  830. }
  831. #[bench]
  832. fn serialize_owned_simple(b: &mut Bencher) {
  833. let mut buf = String::with_capacity(1024);
  834. let m =
  835. OwnedMeasurement::new("test")
  836. .add_tag("one", "a")
  837. .add_tag("two", "b")
  838. .add_field("three", OwnedValue::Float(1.2345))
  839. .add_field("four", OwnedValue::Integer(57))
  840. .set_timestamp(now());
  841. b.iter(|| {
  842. serialize_owned(&m, &mut buf);
  843. buf.clear()
  844. });
  845. }
  846. #[test]
  847. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  848. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  849. let mut buf = String::new();
  850. let mut server_resp = String::new();
  851. let m = OwnedMeasurement::new("rust_test")
  852. .add_field("s", OwnedValue::String(raw.to_string()))
  853. .set_timestamp(now());
  854. serialize_owned(&m, &mut buf);
  855. println!("{}", buf);
  856. buf.push_str("\n");
  857. let buf_copy = buf.clone();
  858. buf.push_str(&buf_copy);
  859. println!("{}", buf);
  860. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  861. let client = Client::new();
  862. match client.post(url.clone())
  863. .body(&buf)
  864. .send() {
  865. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  866. Ok(mut resp) => {
  867. resp.read_to_string(&mut server_resp).unwrap();
  868. panic!("{}", server_resp);
  869. }
  870. Err(why) => {
  871. panic!(why)
  872. }
  873. }
  874. }
  875. // macro_rules! make_measurement {
  876. // (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  877. // (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  878. // (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  879. // (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  880. // (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) };
  881. // (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) };
  882. // (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  883. // (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) };
  884. //
  885. // (@count_tags) => {0usize};
  886. // (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  887. // (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  888. //
  889. // (@count_fields) => {0usize};
  890. // (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  891. // (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  892. // (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  893. //
  894. // ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  895. // let n_tags = measure!(@count_tags $($t)*);
  896. // let n_fields = measure!(@count_fields $($t)*);
  897. // let mut meas =
  898. // $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  899. // $(
  900. // measure!(@kv $t, meas, $($tail)*);
  901. // )*
  902. // //let _ = $m.send(meas);
  903. // meas
  904. // }};
  905. // }
  906. //
  907. // #[test]
  908. // fn it_checks_n_tags_is_correct() {
  909. // let (tx, _): (Sender<OwnedMeasurement>, Receiver<OwnedMeasurement>) = channel();
  910. // assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1);
  911. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2);
  912. // assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0);
  913. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0);
  914. //
  915. // let m4 =
  916. // make_measurement!(tx, test,
  917. // tag[a;"b"],
  918. // tag[c;"d"],
  919. // int[n; 1],
  920. // tag[e;"f"],
  921. // float[x; 1.234],
  922. // tag[g;"h"],
  923. // time[1],
  924. // );
  925. // assert_eq!(m4.n_tags, 4);
  926. // assert_eq!(m4.n_fields, 2);
  927. // }
  928. }