You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1104 lines
37KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::io::Read;
  4. use std::sync::Arc;
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. #[cfg(feature = "warnings")]
  8. use std::fs;
  9. use std::time::Duration;
  10. use std::hash::BuildHasherDefault;
  11. use hyper::status::StatusCode;
  12. use hyper::client::response::Response;
  13. use hyper::Url;
  14. use hyper::client::Client;
  15. use influent::measurement::{Measurement, Value};
  16. use zmq;
  17. #[allow(unused_imports)]
  18. use chrono::{DateTime, Utc};
  19. use ordermap::OrderMap;
  20. use fnv::FnvHasher;
  21. use decimal::d128;
  22. use uuid::Uuid;
  23. use smallvec::SmallVec;
  24. use slog::Logger;
  25. use super::{nanos, file_logger, LOG_LEVEL};
  26. #[cfg(feature = "warnings")]
  27. use warnings::Warning;
  28. pub use super::{dur_nanos, dt_nanos};
  29. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  30. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  31. Map::with_capacity_and_hasher(capacity, Default::default())
  32. }
  33. /// Created this so I know what types can be passed through the
  34. /// `measure!` macro, which used to convert with `as i64` and
  35. /// `as f64` until I accidentally passed a function name, and it
  36. /// still compiled, but with garbage numbers.
  37. pub trait AsI64 {
  38. fn as_i64(x: Self) -> i64;
  39. }
  40. impl AsI64 for i64 { fn as_i64(x: Self) -> i64 { x } }
  41. impl AsI64 for i32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  42. impl AsI64 for u32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  43. impl AsI64 for u64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  44. impl AsI64 for usize { fn as_i64(x: Self) -> i64 { x as i64 } }
  45. impl AsI64 for f64 { fn as_i64(x: Self) -> i64 { x as i64 } }
  46. impl AsI64 for f32 { fn as_i64(x: Self) -> i64 { x as i64 } }
  47. impl AsI64 for u16 { fn as_i64(x: Self) -> i64 { x as i64 } }
  48. /// Created this so I know what types can be passed through the
  49. /// `measure!` macro, which used to convert with `as i64` and
  50. /// `as f64` until I accidentally passed a function name, and it
  51. /// still compiled, but with garbage numbers.
  52. pub trait AsF64 {
  53. fn as_f64(x: Self) -> f64;
  54. }
  55. impl AsF64 for f64 { fn as_f64(x: Self) -> f64 { x } }
  56. impl AsF64 for i64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  57. impl AsF64 for i32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  58. impl AsF64 for u32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  59. impl AsF64 for u64 { fn as_f64(x: Self) -> f64 { x as f64 } }
  60. impl AsF64 for usize { fn as_f64(x: Self) -> f64 { x as f64 } }
  61. impl AsF64 for f32 { fn as_f64(x: Self) -> f64 { x as f64 } }
  62. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  63. ///
  64. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  65. /// values, as well as sends it with the `Sender`.
  66. ///
  67. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  68. /// measurement (see `tests` mod).
  69. ///
  70. /// # Examples
  71. ///
  72. /// ```
  73. /// #![feature(try_from)]
  74. /// #[macro_use] extern crate logging;
  75. /// extern crate decimal;
  76. ///
  77. /// use std::sync::mpsc::channel;
  78. /// use decimal::d128;
  79. /// use logging::influx::*;
  80. ///
  81. /// fn main() {
  82. /// let (tx, rx) = channel();
  83. ///
  84. /// // "shorthand" syntax
  85. ///
  86. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  87. ///
  88. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  89. ///
  90. /// assert_eq!(meas.key, "test");
  91. /// assert_eq!(meas.get_tag("color"), Some("red"));
  92. /// assert_eq!(meas.get_field("n"), Some(&OwnedValue::Integer(1)));
  93. ///
  94. /// // alternate syntax ...
  95. ///
  96. /// measure!(tx, test,
  97. /// tag [ one => "a" ],
  98. /// tag [ two => "b" ],
  99. /// int [ three => 2 ],
  100. /// float [ four => 1.2345 ],
  101. /// string [ five => String::from("d") ],
  102. /// bool [ six => true ],
  103. /// int [ seven => { 1 + 2 } ],
  104. /// time [ 1 ]
  105. /// );
  106. ///
  107. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  108. ///
  109. /// assert_eq!(meas.key, "test");
  110. /// assert_eq!(meas.get_tag("one"), Some("a"));
  111. /// assert_eq!(meas.get_tag("two"), Some("b"));
  112. /// assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  113. /// assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  114. /// assert_eq!(meas.timestamp, Some(1));
  115. ///
  116. /// // use the @make_meas flag to skip sending a measurement, instead merely
  117. /// // creating it.
  118. ///
  119. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  120. ///
  121. /// // each variant also has shorthand aliases
  122. ///
  123. /// let meas: OwnedMeasurement =
  124. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  125. /// }
  126. /// ```
  127. ///
  128. #[macro_export]
  129. macro_rules! measure {
  130. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  131. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  132. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  133. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  134. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($tm)) };
  135. (@kv utc, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp(AsI64::as_i64($crate::nanos($tm))) };
  136. (@kv v, $meas:ident, $k:expr) => { measure!(@ea tag, $meas, "version", $k) };
  137. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  138. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  139. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  140. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  141. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) };
  142. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  143. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) };
  144. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  145. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  146. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  147. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  148. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  149. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  150. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  151. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean(bool::from($v))) };
  152. (@as_expr $e:expr) => {$e};
  153. (@count_tags) => {0usize};
  154. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  155. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  156. (@count_fields) => {0usize};
  157. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  158. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  159. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  160. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  161. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  162. };
  163. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  164. let n_tags = measure!(@count_tags $($t)*);
  165. let n_fields = measure!(@count_fields $($t)*);
  166. let mut meas =
  167. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  168. $(
  169. measure!(@kv $t, meas, $($tail)*);
  170. )*
  171. meas
  172. }};
  173. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  174. measure!($m, $name, $($t [ $($tail)* ] ),+)
  175. };
  176. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  177. #[allow(unused_imports)]
  178. use $crate::influx::{AsI64, AsF64};
  179. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  180. let _ = $m.send(measurement);
  181. }};
  182. }
  183. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  184. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  185. /// measurements have accumulated.
  186. ///
  187. #[derive(Debug)]
  188. pub struct InfluxWriter {
  189. host: String,
  190. db: String,
  191. tx: Sender<Option<OwnedMeasurement>>,
  192. thread: Option<Arc<thread::JoinHandle<()>>>,
  193. }
  194. impl Default for InfluxWriter {
  195. fn default() -> Self {
  196. //if cfg!(any(test, feature = "test")) {
  197. // InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 0)
  198. //} else {
  199. InfluxWriter::new("localhost", "test", "/home/jstrong/src/logging/var/log/influx-test.log", 4000)
  200. //}
  201. }
  202. }
  203. impl Clone for InfluxWriter {
  204. fn clone(&self) -> Self {
  205. debug_assert!(self.thread.is_some());
  206. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  207. InfluxWriter {
  208. host: self.host.to_string(),
  209. db: self.db.to_string(),
  210. tx: self.tx.clone(),
  211. thread,
  212. }
  213. }
  214. }
  215. impl InfluxWriter {
  216. /// Sends the `OwnedMeasurement` to the serialization thread.
  217. ///
  218. #[cfg_attr(feature = "inlines", inline)]
  219. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
  220. self.tx.send(Some(m))
  221. }
  222. #[cfg_attr(feature = "inlines", inline)]
  223. pub fn nanos(&self, d: DateTime<Utc>) -> i64 { nanos(d) as i64 }
  224. #[cfg_attr(feature = "inlines", inline)]
  225. pub fn dur_nanos(&self, d: Duration) -> i64 { dur_nanos(d) as i64 }
  226. #[cfg_attr(feature = "inlines", inline)]
  227. pub fn dur_nanos_u64(&self, d: Duration) -> u64 { dur_nanos(d).max(0) as u64 }
  228. pub fn tx(&self) -> Sender<Option<OwnedMeasurement>> {
  229. self.tx.clone()
  230. }
  231. pub fn new(host: &str, db: &str, log_path: &str, buffer_size: u16) -> Self {
  232. let logger = file_logger(log_path, LOG_LEVEL); // this needs to be outside the thread
  233. Self::with_logger(host, db, buffer_size, logger)
  234. }
  235. #[allow(unused_assignments)]
  236. pub fn with_logger(host: &str, db: &str, buffer_size: u16, logger: Logger) -> Self {
  237. let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();
  238. #[cfg(feature = "no-influx-buffer")]
  239. let buffer_size = 0u16;
  240. debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);
  241. let url =
  242. Url::parse_with_params(&format!("http://{}:8086/write", host),
  243. &[("db", db), ("precision", "ns")])
  244. .expect("influx writer url should parse");
  245. let thread = thread::Builder::new().name(format!("mm:inflx:{}", db)).spawn(move || {
  246. let client = Client::new();
  247. debug!(logger, "initializing buffers");
  248. let mut buf = String::with_capacity(32 * 32 * 32);
  249. let mut count = 0;
  250. let send = |buf: &str| {
  251. let resp = client.post(url.clone())
  252. .body(buf)
  253. .send();
  254. match resp {
  255. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  256. debug!(logger, "server responded ok: 204 NoContent");
  257. }
  258. Ok(mut resp) => {
  259. let mut server_resp = String::with_capacity(1024);
  260. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  261. error!(logger, "influx server error";
  262. "status" => resp.status.to_string(),
  263. "body" => server_resp);
  264. }
  265. Err(why) => {
  266. error!(logger, "http request failed: {:?}", why);
  267. }
  268. }
  269. };
  270. let next = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 {
  271. match prev {
  272. 0 if buffer_size > 0 => {
  273. serialize_owned(m, buf);
  274. 1
  275. }
  276. n if n < buffer_size => {
  277. buf.push_str("\n");
  278. serialize_owned(m, buf);
  279. n + 1
  280. }
  281. n => {
  282. buf.push_str("\n");
  283. serialize_owned(m, buf);
  284. debug!(logger, "sending buffer to influx"; "len" => n);
  285. send(buf);
  286. buf.clear();
  287. 0
  288. }
  289. }
  290. };
  291. loop {
  292. match rx.recv() {
  293. Ok(Some(mut meas)) => {
  294. if meas.timestamp.is_none() { meas.timestamp = Some(now()) }
  295. //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }
  296. count = next(count, &meas, &mut buf);
  297. }
  298. Ok(None) => {
  299. if buf.len() > 0 {
  300. debug!(logger, "sending buffer to influx"; "len" => count);
  301. send(&buf)
  302. }
  303. break
  304. }
  305. _ => {
  306. thread::sleep(Duration::new(0, 1))
  307. }
  308. }
  309. }
  310. }).unwrap();
  311. InfluxWriter {
  312. host: host.to_string(),
  313. db: db.to_string(),
  314. tx,
  315. thread: Some(Arc::new(thread))
  316. }
  317. }
  318. }
  319. impl Drop for InfluxWriter {
  320. fn drop(&mut self) {
  321. if let Some(arc) = self.thread.take() {
  322. if let Ok(thread) = Arc::try_unwrap(arc) {
  323. let _ = self.tx.send(None);
  324. let _ = thread.join();
  325. }
  326. }
  327. }
  328. }
  329. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  330. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  331. let socket = ctx.socket(zmq::PULL)?;
  332. socket.bind(WRITER_ADDR)?;
  333. socket.set_rcvhwm(0)?;
  334. Ok(socket)
  335. }
  336. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  337. let socket = ctx.socket(zmq::PUSH)?;
  338. socket.connect(WRITER_ADDR)?;
  339. socket.set_sndhwm(0)?;
  340. Ok(socket)
  341. }
  342. /// This removes offending things rather than escaping them.
  343. ///
  344. fn escape_tag(s: &str) -> String {
  345. s.replace(" ", "")
  346. .replace(",", "")
  347. .replace("\"", "")
  348. }
  349. fn escape(s: &str) -> String {
  350. s.replace(" ", "\\ ")
  351. .replace(",", "\\,")
  352. }
  353. fn as_string(s: &str) -> String {
  354. // the second replace removes double escapes
  355. //
  356. format!("\"{}\"", s.replace("\"", "\\\"")
  357. .replace(r#"\\""#, r#"\""#))
  358. }
  359. #[test]
  360. fn it_checks_as_string_does_not_double_escape() {
  361. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  362. let escaped = as_string(&raw);
  363. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  364. }
  365. fn as_integer(i: &i64) -> String {
  366. format!("{}i", i)
  367. }
  368. fn as_float(f: &f64) -> String {
  369. f.to_string()
  370. }
  371. fn as_boolean(b: &bool) -> &str {
  372. if *b { "t" } else { "f" }
  373. }
  374. pub fn now() -> i64 {
  375. nanos(Utc::now()) as i64
  376. }
  377. /// Serialize the measurement into influx line protocol
  378. /// and append to the buffer.
  379. ///
  380. /// # Examples
  381. ///
  382. /// ```
  383. /// extern crate influent;
  384. /// extern crate logging;
  385. ///
  386. /// use influent::measurement::{Measurement, Value};
  387. /// use std::string::String;
  388. /// use logging::influx::serialize;
  389. ///
  390. /// fn main() {
  391. /// let mut buf = String::new();
  392. /// let mut m = Measurement::new("test");
  393. /// m.add_field("x", Value::Integer(1));
  394. /// serialize(&m, &mut buf);
  395. /// }
  396. ///
  397. /// ```
  398. ///
  399. pub fn serialize(measurement: &Measurement, line: &mut String) {
  400. line.push_str(&escape(measurement.key));
  401. for (tag, value) in measurement.tags.iter() {
  402. line.push_str(",");
  403. line.push_str(&escape(tag));
  404. line.push_str("=");
  405. line.push_str(&escape(value));
  406. }
  407. let mut was_spaced = false;
  408. for (field, value) in measurement.fields.iter() {
  409. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  410. line.push_str(&escape(field));
  411. line.push_str("=");
  412. match value {
  413. &Value::String(ref s) => line.push_str(&as_string(s)),
  414. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  415. &Value::Float(ref f) => line.push_str(&as_float(f)),
  416. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  417. };
  418. }
  419. match measurement.timestamp {
  420. Some(t) => {
  421. line.push_str(" ");
  422. line.push_str(&t.to_string());
  423. }
  424. _ => {}
  425. }
  426. }
  427. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  428. ///
  429. /// The serialized measurement is appended to the end of the string without
  430. /// any regard for what exited in it previously.
  431. ///
  432. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  433. line.push_str(&escape_tag(measurement.key));
  434. let add_tag = |line: &mut String, key: &str, value: &str| {
  435. line.push_str(",");
  436. line.push_str(&escape_tag(key));
  437. line.push_str("=");
  438. line.push_str(&escape(value));
  439. };
  440. for &(key, value) in measurement.tags.iter() {
  441. add_tag(line, key, value);
  442. }
  443. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  444. if is_first { line.push_str(" "); } else { line.push_str(","); }
  445. line.push_str(&escape_tag(key));
  446. line.push_str("=");
  447. match *value {
  448. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  449. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  450. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  451. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  452. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  453. #[cfg(not(feature = "disable-short-uuid"))]
  454. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  455. #[cfg(feature = "disable-short-uuid")]
  456. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)),
  457. };
  458. };
  459. let mut fields = measurement.fields.iter();
  460. // first time separate from tags with space
  461. //
  462. fields.next().map(|kv| {
  463. add_field(line, &kv.0, &kv.1, true);
  464. });
  465. // then seperate the rest w/ comma
  466. //
  467. for kv in fields {
  468. add_field(line, kv.0, &kv.1, false);
  469. }
  470. if let Some(t) = measurement.timestamp {
  471. line.push_str(" ");
  472. line.push_str(&t.to_string());
  473. }
  474. }
  475. #[cfg(feature = "warnings")]
  476. #[deprecated(since="0.4", note="Replace with InfluxWriter")]
  477. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  478. assert!(false);
  479. thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || {
  480. const DB_HOST: &'static str = "http://127.0.0.1:8086/write";
  481. let _ = fs::create_dir("/tmp/mm");
  482. let ctx = zmq::Context::new();
  483. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  484. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  485. let client = Client::new();
  486. let mut buf = String::with_capacity(4096);
  487. let mut server_resp = String::with_capacity(4096);
  488. let mut count = 0;
  489. loop {
  490. if let Ok(bytes) = socket.recv_bytes(0) {
  491. if let Ok(msg) = String::from_utf8(bytes) {
  492. count = match count {
  493. 0 => {
  494. buf.push_str(&msg);
  495. 1
  496. }
  497. n @ 1...40 => {
  498. buf.push_str("\n");
  499. buf.push_str(&msg);
  500. n + 1
  501. }
  502. _ => {
  503. buf.push_str("\n");
  504. buf.push_str(&msg);
  505. match client.post(url.clone())
  506. .body(&buf)
  507. .send() {
  508. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  509. Ok(mut resp) => {
  510. let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  511. let _ = warnings.send(
  512. Warning::Error(
  513. format!("Influx server: {}", server_resp)));
  514. server_resp.clear();
  515. }
  516. Err(why) => {
  517. let _ = warnings.send(
  518. Warning::Error(
  519. format!("Influx write error: {}", why)));
  520. }
  521. }
  522. buf.clear();
  523. 0
  524. }
  525. }
  526. }
  527. }
  528. }
  529. }).unwrap()
  530. }
  531. #[derive(Debug, Clone, PartialEq)]
  532. pub enum OwnedValue {
  533. String(String),
  534. Float(f64),
  535. Integer(i64),
  536. Boolean(bool),
  537. D128(d128),
  538. Uuid(Uuid),
  539. }
  540. /// Holds data meant for an influxdb measurement in transit to the
  541. /// writing thread.
  542. ///
  543. /// TODO: convert `Map` to `SmallVec`?
  544. ///
  545. #[derive(Clone, Debug)]
  546. pub struct OwnedMeasurement {
  547. pub key: &'static str,
  548. pub timestamp: Option<i64>,
  549. //pub fields: Map<&'static str, OwnedValue>,
  550. //pub tags: Map<&'static str, &'static str>,
  551. pub fields: SmallVec<[(&'static str, OwnedValue); 8]>,
  552. pub tags: SmallVec<[(&'static str, &'static str); 8]>,
  553. }
  554. impl OwnedMeasurement {
  555. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  556. OwnedMeasurement {
  557. key,
  558. timestamp: None,
  559. tags: SmallVec::with_capacity(n_tags),
  560. fields: SmallVec::with_capacity(n_fields),
  561. }
  562. }
  563. pub fn new(key: &'static str) -> Self {
  564. OwnedMeasurement {
  565. key,
  566. timestamp: None,
  567. tags: SmallVec::new(),
  568. fields: SmallVec::new(),
  569. }
  570. }
  571. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  572. self.tags.push((key, value));
  573. self
  574. }
  575. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  576. self.fields.push((key, value));
  577. self
  578. }
  579. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  580. self.timestamp = Some(timestamp);
  581. self
  582. }
  583. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  584. match self.tags.iter().position(|kv| kv.0 == key) {
  585. Some(i) => {
  586. self.tags.get_mut(i)
  587. .map(|x| {
  588. x.0 = value;
  589. });
  590. self
  591. }
  592. None => {
  593. self.add_tag(key, value)
  594. }
  595. }
  596. }
  597. pub fn get_field(&self, key: &'static str) -> Option<&OwnedValue> {
  598. self.fields.iter()
  599. .find(|kv| kv.0 == key)
  600. .map(|kv| &kv.1)
  601. }
  602. pub fn get_tag(&self, key: &'static str) -> Option<&'static str> {
  603. self.tags.iter()
  604. .find(|kv| kv.0 == key)
  605. .map(|kv| kv.1)
  606. }
  607. }
  608. #[allow(unused_imports, unused_variables)]
  609. #[cfg(test)]
  610. mod tests {
  611. use super::*;
  612. use test::{black_box, Bencher};
  613. #[test]
  614. fn it_uses_the_utc_shortcut_to_convert_a_datetime_utc() {
  615. const VERSION: &str = "0.3.90";
  616. let tag_value = "one";
  617. let color = "red";
  618. let time = Utc::now();
  619. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), utc(time));
  620. assert_eq!(m.get_tag("color"), Some("red"));
  621. assert_eq!(m.get_tag("version"), Some(VERSION));
  622. assert_eq!(m.timestamp, Some(nanos(time) as i64));
  623. }
  624. #[test]
  625. fn it_uses_the_v_for_version_shortcut() {
  626. const VERSION: &str = "0.3.90";
  627. let tag_value = "one";
  628. let color = "red";
  629. let time = now();
  630. let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
  631. assert_eq!(m.get_tag("color"), Some("red"));
  632. assert_eq!(m.get_tag("version"), Some(VERSION));
  633. assert_eq!(m.timestamp, Some(time));
  634. }
  635. #[test]
  636. fn it_uses_the_new_tag_k_only_shortcut() {
  637. let tag_value = "one";
  638. let color = "red";
  639. let time = now();
  640. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  641. assert_eq!(m.get_tag("color"), Some("red"));
  642. assert_eq!(m.get_tag("tag_value"), Some("one"));
  643. assert_eq!(m.timestamp, Some(time));
  644. }
  645. #[test]
  646. fn it_uses_measure_macro_parenthesis_syntax() {
  647. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  648. assert_eq!(m.key, "test");
  649. assert_eq!(m.get_tag("a"), Some("b"));
  650. assert_eq!(m.get_field("n"), Some(&OwnedValue::Integer(1)));
  651. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  652. assert_eq!(m.timestamp, Some(1));
  653. }
  654. #[test]
  655. fn it_uses_measure_macro_on_a_self_attribute() {
  656. struct A {
  657. pub influx: InfluxWriter,
  658. }
  659. impl A {
  660. fn f(&self) {
  661. measure!(self.influx, test, t(color, "red"), i(n, 1));
  662. }
  663. }
  664. let a = A { influx: InfluxWriter::default() };
  665. a.f();
  666. }
  667. #[test]
  668. fn it_clones_an_influx_writer_to_check_both_drop() {
  669. let influx = InfluxWriter::default();
  670. measure!(influx, drop_test, i(a, 1), i(b, 2));
  671. {
  672. let influx = influx.clone();
  673. thread::spawn(move || {
  674. measure!(influx, drop_test, i(a, 3), i(b, 4));
  675. });
  676. }
  677. }
  678. #[bench]
  679. fn influx_writer_send_basic(b: &mut Bencher) {
  680. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  681. b.iter(|| {
  682. measure!(m, test, tag[color; "red"], int[n; 1]); //, float[p; 1.234]);
  683. });
  684. }
  685. #[bench]
  686. fn influx_writer_send_price(b: &mut Bencher) {
  687. let m = InfluxWriter::new("localhost", "test", "var/log/influx-test.log", 4000);
  688. b.iter(|| {
  689. measure!(m, test,
  690. t(ticker, t!(xmr-btc).as_str()),
  691. t(exchange, "plnx"),
  692. d(bid, d128::zero()),
  693. d(ask, d128::zero()),
  694. );
  695. });
  696. }
  697. #[test]
  698. fn it_checks_color_tag_error_in_non_doctest() {
  699. let (tx, rx) = channel();
  700. measure!(tx, test, tag[color;"red"], int[n;1]);
  701. let meas: OwnedMeasurement = rx.recv().unwrap();
  702. assert_eq!(meas.get_tag("color"), Some("red"), "meas = \n {:?} \n", meas);
  703. }
  704. #[test]
  705. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  706. let meas = measure!(@make_meas test_measurement,
  707. tag [ one => "a" ],
  708. tag [ two => "b" ],
  709. int [ three => 2 ],
  710. float [ four => 1.2345 ],
  711. string [ five => String::from("d") ],
  712. bool [ six => true ],
  713. int [ seven => { 1 + 2 } ],
  714. time [ 1 ]
  715. );
  716. assert_eq!(meas.key, "test_measurement");
  717. assert_eq!(meas.get_tag("one"), Some("a"));
  718. assert_eq!(meas.get_tag("two"), Some("b"));
  719. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  720. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  721. assert_eq!(meas.timestamp, Some(1));
  722. }
  723. #[test]
  724. fn it_uses_the_measure_macro() {
  725. let (tx, rx) = channel();
  726. measure!(tx, test_measurement,
  727. tag [ one => "a" ],
  728. tag [ two => "b" ],
  729. int [ three => 2 ],
  730. float [ four => 1.2345 ],
  731. string [ five => String::from("d") ],
  732. bool [ six => true ],
  733. int [ seven => { 1 + 2 } ],
  734. time [ 1 ]
  735. );
  736. thread::sleep(Duration::from_millis(10));
  737. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  738. assert_eq!(meas.key, "test_measurement");
  739. assert_eq!(meas.get_tag("one"), Some("a"));
  740. assert_eq!(meas.get_tag("two"), Some("b"));
  741. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  742. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  743. assert_eq!(meas.timestamp, Some(1));
  744. }
  745. #[test]
  746. fn it_uses_measure_macro_for_d128_and_uuid() {
  747. let (tx, rx) = channel();
  748. let u = Uuid::new_v4();
  749. let d = d128::zero();
  750. let t = now();
  751. measure!(tx, test_measurement,
  752. tag[one; "a"],
  753. d128[two; d],
  754. uuid[three; u],
  755. time[t]
  756. );
  757. thread::sleep(Duration::from_millis(10));
  758. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  759. assert_eq!(meas.key, "test_measurement");
  760. assert_eq!(meas.get_tag("one"), Some("a"));
  761. assert_eq!(meas.get_field("two"), Some(&OwnedValue::D128(d128::zero())));
  762. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Uuid(u)));
  763. assert_eq!(meas.timestamp, Some(t));
  764. }
  765. #[test]
  766. fn it_uses_the_measure_macro_alt_syntax() {
  767. let (tx, rx) = channel();
  768. measure!(tx, test_measurement,
  769. tag[one; "a"],
  770. tag[two; "b"],
  771. int[three; 2],
  772. float[four; 1.2345],
  773. string[five; String::from("d")],
  774. bool [ six => true ],
  775. int[seven; { 1 + 2 }],
  776. time[1]
  777. );
  778. thread::sleep(Duration::from_millis(10));
  779. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  780. assert_eq!(meas.key, "test_measurement");
  781. assert_eq!(meas.get_tag("one"), Some("a"));
  782. assert_eq!(meas.get_tag("two"), Some("b"));
  783. assert_eq!(meas.get_field("three"), Some(&OwnedValue::Integer(2)));
  784. assert_eq!(meas.get_field("seven"), Some(&OwnedValue::Integer(3)));
  785. assert_eq!(meas.timestamp, Some(1));
  786. }
  787. #[test]
  788. fn it_checks_that_fields_are_separated_correctly() {
  789. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  790. assert_eq!(m.key, "test");
  791. assert_eq!(m.get_tag("a"), Some("one"));
  792. assert_eq!(m.get_field("x"), Some(&OwnedValue::Float(1.1)));
  793. let mut buf = String::new();
  794. serialize_owned(&m, &mut buf);
  795. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  796. }
  797. #[test]
  798. fn try_to_break_measure_macro() {
  799. let (tx, _) = channel();
  800. measure!(tx, one, tag[x=>"y"], int[n;1]);
  801. measure!(tx, one, tag[x;"y"], int[n;1],);
  802. struct A {
  803. pub one: i32,
  804. pub two: i32,
  805. }
  806. struct B {
  807. pub a: A
  808. }
  809. let b = B { a: A { one: 1, two: 2 } };
  810. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  811. assert_eq!(m.get_field("a"), Some(&OwnedValue::Integer(1)));
  812. }
  813. #[bench]
  814. fn measure_macro_small(b: &mut Bencher) {
  815. let (tx, rx) = channel();
  816. let listener = thread::spawn(move || {
  817. loop { if rx.recv().is_err() { break } }
  818. });
  819. b.iter(|| {
  820. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  821. });
  822. }
  823. #[bench]
  824. fn measure_macro_medium(b: &mut Bencher) {
  825. let (tx, rx) = channel();
  826. let listener = thread::spawn(move || {
  827. loop { if rx.recv().is_err() { break } }
  828. });
  829. b.iter(|| {
  830. measure!(tx, test,
  831. tag[color; "red"],
  832. tag[mood => "playful"],
  833. tag [ ticker => "xmr_btc" ],
  834. float[ price => 1.2345 ],
  835. float[ amount => 56.323],
  836. int[n; 1],
  837. time[now()]
  838. );
  839. });
  840. }
  /// Manual smoke test (ignored by default, `warnings` feature only): starts
  /// `writer`, serializes one dummy measurement and pushes it over the ZeroMQ
  /// socket returned by `push`.
  ///
  /// NOTE(review): presumably the writer forwards the line to InfluxDB, as the
  /// test name says -- nothing here asserts on delivery.
  #[cfg(feature = "warnings")]
  #[test]
  #[ignore]
  fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
      let ctx = zmq::Context::new();
      let socket = push(&ctx).unwrap();

      // `_rx` is kept bound so the receiving half stays alive until the end
      // of the test.
      let (tx, _rx) = channel();
      let influx_writer = writer(tx.clone());

      let mut meas = Measurement::new("rust_test");
      meas.add_tag("a", "t");
      meas.add_field("c", Value::Float(1.23456));
      meas.set_timestamp(now());

      let mut line = String::with_capacity(4096);
      serialize(&meas, &mut line);
      socket.send_str(&line, 0).unwrap();

      drop(influx_writer);
  }
  /// `serialize` must write the key, the tag, the field and the timestamp
  /// into the caller's buffer; a float field of `1.0` is expected to be
  /// rendered as `1`.
  #[test]
  fn it_serializes_a_measurement_in_place() {
      let ts = now();

      let mut meas = Measurement::new("rust_test");
      meas.add_tag("a", "b");
      meas.add_field("c", Value::Float(1.0));
      meas.set_timestamp(ts);

      let mut line = String::with_capacity(4096);
      serialize(&meas, &mut line);

      assert_eq!(line, format!("rust_test,a=b c=1 {}", ts));
  }
  /// Serializes a string field full of nested quotes and parentheses with
  /// `serialize` and posts the result to an InfluxDB `/write` endpoint,
  /// expecting `204 No Content`.
  ///
  /// This test talks to `http://localhost:8086` (query params `db=test`,
  /// `precision=ns`) and fails if nothing answers there.
  ///
  /// # Panics
  ///
  /// Panics with the server's response body on any other status, or with the
  /// client error if the request could not be sent.
  #[test]
  fn it_serializes_a_hard_to_serialize_message() {
      let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;

      let mut m = Measurement::new("rust_test");
      m.add_field("s", Value::String(raw));
      m.set_timestamp(now());

      let mut buf = String::new();
      serialize(&m, &mut buf);
      println!("{}", buf);

      // Send the same line twice in one request body, newline-terminated.
      buf.push('\n');
      let buf_copy = buf.clone();
      buf.push_str(&buf_copy);
      println!("{}", buf);

      let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
      let client = Client::new();
      match client.post(url).body(&buf).send() {
          Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
          Ok(mut resp) => {
              let mut server_resp = String::new();
              resp.read_to_string(&mut server_resp).unwrap();
              panic!("{}", server_resp);
          }
          // Format the error instead of passing it as a raw panic payload:
          // the test output then shows the message, and a non-literal
          // single-argument `panic!` is rejected by newer editions.
          Err(why) => panic!("{}", why),
      }
  }
  901. #[bench]
  902. fn serialize_owned_longer(b: &mut Bencher) {
  903. let mut buf = String::with_capacity(1024);
  904. let m =
  905. OwnedMeasurement::new("test")
  906. .add_tag("one", "a")
  907. .add_tag("two", "b")
  908. .add_tag("ticker", "xmr_btc")
  909. .add_tag("exchange", "plnx")
  910. .add_tag("side", "bid")
  911. .add_field("three", OwnedValue::Float(1.2345))
  912. .add_field("four", OwnedValue::Integer(57))
  913. .add_field("five", OwnedValue::Boolean(true))
  914. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  915. .set_timestamp(now());
  916. b.iter(|| {
  917. serialize_owned(&m, &mut buf);
  918. buf.clear()
  919. });
  920. }
  921. #[bench]
  922. fn serialize_owned_simple(b: &mut Bencher) {
  923. let mut buf = String::with_capacity(1024);
  924. let m =
  925. OwnedMeasurement::new("test")
  926. .add_tag("one", "a")
  927. .add_tag("two", "b")
  928. .add_field("three", OwnedValue::Float(1.2345))
  929. .add_field("four", OwnedValue::Integer(57))
  930. .set_timestamp(now());
  931. b.iter(|| {
  932. serialize_owned(&m, &mut buf);
  933. buf.clear()
  934. });
  935. }
  /// Serializes an `OwnedMeasurement` whose string field is full of nested
  /// quotes and parentheses with `serialize_owned` and posts the result to an
  /// InfluxDB `/write` endpoint, expecting `204 No Content`.
  ///
  /// This test talks to `http://localhost:8086` (query params `db=test`,
  /// `precision=ns`) and fails if nothing answers there.
  ///
  /// # Panics
  ///
  /// Panics with the server's response body on any other status, or with the
  /// client error if the request could not be sent.
  #[test]
  fn it_serializes_a_hard_to_serialize_message_from_owned() {
      let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;

      let m = OwnedMeasurement::new("rust_test")
          .add_field("s", OwnedValue::String(raw.to_string()))
          .set_timestamp(now());

      let mut buf = String::new();
      serialize_owned(&m, &mut buf);
      println!("{}", buf);

      // Send the same line twice in one request body, newline-terminated.
      buf.push('\n');
      let buf_copy = buf.clone();
      buf.push_str(&buf_copy);
      println!("{}", buf);

      let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse");
      let client = Client::new();
      match client.post(url).body(&buf).send() {
          Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
          Ok(mut resp) => {
              let mut server_resp = String::new();
              resp.read_to_string(&mut server_resp).unwrap();
              panic!("{}", server_resp);
          }
          // Format the error instead of passing it as a raw panic payload:
          // the test output then shows the message, and a non-literal
          // single-argument `panic!` is rejected by newer editions.
          Err(why) => panic!("{}", why),
      }
  }
  965. }