You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1077 lines
36KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. #[cfg(feature = "warnings")]
  8. use std::fs;
  9. use std::time::Duration;
  10. use std::hash::BuildHasherDefault;
  11. use hyper::status::StatusCode;
  12. use hyper::client::response::Response;
  13. use hyper::Url;
  14. use hyper::client::Client;
  15. use influent::measurement::{Measurement, Value};
  16. use zmq;
  17. #[allow(unused_imports)]
  18. use chrono::{DateTime, Utc};
  19. use ordermap::OrderMap;
  20. use fnv::FnvHasher;
  21. use decimal::d128;
  22. use uuid::Uuid;
  23. use smallvec::SmallVec;
  24. use slog::Logger;
  25. use super::{nanos, file_logger, LOG_LEVEL};
  26. #[cfg(feature = "warnings")]
  27. use warnings::Warning;
  28. pub use super::{dur_nanos, dt_nanos};
  29. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  30. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  31. Map::with_capacity_and_hasher(capacity, Default::default())
  32. }
  33. /// Created this so I know what types can be passed through the
  34. /// `measure!` macro, which used to convert with `as i64` and
  35. /// `as f64` until I accidentally passed a function name, and it
  36. /// still compiled, but with garbage numbers.
  37. pub trait AsI64 {
  38. fn as_i64(x: Self) -> i64;
  39. }
  40. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  41. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  42. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  43. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  44. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } }
  45. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  46. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  47. /// Created this so I know what types can be passed through the
  48. /// `measure!` macro, which used to convert with `as i64` and
  49. /// `as f64` until I accidentally passed a function name, and it
  50. /// still compiled, but with garbage numbers.
  51. pub trait AsF64 {
  52. fn as_f64(x: Self) -> f64;
  53. }
  54. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  55. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  56. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  57. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  58. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  59. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  60. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  61. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  62. ///
  63. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  64. /// values, as well as sends it with the `Sender`.
  65. ///
  66. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  67. /// measurement (see `tests` mod).
  68. ///
  69. /// # Examples
  70. ///
  71. /// ```
  72. /// #![feature(try_from)]
  73. /// #[macro_use] extern crate logging;
  74. /// extern crate decimal;
  75. ///
  76. /// use std::sync::mpsc::channel;
  77. /// use decimal::d128;
  78. /// use logging::influx::*;
  79. ///
  80. /// fn main() {
  81. /// let (tx, rx) = channel();
  82. ///
  83. /// // "shorthand" syntax
  84. ///
  85. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  86. ///
  87. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  88. ///
  89. /// assert_eq!(meas.key, "test");
  90. /// assert_eq!(meas.get_tag("color"), Some("red"));
  91. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  92. ///
  93. /// // alternate syntax ...
  94. ///
  95. /// measure!(tx, test,
  96. /// tag [ one => "a" ],
  97. /// tag [ two => "b" ],
  98. /// int [ three => 2 ],
  99. /// float [ four => 1.2345 ],
  100. /// string [ five => String::from("d") ],
  101. /// bool [ six => true ],
  102. /// int [ seven => { 1 + 2 } ],
  103. /// time [ 1 ]
  104. /// );
  105. ///
  106. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  107. ///
  108. /// assert_eq!(meas.key, "test");
  109. /// assert_eq!(meas.get_tag("one"), Some("a"));
  110. /// assert_eq!(meas.get_tag("two"), Some("b"));
  111. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  112. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  113. /// assert_eq!(meas.timestamp, Some(1));
  114. ///
  115. /// // use the @make_meas flag to skip sending a measurement, instead merely
  116. /// // creating it.
  117. ///
  118. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  119. ///
  120. /// // each variant also has shorthand aliases
  121. ///
  122. /// let meas: OwnedMeasurement =
  123. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  124. /// }
  125. /// ```
  126. ///
  127. #[macro_export]
  128. macro_rules! measure {
  129. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  130. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  131. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  132. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  133. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  134. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  135. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  136. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  137. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  138. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  139. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  140. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  141. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  142. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  143. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  144. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  145. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  146. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  147. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  148. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  149. (@as_expr $e:expr) => {$e};
  150. (@count_tags) => {0usize};
  151. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  152. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  153. (@count_fields) => {0usize};
  154. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  155. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  156. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  157. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  158. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  159. };
  160. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  161. let n_tags = measure!(@count_tags $($t)*);
  162. let n_fields = measure!(@count_fields $($t)*);
  163. let mut meas =
  164. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  165. $(
  166. measure!(@kv $t, meas, $($tail)*);
  167. )*
  168. meas
  169. }};
  170. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  171. measure!($m, $name, $($t [ $($tail)* ] ),+)
  172. };
  173. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  174. #[allow(unused_imports)]
  175. use $crate::influx::{AsI64, AsF64};
  176. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  177. let _ = $m.send(measurement);
  178. }};
  179. }
/// Holds a thread (and provides an interface to it) that serializes the
/// `OwnedMeasurement`s it receives over a channel and inserts them to influxdb
/// via http once enough measurements have accumulated (see the `buffer_size`
/// argument of `InfluxWriter::with_logger`).
///
/// The handle is cloneable: clones share the same sender and the same
/// background thread.
///
#[derive(Debug)]
pub struct InfluxWriter {
    // Host name the writer was created with.
    host: String,
    // Database name the writer was created with.
    db: String,
    // Channel to the background thread; `None` is the shutdown signal.
    tx: Sender<Option<OwnedMeasurement>>,
    // Join handle of the background thread, shared between clones.
    thread: Option<Arc<thread::JoinHandle<()>>>,
}
  191. impl Default for InfluxWriter {
  192. fn default() -> Self {
  193. //if cfg!(any(test, feature = "test")) {
  194. // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0)
  195. //} else {
  196. InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 4000)
  197. //}
  198. }
  199. }
  200. impl Clone for InfluxWriter {
  201. fn clone(&self) -> Self {
  202. debug_assert!(self.thread.is_some());
  203. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  204. InfluxWriter {
  205. host: self.host.to_string(),
  206. db: self.db.to_string(),
  207. tx: self.tx.clone(),
  208. thread,
  209. }
  210. }
  211. }
  212. impl InfluxWriter {
  213. /// Sends the `OwnedMeasurement` to the serialization thread.
  214. ///
  215. #[cfg_attr(feature = "inlines", inline)]
  216. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  217. self.tx.send(Some(m))
  218. }
  219. #[cfg_attr(feature = "inlines", inline)]
  220. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  221. #[cfg_attr(feature = "inlines", inline)]
  222. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  223. #[cfg_attr(feature = "inlines", inline)]
  224. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  225. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  226. self.tx.clone()
  227. }
  228. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  229. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  230. Self::with_logger(host, db, buffer_size, logger)
  231. }
  232. #[allow(unused_assignments)]
  233. pub fn with_logger(host: &str, db: &str, buffer_size: u16, logger: Logger) -> Self {
  234. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  235. #[cfg(feature = "no-influx-buffer")]
  236. let buffer_size = 0u16;
  237. debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);
  238. let url =
  239. Url::parse_with_params(&format!("http://{}:8086/write", host),
  240. &[("db", db), ("precision", "ns")])
  241. .expect("influx writer url should parse");
  242. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  243. let client = Client::new();
  244. debug!(logger, "initializing buffers");
  245. let mut buf = String::with_capacity(32 * 32 * 32);
  246. let mut count = 0;
  247. let send = |buf: &str| {
  248. let resp = client.post(url.clone())
  249. .body(buf)
  250. .send();
  251. match resp {
  252. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  253. debug!(logger, "server responded ok: 204 NoContent");
  254. }
  255. Ok(mut resp) => {
  256. let mut server_resp = String::with_capacity(1024);
  257. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  258. error!(logger, "influx server error";
  259. "status" => resp.status.to_string(),
  260. "body" => server_resp);
  261. }
  262. Err(why) => {
  263. error!(logger, "http request failed: {:?}", why);
  264. }
  265. }
  266. };
  267. let next = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 {
  268. match prev {
  269. 0 if buffer_size > 0 => {
  270. serialize_owned(m, buf);
  271. 1
  272. }
  273. n if n < buffer_size => {
  274. buf.push_str("\n");
  275. serialize_owned(m, buf);
  276. n + 1
  277. }
  278. n => {
  279. buf.push_str("\n");
  280. serialize_owned(m, buf);
  281. debug!(logger, "sending buffer to influx"; "len" => n);
  282. send(buf);
  283. buf.clear();
  284. 0
  285. }
  286. }
  287. };
  288. loop {
  289. match rx.recv() {
  290. Ok(Some(mut meas)) => {
  291. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  292. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  293. count = next(count, &meas, &mut buf);
  294. }
  295. Ok(None) => {
  296. if buf.len() > 0 {
  297. debug!(logger, "sending buffer to influx"; "len" => count);
  298. send(&buf)
  299. }
  300. break
  301. }
  302. _ => {
  303. thread::sleep(Duration::new(0, 1))
  304. }
  305. }
  306. }
  307. }).unwrap();
  308. InfluxWriter {
  309. host: host.to_string(),
  310. db: db.to_string(),
  311. tx,
  312. thread: Some(Arc::new(thread))
  313. }
  314. }
  315. }
  316. impl Drop for InfluxWriter {
  317. fn drop(&mut self) {
  318. if let Some(arc) = self.thread.take() {
  319. if let Ok(thread) = Arc::try_unwrap(arc) {
  320. let _ = self.tx.send(None);
  321. let _ = thread.join();
  322. }
  323. }
  324. }
  325. }
  326. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  327. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  328. let socket = ctx.socket(zmq::PULL)?;
  329. socket.bind(WRITER_ADDR)?;
  330. socket.set_rcvhwm(0)?;
  331. Ok(socket)
  332. }
  333. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  334. let socket = ctx.socket(zmq::PUSH)?;
  335. socket.connect(WRITER_ADDR)?;
  336. socket.set_sndhwm(0)?;
  337. Ok(socket)
  338. }
  339. /// This removes offending things rather than escaping them.
  340. ///
  341. fn escape_tag(s: &str) -> String {
  342. s.replace(" ", "")
  343. .replace(",", "")
  344. .replace("\"", "")
  345. }
  346. fn escape(s: &str) -> String {
  347. s.replace(" ", "\\ ")
  348. .replace(",", "\\,")
  349. }
  350. fn as_string(s: &str) -> String {
  351. // the second replace removes double escapes
  352. //
  353. format!("\"{}\"", s.replace("\"", "\\\"")
  354. .replace(r#"\\""#, r#"\""#))
  355. }
  356. #[test]
  357. fn it_checks_as_string_does_not_double_escape() {
  358. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  359. let escaped = as_string(&raw);
  360. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  361. }
  362. fn as_integer(i: &i64) -> String {
  363. format!("{}i", i)
  364. }
  365. fn as_float(f: &f64) -> String {
  366. f.to_string()
  367. }
  368. fn as_boolean(b: &bool) -> &str {
  369. if *b { "t" } else { "f" }
  370. }
  371. pub fn now() -> i64 {
  372. nanos(Utc::now()) as i64
  373. }
  374. /// Serialize the measurement into influx line protocol
  375. /// and append to the buffer.
  376. ///
  377. /// # Examples
  378. ///
  379. /// ```
  380. /// extern crate influent;
  381. /// extern crate logging;
  382. ///
  383. /// use influent::measurement::{Measurement, Value};
  384. /// use std::string::String;
  385. /// use logging::influx::serialize;
  386. ///
  387. /// fn main() {
  388. /// let mut buf = String::new();
  389. /// let mut m = Measurement::new("test");
  390. /// m.add_field("x", Value::Integer(1));
  391. /// serialize(&m, &mut buf);
  392. /// }
  393. ///
  394. /// ```
  395. ///
  396. pub fn serialize(measurement: &Measurement, line: &mut String) {
  397. line.push_str(&escape(measurement.key));
  398. for (tag, value) in measurement.tags.iter() {
  399. line.push_str(",");
  400. line.push_str(&escape(tag));
  401. line.push_str("=");
  402. line.push_str(&escape(value));
  403. }
  404. let mut was_spaced = false;
  405. for (field, value) in measurement.fields.iter() {
  406. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  407. line.push_str(&escape(field));
  408. line.push_str("=");
  409. match value {
  410. &Value::String(ref s) => line.push_str(&as_string(s)),
  411. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  412. &Value::Float(ref f) => line.push_str(&as_float(f)),
  413. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  414. };
  415. }
  416. match measurement.timestamp {
  417. Some(t) => {
  418. line.push_str(" ");
  419. line.push_str(&t.to_string());
  420. }
  421. _ => {}
  422. }
  423. }
  424. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  425. ///
  426. /// The serialized measurement is appended to the end of the string without
  427. /// any regard for what exited in it previously.
  428. ///
  429. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  430. line.push_str(&escape_tag(measurement.key));
  431. let add_tag = |line: &mut String, key: &str, value: &str| {
  432. line.push_str(",");
  433. line.push_str(&escape_tag(key));
  434. line.push_str("=");
  435. line.push_str(&escape(value));
  436. };
  437. for &(key, value) in measurement.tags.iter() {
  438. add_tag(line, key, value);
  439. }
  440. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  441. if is_first { line.push_str(" "); } else { line.push_str(","); }
  442. line.push_str(&escape_tag(key));
  443. line.push_str("=");
  444. match *value {
  445. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  446. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  447. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  448. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  449. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  450. #[cfg(not(feature = "disable-short-uuid"))]
  451. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  452. #[cfg(feature = "disable-short-uuid")]
  453. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  454. };
  455. };
  456. let mut fields = measurement.fields.iter();
  457. // first time separate from tags with space
  458. //
  459. fields.next().map(|kv| {
  460. add_field(line, &kv.0, &kv.1, true);
  461. });
  462. // then seperate the rest w/ comma
  463. //
  464. for kv in fields {
  465. add_field(line, kv.0, &kv.1, false);
  466. }
  467. if let Some(t) = measurement.timestamp {
  468. line.push_str(" ");
  469. line.push_str(&t.to_string());
  470. }
  471. }
  472. #[cfg(feature = "warnings")]
  473. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  474. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  475. assert!(false);
  476. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  477. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  478. let _ = fs::create_dir("/tmp/mm");
  479. let ctx = zmq::Context::new();
  480. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  481. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  482. let client = Client::new();
  483. let mut buf = String::with_capacity(4096);
  484. let mut server_resp = String::with_capacity(4096);
  485. let mut count = 0;
  486. loop {
  487. if let Ok(bytes) = socket.recv_bytes(0) {
  488. if let Ok(msg) = String::from_utf8(bytes) {
  489. count = match count {
  490. 0 => {
  491. buf.push_str(&msg);
  492. 1
  493. }
  494. n @ 1...40 => {
  495. buf.push_str("\n");
  496. buf.push_str(&msg);
  497. n + 1
  498. }
  499. _ => {
  500. buf.push_str("\n");
  501. buf.push_str(&msg);
  502. match client.post(url.clone())
  503. .body(&buf)
  504. .send() {
  505. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  506. Ok(mut resp) => {
  507. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  508. let _ = warnings.send(
  509. Warning::Error(
  510. format!("Influx server: {}", server_resp)));
  511. server_resp.clear();
  512. }
  513. Err(why) => {
  514. let _ = warnings.send(
  515. Warning::Error(
  516. format!("Influx write error: {}", why)));
  517. }
  518. }
  519. buf.clear();
  520. 0
  521. }
  522. }
  523. }
  524. }
  525. }
  526. }).unwrap()
  527. }
/// An owned field value of an `OwnedMeasurement`.
#[derive(Debug, Clone, PartialEq)]
pub enum OwnedValue {
    /// Serialized by `serialize_owned` as a double-quoted, escaped string.
    String(String),
    /// Serialized with the float's `Display` representation.
    Float(f64),
    /// Serialized as the number followed by an `i` suffix.
    Integer(i64),
    /// Serialized as `t` or `f`.
    Boolean(bool),
    /// Serialized with the decimal's `Display` representation.
    D128(d128),
    /// Serialized as a quoted string (shortened to 8 characters unless the
    /// `disable-short-uuid` feature is enabled).
    Uuid(Uuid),
}
/// Holds data meant for an influxdb measurement in transit to the
/// writing thread.
///
/// Tags and fields are stored as `SmallVec`s of `(key, value)` pairs in
/// insertion order.
///
#[derive(Clone, Debug)]
pub struct OwnedMeasurement {
    /// Measurement name.
    pub key: &'static str,
    /// Timestamp of the measurement, if one has been set.
    pub timestamp: Option<i64>,
    /// Field `(key, value)` pairs.
    pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
    /// Tag `(key, value)` pairs.
    pub tags: SmallVec<[(&'static str, &'static str); 8]>,
}
  551. impl OwnedMeasurement {
  552. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  553. OwnedMeasurement {
  554. key,
  555. timestamp: None,
  556. tags: SmallVec::with_capacity(n_tags),
  557. fields: SmallVec::with_capacity(n_fields),
  558. }
  559. }
  560. pub fn new(key: &'static str) -> Self {
  561. OwnedMeasurement {
  562. key,
  563. timestamp: None,
  564. tags: SmallVec::new(),
  565. fields: SmallVec::new(),
  566. }
  567. }
  568. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  569. self.tags.push((key, value));
  570. self
  571. }
  572. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  573. self.fields.push((key, value));
  574. self
  575. }
  576. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  577. self.timestamp = Some(timestamp);
  578. self
  579. }
  580. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  581. match self.tags.iter().position(|kv| kv.0 == key) {
  582. Some(i) => {
  583. self.tags.get_mut(i)
  584. .map(|x| {
  585. x.0 = value;
  586. });
  587. self
  588. }
  589. None => {
  590. self.add_tag(key, value)
  591. }
  592. }
  593. }
  594. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  595. self.fields.iter()
  596. .find(|kv| kv.0 == key)
  597. .map(|kv| &kv.1)
  598. }
  599. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  600. self.tags.iter()
  601. .find(|kv| kv.0 == key)
  602. .map(|kv| kv.1)
  603. }
  604. }
  605. #[allow(unused_imports, unused_variables)]
  606. #[cfg(test)]
  607. mod tests {
  608. use super::*;
  609. use test::{black_box, Bencher};
  610. #[test]
  611. fn it_uses_the_new_tag_k_only_shortcut() {
  612. let tag_value = "one";
  613. let color = "red";
  614. let time = now();
  615. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  616. assert_eq!(m.get_tag("color"), Some("red"));
  617. assert_eq!(m.get_tag("tag_value"), Some("one"));
  618. assert_eq!(m.timestamp, Some(time));
  619. }
  620. #[test]
  621. fn it_uses_measure_macro_parenthesis_syntax() {
  622. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  623. assert_eq!(m.key, "test");
  624. assert_eq!(m.get_tag("a"), Some("b"));
  625. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  626. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  627. assert_eq!(m.timestamp, Some(1));
  628. }
  629. #[test]
  630. fn it_uses_measure_macro_on_a_self_attribute() {
  631. struct A {
  632. pub influx: InfluxWriter,
  633. }
  634. impl A {
  635. fn f(&self) {
  636. measure!(self.influx, test, t(color, "red"), i(n, 1));
  637. }
  638. }
  639. let a = A { influx: InfluxWriter::default() };
  640. a.f();
  641. }
  642. #[test]
  643. fn it_clones_an_influx_writer_to_check_both_drop() {
  644. let influx = InfluxWriter::default();
  645. measure!(influx, drop_test, i(a, 1), i(b, 2));
  646. {
  647. let influx = influx.clone();
  648. thread::spawn(move || {
  649. measure!(influx, drop_test, i(a, 3), i(b, 4));
  650. });
  651. }
  652. }
  653. #[bench]
  654. fn influx_writer_send_basic(b: &mut Bencher) {
  655. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  656. b.iter(|| {
  657. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  658. });
  659. }
  660. #[bench]
  661. fn influx_writer_send_price(b: &mut Bencher) {
  662. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  663. b.iter(|| {
  664. measure!(m, test,
  665. t(ticker, t!(xmr-btc).as_str()),
  666. t(exchange, "plnx"),
  667. d(bid, d128::zero()),
  668. d(ask, d128::zero()),
  669. );
  670. });
  671. }
  672. #[test]
  673. fn it_checks_color_tag_error_in_non_doctest() {
  674. let (tx, rx) = channel();
  675. measure!(tx, test, tag[color;"red"], int[n;1]);
  676. let meas: OwnedMeasurement = rx.recv().unwrap();
  677. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  678. }
  679. #[test]
  680. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  681. let meas = measure!(@make_meas test_measurement,
  682. tag [ one => "a" ],
  683. tag [ two => "b" ],
  684. int [ three => 2 ],
  685. float [ four => 1.2345 ],
  686. string [ five => String::from("d") ],
  687. bool [ six => true ],
  688. int [ seven => { 1 + 2 } ],
  689. time [ 1 ]
  690. );
  691. assert_eq!(meas.key, "test_measurement");
  692. assert_eq!(meas.get_tag("one"), Some("a"));
  693. assert_eq!(meas.get_tag("two"), Some("b"));
  694. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  695. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  696. assert_eq!(meas.timestamp, Some(1));
  697. }
  698. #[test]
  699. fn it_uses_the_measure_macro() {
  700. let (tx, rx) = channel();
  701. measure!(tx, test_measurement,
  702. tag [ one => "a" ],
  703. tag [ two => "b" ],
  704. int [ three => 2 ],
  705. float [ four => 1.2345 ],
  706. string [ five => String::from("d") ],
  707. bool [ six => true ],
  708. int [ seven => { 1 + 2 } ],
  709. time [ 1 ]
  710. );
  711. thread::sleep(Duration::from_millis(10));
  712. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  713. assert_eq!(meas.key, "test_measurement");
  714. assert_eq!(meas.get_tag("one"), Some("a"));
  715. assert_eq!(meas.get_tag("two"), Some("b"));
  716. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  717. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  718. assert_eq!(meas.timestamp, Some(1));
  719. }
  720. #[test]
  721. fn it_uses_measure_macro_for_d128_and_uuid() {
  722. let (tx, rx) = channel();
  723. let u = Uuid::new_v4();
  724. let d = d128::zero();
  725. let t = now();
  726. measure!(tx, test_measurement,
  727. tag[one; "a"],
  728. d128[two; d],
  729. uuid[three; u],
  730. time[t]
  731. );
  732. thread::sleep(Duration::from_millis(10));
  733. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  734. assert_eq!(meas.key, "test_measurement");
  735. assert_eq!(meas.get_tag("one"), Some("a"));
  736. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  737. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
  738. assert_eq!(meas.timestamp, Some(t));
  739. }
  740. #[test]
  741. fn it_uses_the_measure_macro_alt_syntax() {
  742. let (tx, rx) = channel();
  743. measure!(tx, test_measurement,
  744. tag[one; "a"],
  745. tag[two; "b"],
  746. int[three; 2],
  747. float[four; 1.2345],
  748. string[five; String::from("d")],
  749. bool [ six => true ],
  750. int[seven; { 1 + 2 }],
  751. time[1]
  752. );
  753. thread::sleep(Duration::from_millis(10));
  754. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  755. assert_eq!(meas.key, "test_measurement");
  756. assert_eq!(meas.get_tag("one"), Some("a"));
  757. assert_eq!(meas.get_tag("two"), Some("b"));
  758. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  759. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  760. assert_eq!(meas.timestamp, Some(1));
  761. }
  762. #[test]
  763. fn it_checks_that_fields_are_separated_correctly() {
  764. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  765. assert_eq!(m.key, "test");
  766. assert_eq!(m.get_tag("a"), Some("one"));
  767. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  768. let mut buf = String::new();
  769. serialize_owned(&m, &mut buf);
  770. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  771. }
  772. #[test]
  773. fn try_to_break_measure_macro() {
  774. let (tx, _) = channel();
  775. measure!(tx, one, tag[x=>"y"], int[n;1]);
  776. measure!(tx, one, tag[x;"y"], int[n;1],);
  777. struct A {
  778. pub one: i32,
  779. pub two: i32,
  780. }
  781. struct B {
  782. pub a: A
  783. }
  784. let b = B { a: A { one: 1, two: 2 } };
  785. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  786. assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
  787. }
  788. #[bench]
  789. fn measure_macro_small(b: &mut Bencher) {
  790. let (tx, rx) = channel();
  791. let listener = thread::spawn(move || {
  792. loop { if rx.recv().is_err() { break } }
  793. });
  794. b.iter(|| {
  795. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  796. });
  797. }
  798. #[bench]
  799. fn measure_macro_medium(b: &mut Bencher) {
  800. let (tx, rx) = channel();
  801. let listener = thread::spawn(move || {
  802. loop { if rx.recv().is_err() { break } }
  803. });
  804. b.iter(|| {
  805. measure!(tx, test,
  806. tag[color; "red"],
  807. tag[mood => "playful"],
  808. tag [ ticker => "xmr_btc" ],
  809. float[ price => 1.2345 ],
  810. float[ amount => 56.323],
  811. int[n; 1],
  812. time[now()]
  813. );
  814. });
  815. }
  816. #[cfg(feature = "warnings")]
  817. #[test]
  818. #[ignore]
  819. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  820. let ctx = zmq::Context::new();
  821. let socket = push(&ctx).unwrap();
  822. let (tx, rx) = channel();
  823. let w = writer(tx.clone());
  824. let mut buf = String::with_capacity(4096);
  825. let mut meas = Measurement::new("rust_test");
  826. meas.add_tag("a", "t");
  827. meas.add_field("c", Value::Float(1.23456));
  828. let now = now();
  829. meas.set_timestamp(now);
  830. serialize(&meas, &mut buf);
  831. socket.send_str(&buf, 0).unwrap();
  832. drop(w);
  833. }
  834. #[test]
  835. fn it_serializes_a_measurement_in_place() {
  836. let mut buf = String::with_capacity(4096);
  837. let mut meas = Measurement::new("rust_test");
  838. meas.add_tag("a", "b");
  839. meas.add_field("c", Value::Float(1.0));
  840. let now = now();
  841. meas.set_timestamp(now);
  842. serialize(&meas, &mut buf);
  843. let ans = format!("rust_test,a=b c=1 {}", now);
  844. assert_eq!(buf, ans);
  845. }
  846. #[test]
  847. fn it_serializes_a_hard_to_serialize_message() {
  848. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  849. let mut buf = String::new();
  850. let mut server_resp = String::new();
  851. let mut m = Measurement::new("rust_test");
  852. m.add_field("s", Value::String(&raw));
  853. let now = now();
  854. m.set_timestamp(now);
  855. serialize(&m, &mut buf);
  856. println!("{}", buf);
  857. buf.push_str("\n");
  858. let buf_copy = buf.clone();
  859. buf.push_str(&buf_copy);
  860. println!("{}", buf);
  861. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  862. let client = Client::new();
  863. match client.post(url.clone())
  864. .body(&buf)
  865. .send() {
  866. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  867. Ok(mut resp) => {
  868. resp.read_to_string(&mut server_resp).unwrap();
  869. panic!("{}", server_resp);
  870. }
  871. Err(why) => {
  872. panic!(why)
  873. }
  874. }
  875. }
  876. #[bench]
  877. fn serialize_owned_longer(b: &mut Bencher) {
  878. let mut buf = String::with_capacity(1024);
  879. let m =
  880. OwnedMeasurement::new("test")
  881. .add_tag("one", "a")
  882. .add_tag("two", "b")
  883. .add_tag("ticker", "xmr_btc")
  884. .add_tag("exchange", "plnx")
  885. .add_tag("side", "bid")
  886. .add_field("three", OwnedValue::Float(1.2345))
  887. .add_field("four", OwnedValue::Integer(57))
  888. .add_field("five", OwnedValue::Boolean(true))
  889. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  890. .set_timestamp(now());
  891. b.iter(|| {
  892. serialize_owned(&m, &mut buf);
  893. buf.clear()
  894. });
  895. }
  896. #[bench]
  897. fn serialize_owned_simple(b: &mut Bencher) {
  898. let mut buf = String::with_capacity(1024);
  899. let m =
  900. OwnedMeasurement::new("test")
  901. .add_tag("one", "a")
  902. .add_tag("two", "b")
  903. .add_field("three", OwnedValue::Float(1.2345))
  904. .add_field("four", OwnedValue::Integer(57))
  905. .set_timestamp(now());
  906. b.iter(|| {
  907. serialize_owned(&m, &mut buf);
  908. buf.clear()
  909. });
  910. }
  911. #[test]
  912. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  913. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  914. let mut buf = String::new();
  915. let mut server_resp = String::new();
  916. let m = OwnedMeasurement::new("rust_test")
  917. .add_field("s", OwnedValue::String(raw.to_string()))
  918. .set_timestamp(now());
  919. serialize_owned(&m, &mut buf);
  920. println!("{}", buf);
  921. buf.push_str("\n");
  922. let buf_copy = buf.clone();
  923. buf.push_str(&buf_copy);
  924. println!("{}", buf);
  925. let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
  926. let client = Client::new();
  927. match client.post(url.clone())
  928. .body(&buf)
  929. .send() {
  930. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  931. Ok(mut resp) => {
  932. resp.read_to_string(&mut server_resp).unwrap();
  933. panic!("{}", server_resp);
  934. }
  935. Err(why) => {
  936. panic!(why)
  937. }
  938. }
  939. }
  940. }