You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1029 lines
34KB

  1. //! Utilities to efficiently send data to influx
  2. //!
use std::io::Read;
use std::sync::Arc;
use std::sync::mpsc::{Sender, Receiver, channel, SendError};
use std::thread;
#[cfg(feature = "warnings")]
use std::fs;
// `Duration` is not needed by every build configuration of this module.
#[allow(unused_imports)]
use std::time::Duration;
use std::hash::BuildHasherDefault;
use hyper::status::StatusCode;
use hyper::client::response::Response;
use hyper::Url;
use hyper::client::Client;
use influent::measurement::{Measurement, Value};
use zmq;
#[allow(unused_imports)]
use chrono::{DateTime, Utc};
use ordermap::OrderMap;
use fnv::FnvHasher;
use decimal::d128;
use uuid::Uuid;
use smallvec::SmallVec;
use super::{nanos, file_logger, LOG_LEVEL};
#[cfg(feature = "warnings")]
use warnings::Warning;
pub use super::{dur_nanos, dt_nanos};
  28. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  29. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  30. Map::with_capacity_and_hasher(capacity, Default::default())
  31. }
  32. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  33. ///
  34. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  35. /// values, as well as sends it with the `Sender`.
  36. ///
  37. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  38. /// measurement (see `tests` mod).
  39. ///
  40. /// # Examples
  41. ///
  42. /// ```
  43. /// #[macro_use] extern crate logging;
  44. /// extern crate decimal;
  45. ///
  46. /// use std::sync::mpsc::channel;
  47. /// use decimal::d128;
  48. /// use logging::influx::*;
  49. ///
  50. /// fn main() {
  51. /// let (tx, rx) = channel();
  52. ///
  53. /// // "shorthand" syntax
  54. ///
  55. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  56. ///
  57. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  58. ///
  59. /// assert_eq!(meas.key, "test");
  60. /// assert_eq!(meas.get_tag("color"), Some("red"));
  61. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  62. ///
  63. /// // alternate syntax ...
  64. ///
  65. /// measure!(tx, test,
  66. /// tag [ one => "a" ],
  67. /// tag [ two => "b" ],
  68. /// int [ three => 2 ],
  69. /// float [ four => 1.2345 ],
  70. /// string [ five => String::from("d") ],
  71. /// bool [ six => true ],
  72. /// int [ seven => { 1 + 2 } ],
  73. /// time [ 1 ]
  74. /// );
  75. ///
  76. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  77. ///
  78. /// assert_eq!(meas.key, "test");
  79. /// assert_eq!(meas.get_tag("one"), Some("a"));
  80. /// assert_eq!(meas.get_tag("two"), Some("b"));
  81. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  82. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  83. /// assert_eq!(meas.timestamp, Some(1));
  84. ///
  85. /// // use the @make_meas flag to skip sending a measurement, instead merely
  86. /// // creating it.
  87. ///
  88. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  89. ///
  90. /// // each variant also has shorthand aliases
  91. ///
  92. /// let meas: OwnedMeasurement =
  93. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  94. /// }
  95. /// ```
  96. ///
  97. #[macro_export]
  98. macro_rules! measure {
  99. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  100. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  101. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  102. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  103. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  104. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  105. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  106. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  107. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  108. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  109. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  110. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  111. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  112. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  113. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  114. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  115. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  116. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  117. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  118. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  119. (@as_expr $e:expr) => {$e};
  120. (@count_tags) => {0usize};
  121. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  122. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  123. (@count_fields) => {0usize};
  124. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  125. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  126. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  127. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  128. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  129. };
  130. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  131. let n_tags = measure!(@count_tags $($t)*);
  132. let n_fields = measure!(@count_fields $($t)*);
  133. let mut meas =
  134. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  135. $(
  136. measure!(@kv $t, meas, $($tail)*);
  137. )*
  138. meas
  139. }};
  140. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  141. measure!($m, $name, $($t [ $($tail)* ] ),+)
  142. };
  143. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  144. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  145. let _ = $m.send(measurement);
  146. }};
  147. }
/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
/// it receives (over a `std::sync::mpsc` channel) and inserts to influxdb via http once
/// the `buffer_size` passed to `InfluxWriter::new` has been reached.
///
#[derive(Debug)]
pub struct InfluxWriter {
    // Host the writer was created with (copied from the `host` argument of `new`).
    host: String,
    // Database name the writer was created with (copied from the `db` argument of `new`).
    db: String,
    // Sending half of the channel to the worker thread; `None` asks the worker
    // to flush what it has buffered and stop.
    tx: Sender<Option<OwnedMeasurement>>,
    // Join handle of the worker thread, shared between clones; taken in `drop`.
    thread: Option<Arc<thread::JoinHandle<()>>>,
}
  159. impl Default for InfluxWriter {
  160. fn default() -> Self {
  161. //if cfg!(any(test, feature = "test")) {
  162. // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0)
  163. //} else {
  164. InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 4000)
  165. //}
  166. }
  167. }
  168. impl Clone for InfluxWriter {
  169. fn clone(&self) -> Self {
  170. debug_assert!(self.thread.is_some());
  171. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  172. InfluxWriter {
  173. host: self.host.to_string(),
  174. db: self.db.to_string(),
  175. tx: self.tx.clone(),
  176. thread,
  177. }
  178. }
  179. }
  180. impl InfluxWriter {
  181. /// Sends the `OwnedMeasurement` to the serialization thread.
  182. ///
  183. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  184. self.tx.send(Some(m))
  185. }
  186. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  187. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  188. self.tx.clone()
  189. }
  190. #[allow(unused_assignments)]
  191. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  192. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  193. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  194. #[cfg(feature = "no-influx-buffer")]
  195. let buffer_size = 0u16;
  196. debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);
  197. let url =
  198. Url::parse_with_params(&format!("http://{}:8086/write", host),
  199. &[("db", db), ("precision", "ns")])
  200. .expect("influx writer url should parse");
  201. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  202. let client = Client::new();
  203. debug!(logger, "initializing buffers");
  204. let mut buf = String::with_capacity(32 * 32 * 32);
  205. let mut count = 0;
  206. let send = |buf: &str| {
  207. let resp = client.post(url.clone())
  208. .body(buf)
  209. .send();
  210. match resp {
  211. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  212. debug!(logger, "server responded ok: 204 NoContent");
  213. }
  214. Ok(mut resp) => {
  215. let mut server_resp = String::with_capacity(1024);
  216. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  217. error!(logger, "influx server error";
  218. "status" => resp.status.to_string(),
  219. "body" => server_resp);
  220. }
  221. Err(why) => {
  222. error!(logger, "http request failed: {:?}", why);
  223. }
  224. }
  225. };
  226. let next = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 {
  227. match prev {
  228. 0 if buffer_size > 0 => {
  229. serialize_owned(m, buf);
  230. 1
  231. }
  232. n if n < buffer_size => {
  233. buf.push_str("\n");
  234. serialize_owned(m, buf);
  235. n + 1
  236. }
  237. n => {
  238. buf.push_str("\n");
  239. serialize_owned(m, buf);
  240. debug!(logger, "sending buffer to influx"; "len" => n);
  241. send(buf);
  242. buf.clear();
  243. 0
  244. }
  245. }
  246. };
  247. loop {
  248. match rx.recv() {
  249. Ok(Some(mut meas)) => {
  250. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  251. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  252. count = next(count, &meas, &mut buf);
  253. }
  254. Ok(None) => {
  255. if buf.len() > 0 {
  256. debug!(logger, "sending buffer to influx"; "len" => count);
  257. send(&buf)
  258. }
  259. break
  260. }
  261. _ => {
  262. thread::sleep(Duration::new(0, 1))
  263. }
  264. }
  265. }
  266. }).unwrap();
  267. InfluxWriter {
  268. host: host.to_string(),
  269. db: db.to_string(),
  270. tx,
  271. thread: Some(Arc::new(thread))
  272. }
  273. }
  274. }
  275. impl Drop for InfluxWriter {
  276. fn drop(&mut self) {
  277. if let Some(arc) = self.thread.take() {
  278. if let Ok(thread) = Arc::try_unwrap(arc) {
  279. let _ = self.tx.send(None);
  280. let _ = thread.join();
  281. }
  282. }
  283. }
  284. }
  285. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  286. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  287. let socket = ctx.socket(zmq::PULL)?;
  288. socket.bind(WRITER_ADDR)?;
  289. socket.set_rcvhwm(0)?;
  290. Ok(socket)
  291. }
  292. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  293. let socket = ctx.socket(zmq::PUSH)?;
  294. socket.connect(WRITER_ADDR)?;
  295. socket.set_sndhwm(0)?;
  296. Ok(socket)
  297. }
  298. /// This removes offending things rather than escaping them.
  299. ///
  300. fn escape_tag(s: &str) -> String {
  301. s.replace(" ", "")
  302. .replace(",", "")
  303. .replace("\"", "")
  304. }
  305. fn escape(s: &str) -> String {
  306. s.replace(" ", "\\ ")
  307. .replace(",", "\\,")
  308. }
  309. fn as_string(s: &str) -> String {
  310. // the second replace removes double escapes
  311. //
  312. format!("\"{}\"", s.replace("\"", "\\\"")
  313. .replace(r#"\\""#, r#"\""#))
  314. }
  315. #[test]
  316. fn it_checks_as_string_does_not_double_escape() {
  317. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  318. let escaped = as_string(&raw);
  319. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  320. }
  321. fn as_integer(i: &i64) -> String {
  322. format!("{}i", i)
  323. }
  324. fn as_float(f: &f64) -> String {
  325. f.to_string()
  326. }
  327. fn as_boolean(b: &bool) -> &str {
  328. if *b { "t" } else { "f" }
  329. }
  330. pub fn now() -> i64 {
  331. nanos(Utc::now()) as i64
  332. }
  333. /// Serialize the measurement into influx line protocol
  334. /// and append to the buffer.
  335. ///
  336. /// # Examples
  337. ///
  338. /// ```
  339. /// extern crate influent;
  340. /// extern crate logging;
  341. ///
  342. /// use influent::measurement::{Measurement, Value};
  343. /// use std::string::String;
  344. /// use logging::influx::serialize;
  345. ///
  346. /// fn main() {
  347. /// let mut buf = String::new();
  348. /// let mut m = Measurement::new("test");
  349. /// m.add_field("x", Value::Integer(1));
  350. /// serialize(&m, &mut buf);
  351. /// }
  352. ///
  353. /// ```
  354. ///
  355. pub fn serialize(measurement: &Measurement, line: &mut String) {
  356. line.push_str(&escape(measurement.key));
  357. for (tag, value) in measurement.tags.iter() {
  358. line.push_str(",");
  359. line.push_str(&escape(tag));
  360. line.push_str("=");
  361. line.push_str(&escape(value));
  362. }
  363. let mut was_spaced = false;
  364. for (field, value) in measurement.fields.iter() {
  365. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  366. line.push_str(&escape(field));
  367. line.push_str("=");
  368. match value {
  369. &Value::String(ref s) => line.push_str(&as_string(s)),
  370. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  371. &Value::Float(ref f) => line.push_str(&as_float(f)),
  372. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  373. };
  374. }
  375. match measurement.timestamp {
  376. Some(t) => {
  377. line.push_str(" ");
  378. line.push_str(&t.to_string());
  379. }
  380. _ => {}
  381. }
  382. }
  383. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  384. ///
  385. /// The serialized measurement is appended to the end of the string without
  386. /// any regard for what exited in it previously.
  387. ///
  388. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  389. line.push_str(&escape_tag(measurement.key));
  390. let add_tag = |line: &mut String, key: &str, value: &str| {
  391. line.push_str(",");
  392. line.push_str(&escape_tag(key));
  393. line.push_str("=");
  394. line.push_str(&escape(value));
  395. };
  396. for &(key, value) in measurement.tags.iter() {
  397. add_tag(line, key, value);
  398. }
  399. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  400. if is_first { line.push_str(" "); } else { line.push_str(","); }
  401. line.push_str(&escape_tag(key));
  402. line.push_str("=");
  403. match *value {
  404. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  405. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  406. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  407. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  408. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  409. #[cfg(not(feature = "disable-short-uuid"))]
  410. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  411. #[cfg(feature = "disable-short-uuid")]
  412. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  413. };
  414. };
  415. let mut fields = measurement.fields.iter();
  416. // first time separate from tags with space
  417. //
  418. fields.next().map(|kv| {
  419. add_field(line, &kv.0, &kv.1, true);
  420. });
  421. // then seperate the rest w/ comma
  422. //
  423. for kv in fields {
  424. add_field(line, kv.0, &kv.1, false);
  425. }
  426. if let Some(t) = measurement.timestamp {
  427. line.push_str(" ");
  428. line.push_str(&t.to_string());
  429. }
  430. }
/// Deprecated zmq-based writer thread; use `InfluxWriter` instead.
///
/// NOTE: the body starts with `assert!(false)`, so calling this function
/// panics before the thread is spawned.
///
/// The thread body receives already-serialized lines from the `pull` socket,
/// joins them with newlines, and posts them to influx in batches. Server and
/// transport errors are forwarded over `warnings` as `Warning::Error`.
#[cfg(feature = "warnings")]
#[deprecated(since="0.4", note="Replace with InfluxWriter")]
pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
    // Unconditional panic: everything below is unreachable at runtime.
    assert!(false);
    thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
        const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
        // Best effort: the result (e.g. "already exists") is ignored.
        let _ = fs::create_dir("/tmp/mm");
        let ctx = zmq::Context::new();
        let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
        // NOTE(review): `DB_NAME` is not defined in the visible part of this file — confirm where it comes from.
        let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
        let client = Client::new();
        let mut buf = String::with_capacity(4096);
        let mut server_resp = String::with_capacity(4096);
        // Number of messages currently held in `buf`.
        let mut count = 0;
        loop {
            // Receive errors and non-UTF-8 payloads are silently skipped.
            if let Ok(bytes) = socket.recv_bytes(0) {
                if let Ok(msg) = String::from_utf8(bytes) {
                    count = match count {
                        // First message of a batch: no separator needed.
                        0 => {
                            buf.push_str(&msg);
                            1
                        }
                        // Keep accumulating while the count is in 1..=40.
                        n @ 1...40 => {
                            buf.push_str("\n");
                            buf.push_str(&msg);
                            n + 1
                        }
                        // Count above 40: append this message, post the batch, start over.
                        _ => {
                            buf.push_str("\n");
                            buf.push_str(&msg);
                            match client.post(url.clone())
                                        .body(&buf)
                                        .send() {
                                // 204 No Content is the success response; nothing to report.
                                Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
                                // Any other response: forward the server's body as a warning.
                                Ok(mut resp) => {
                                    let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
                                    let _ = warnings.send(
                                        Warning::Error(
                                            format!("Influx server: {}", server_resp)));
                                    server_resp.clear();
                                }
                                // The request itself failed.
                                Err(why) => {
                                    let _ = warnings.send(
                                        Warning::Error(
                                            format!("Influx write error: {}", why)));
                                }
                            }
                            // The batch is discarded whether or not the post succeeded.
                            buf.clear();
                            0
                        }
                    }
                }
            }
        }
    }).unwrap()
}
/// An owned field value for an `OwnedMeasurement`.
///
/// `serialize_owned` determines how each variant is written out.
#[derive(Debug, Clone, PartialEq)]
pub enum OwnedValue {
    /// Written as a double-quoted string with inner quotes escaped.
    String(String),
    /// Written with `f64`'s `Display` formatting.
    Float(f64),
    /// Written in decimal with an `i` suffix.
    Integer(i64),
    /// Written as `t` or `f`.
    Boolean(bool),
    /// Written with `d128`'s `Display` formatting.
    D128(d128),
    /// Written as a double-quoted string (shortened to 8 characters unless the
    /// `disable-short-uuid` feature is enabled).
    Uuid(Uuid),
}
/// Holds data meant for an influxdb measurement in transit to the
/// writing thread.
///
/// Tags and fields are kept in `SmallVec`s of `(key, value)` pairs with
/// inline room for 8 entries each; entries keep the order in which they
/// were added.
///
#[derive(Clone, Debug)]
pub struct OwnedMeasurement {
    /// Measurement name.
    pub key: &'static str,
    /// Optional timestamp; the `InfluxWriter` thread fills it in with `now()`
    /// when it is `None`.
    pub timestamp: Option<i64>,
    // Earlier `Map`-based representation, kept for reference:
    //pub fields: Map<&'static str, OwnedValue>,
    //pub tags: Map<&'static str, &'static str>,
    /// `(field key, value)` pairs.
    pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
    /// `(tag key, tag value)` pairs.
    pub tags: SmallVec<[(&'static str, &'static str); 8]>,
}
  510. impl OwnedMeasurement {
  511. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  512. OwnedMeasurement {
  513. key,
  514. timestamp: None,
  515. tags: SmallVec::with_capacity(n_tags),
  516. fields: SmallVec::with_capacity(n_fields),
  517. }
  518. }
  519. pub fn new(key: &'static str) -> Self {
  520. OwnedMeasurement {
  521. key,
  522. timestamp: None,
  523. tags: SmallVec::new(),
  524. fields: SmallVec::new(),
  525. }
  526. }
  527. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  528. self.tags.push((key, value));
  529. self
  530. }
  531. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  532. self.fields.push((key, value));
  533. self
  534. }
  535. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  536. self.timestamp = Some(timestamp);
  537. self
  538. }
  539. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  540. match self.tags.iter().position(|kv| kv.0 == key) {
  541. Some(i) => {
  542. self.tags.get_mut(i)
  543. .map(|x| {
  544. x.0 = value;
  545. });
  546. self
  547. }
  548. None => {
  549. self.add_tag(key, value)
  550. }
  551. }
  552. }
  553. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  554. self.fields.iter()
  555. .find(|kv| kv.0 == key)
  556. .map(|kv| &kv.1)
  557. }
  558. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  559. self.tags.iter()
  560. .find(|kv| kv.0 == key)
  561. .map(|kv| kv.1)
  562. }
  563. }
  564. #[allow(unused_imports, unused_variables)]
  565. #[cfg(test)]
  566. mod tests {
  567. use super::*;
  568. use test::{black_box, Bencher};
  569. #[test]
  570. fn it_uses_the_new_tag_k_only_shortcut() {
  571. let tag_value = "one";
  572. let color = "red";
  573. let time = now();
  574. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  575. assert_eq!(m.get_tag("color"), Some("red"));
  576. assert_eq!(m.get_tag("tag_value"), Some("one"));
  577. assert_eq!(m.timestamp, Some(time));
  578. }
  579. #[test]
  580. fn it_uses_measure_macro_parenthesis_syntax() {
  581. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  582. assert_eq!(m.key, "test");
  583. assert_eq!(m.get_tag("a"), Some("b"));
  584. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  585. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  586. assert_eq!(m.timestamp, Some(1));
  587. }
  588. #[test]
  589. fn it_uses_measure_macro_on_a_self_attribute() {
  590. struct A {
  591. pub influx: InfluxWriter,
  592. }
  593. impl A {
  594. fn f(&self) {
  595. measure!(self.influx, test, t(color, "red"), i(n, 1));
  596. }
  597. }
  598. let a = A { influx: InfluxWriter::default() };
  599. a.f();
  600. }
  601. #[test]
  602. fn it_clones_an_influx_writer_to_check_both_drop() {
  603. let influx = InfluxWriter::default();
  604. measure!(influx, drop_test, i(a, 1), i(b, 2));
  605. {
  606. let influx = influx.clone();
  607. thread::spawn(move || {
  608. measure!(influx, drop_test, i(a, 3), i(b, 4));
  609. });
  610. }
  611. }
  612. #[bench]
  613. fn influx_writer_send_basic(b: &mut Bencher) {
  614. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  615. b.iter(|| {
  616. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  617. });
  618. }
  619. #[bench]
  620. fn influx_writer_send_price(b: &mut Bencher) {
  621. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  622. b.iter(|| {
  623. measure!(m, test,
  624. t(ticker, t!(xmr-btc).as_str()),
  625. t(exchange, "plnx"),
  626. d(bid, d128::zero()),
  627. d(ask, d128::zero()),
  628. );
  629. });
  630. }
  631. #[test]
  632. fn it_checks_color_tag_error_in_non_doctest() {
  633. let (tx, rx) = channel();
  634. measure!(tx, test, tag[color;"red"], int[n;1]);
  635. let meas: OwnedMeasurement = rx.recv().unwrap();
  636. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  637. }
  638. #[test]
  639. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  640. let meas = measure!(@make_meas test_measurement,
  641. tag [ one => "a" ],
  642. tag [ two => "b" ],
  643. int [ three => 2 ],
  644. float [ four => 1.2345 ],
  645. string [ five => String::from("d") ],
  646. bool [ six => true ],
  647. int [ seven => { 1 + 2 } ],
  648. time [ 1 ]
  649. );
  650. assert_eq!(meas.key, "test_measurement");
  651. assert_eq!(meas.get_tag("one"), Some("a"));
  652. assert_eq!(meas.get_tag("two"), Some("b"));
  653. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  654. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  655. assert_eq!(meas.timestamp, Some(1));
  656. }
  657. #[test]
  658. fn it_uses_the_measure_macro() {
  659. let (tx, rx) = channel();
  660. measure!(tx, test_measurement,
  661. tag [ one => "a" ],
  662. tag [ two => "b" ],
  663. int [ three => 2 ],
  664. float [ four => 1.2345 ],
  665. string [ five => String::from("d") ],
  666. bool [ six => true ],
  667. int [ seven => { 1 + 2 } ],
  668. time [ 1 ]
  669. );
  670. thread::sleep(Duration::from_millis(10));
  671. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  672. assert_eq!(meas.key, "test_measurement");
  673. assert_eq!(meas.get_tag("one"), Some("a"));
  674. assert_eq!(meas.get_tag("two"), Some("b"));
  675. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  676. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  677. assert_eq!(meas.timestamp, Some(1));
  678. }
  679. #[test]
  680. fn it_uses_measure_macro_for_d128_and_uuid() {
  681. let (tx, rx) = channel();
  682. let u = Uuid::new_v4();
  683. let d = d128::zero();
  684. let t = now();
  685. measure!(tx, test_measurement,
  686. tag[one; "a"],
  687. d128[two; d],
  688. uuid[three; u],
  689. time[t]
  690. );
  691. thread::sleep(Duration::from_millis(10));
  692. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  693. assert_eq!(meas.key, "test_measurement");
  694. assert_eq!(meas.get_tag("one"), Some("a"));
  695. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  696. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
  697. assert_eq!(meas.timestamp, Some(t));
  698. }
  699. #[test]
  700. fn it_uses_the_measure_macro_alt_syntax() {
  701. let (tx, rx) = channel();
  702. measure!(tx, test_measurement,
  703. tag[one; "a"],
  704. tag[two; "b"],
  705. int[three; 2],
  706. float[four; 1.2345],
  707. string[five; String::from("d")],
  708. bool [ six => true ],
  709. int[seven; { 1 + 2 }],
  710. time[1]
  711. );
  712. thread::sleep(Duration::from_millis(10));
  713. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  714. assert_eq!(meas.key, "test_measurement");
  715. assert_eq!(meas.get_tag("one"), Some("a"));
  716. assert_eq!(meas.get_tag("two"), Some("b"));
  717. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  718. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  719. assert_eq!(meas.timestamp, Some(1));
  720. }
  721. #[test]
  722. fn it_checks_that_fields_are_separated_correctly() {
  723. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  724. assert_eq!(m.key, "test");
  725. assert_eq!(m.get_tag("a"), Some("one"));
  726. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  727. let mut buf = String::new();
  728. serialize_owned(&m, &mut buf);
  729. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  730. }
  731. #[test]
  732. fn try_to_break_measure_macro() {
  733. let (tx, _) = channel();
  734. measure!(tx, one, tag[x=>"y"], int[n;1]);
  735. measure!(tx, one, tag[x;"y"], int[n;1],);
  736. struct A {
  737. pub one: i32,
  738. pub two: i32,
  739. }
  740. struct B {
  741. pub a: A
  742. }
  743. let b = B { a: A { one: 1, two: 2 } };
  744. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  745. assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
  746. }
  747. #[bench]
  748. fn measure_macro_small(b: &mut Bencher) {
  749. let (tx, rx) = channel();
  750. let listener = thread::spawn(move || {
  751. loop { if rx.recv().is_err() { break } }
  752. });
  753. b.iter(|| {
  754. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  755. });
  756. }
  757. #[bench]
  758. fn measure_macro_medium(b: &mut Bencher) {
  759. let (tx, rx) = channel();
  760. let listener = thread::spawn(move || {
  761. loop { if rx.recv().is_err() { break } }
  762. });
  763. b.iter(|| {
  764. measure!(tx, test,
  765. tag[color; "red"],
  766. tag[mood => "playful"],
  767. tag [ ticker => "xmr_btc" ],
  768. float[ price => 1.2345 ],
  769. float[ amount => 56.323],
  770. int[n; 1],
  771. time[now()]
  772. );
  773. });
  774. }
  775. #[cfg(feature = "warnings")]
  776. #[test]
  777. #[ignore]
  778. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  779. let ctx = zmq::Context::new();
  780. let socket = push(&ctx).unwrap();
  781. let (tx, rx) = channel();
  782. let w = writer(tx.clone());
  783. let mut buf = String::with_capacity(4096);
  784. let mut meas = Measurement::new("rust_test");
  785. meas.add_tag("a", "t");
  786. meas.add_field("c", Value::Float(1.23456));
  787. let now = now();
  788. meas.set_timestamp(now);
  789. serialize(&meas, &mut buf);
  790. socket.send_str(&buf, 0).unwrap();
  791. drop(w);
  792. }
  793. #[test]
  794. fn it_serializes_a_measurement_in_place() {
  795. let mut buf = String::with_capacity(4096);
  796. let mut meas = Measurement::new("rust_test");
  797. meas.add_tag("a", "b");
  798. meas.add_field("c", Value::Float(1.0));
  799. let now = now();
  800. meas.set_timestamp(now);
  801. serialize(&meas, &mut buf);
  802. let ans = format!("rust_test,a=b c=1 {}", now);
  803. assert_eq!(buf, ans);
  804. }
  805. #[test]
  806. fn it_serializes_a_hard_to_serialize_message() {
  807. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  808. let mut buf = String::new();
  809. let mut server_resp = String::new();
  810. let mut m = Measurement::new("rust_test");
  811. m.add_field("s", Value::String(&raw));
  812. let now = now();
  813. m.set_timestamp(now);
  814. serialize(&m, &mut buf);
  815. println!("{}", buf);
  816. buf.push_str("\n");
  817. let buf_copy = buf.clone();
  818. buf.push_str(&buf_copy);
  819. println!("{}", buf);
  820. let url = Url::parse_with_params("localhost", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  821. let client = Client::new();
  822. match client.post(url.clone())
  823. .body(&buf)
  824. .send() {
  825. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  826. Ok(mut resp) => {
  827. resp.read_to_string(&mut server_resp).unwrap();
  828. panic!("{}", server_resp);
  829. }
  830. Err(why) => {
  831. panic!(why)
  832. }
  833. }
  834. }
  835. #[bench]
  836. fn serialize_owned_longer(b: &mut Bencher) {
  837. let mut buf = String::with_capacity(1024);
  838. let m =
  839. OwnedMeasurement::new("test")
  840. .add_tag("one", "a")
  841. .add_tag("two", "b")
  842. .add_tag("ticker", "xmr_btc")
  843. .add_tag("exchange", "plnx")
  844. .add_tag("side", "bid")
  845. .add_field("three", OwnedValue::Float(1.2345))
  846. .add_field("four", OwnedValue::Integer(57))
  847. .add_field("five", OwnedValue::Boolean(true))
  848. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  849. .set_timestamp(now());
  850. b.iter(|| {
  851. serialize_owned(&m, &mut buf);
  852. buf.clear()
  853. });
  854. }
  855. #[bench]
  856. fn serialize_owned_simple(b: &mut Bencher) {
  857. let mut buf = String::with_capacity(1024);
  858. let m =
  859. OwnedMeasurement::new("test")
  860. .add_tag("one", "a")
  861. .add_tag("two", "b")
  862. .add_field("three", OwnedValue::Float(1.2345))
  863. .add_field("four", OwnedValue::Integer(57))
  864. .set_timestamp(now());
  865. b.iter(|| {
  866. serialize_owned(&m, &mut buf);
  867. buf.clear()
  868. });
  869. }
  870. #[test]
  871. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  872. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  873. let mut buf = String::new();
  874. let mut server_resp = String::new();
  875. let m = OwnedMeasurement::new("rust_test")
  876. .add_field("s", OwnedValue::String(raw.to_string()))
  877. .set_timestamp(now());
  878. serialize_owned(&m, &mut buf);
  879. println!("{}", buf);
  880. buf.push_str("\n");
  881. let buf_copy = buf.clone();
  882. buf.push_str(&buf_copy);
  883. println!("{}", buf);
  884. let url = Url::parse_with_params("localhost", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  885. let client = Client::new();
  886. match client.post(url.clone())
  887. .body(&buf)
  888. .send() {
  889. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  890. Ok(mut resp) => {
  891. resp.read_to_string(&mut server_resp).unwrap();
  892. panic!("{}", server_resp);
  893. }
  894. Err(why) => {
  895. panic!(why)
  896. }
  897. }
  898. }
  899. }