You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1012 lines
35KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::iter::FromIterator;
  4. use std::io::{Write, Read};
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. use std::fs::{self, OpenOptions};
  8. use std::time::Duration;
  9. use std::hash::{Hash, BuildHasherDefault};
  10. use hyper::status::StatusCode;
  11. use hyper::client::response::Response;
  12. use hyper::Url;
  13. use hyper::client::Client;
  14. use influent::measurement::{Measurement, Value};
  15. use zmq;
  16. use chrono::{DateTime, Utc, TimeZone};
  17. use sloggers::types::Severity;
  18. use ordermap::OrderMap;
  19. use fnv::FnvHasher;
  20. use decimal::d128;
  21. use uuid::Uuid;
  22. use money::Ticker;
  23. use super::{nanos, file_logger};
  24. use warnings::Warning;
  25. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  26. //const WRITER_ADDR: &'static str = "tcp://127.0.0.1:17853";
  27. const DB_NAME: &'static str = "mm";
  28. const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write";
  29. //const DB_HOST: &'static str = "http://harrison.0ptimus.internal:8086/write";
  30. const ZMQ_RCV_HWM: i32 = 0;
  31. const ZMQ_SND_HWM: i32 = 0;
  32. const BUFFER_SIZE: u8 = 80;
  33. pub use super::{dur_nanos, dt_nanos};
  34. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  35. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  36. Map::with_capacity_and_hasher(capacity, Default::default())
  37. }
  38. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  39. ///
  40. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  41. /// values, as well as sends it with the `Sender`.
  42. ///
  43. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  44. /// measurement (see `tests` mod).
  45. ///
  46. /// # Examples
  47. ///
  48. /// ```
  49. /// #[macro_use] extern crate logging;
  50. /// extern crate decimal;
  51. ///
  52. /// use std::sync::mpsc::channel;
  53. /// use decimal::d128;
  54. /// use logging::influx::*;
  55. ///
  56. /// fn main() {
  57. /// let (tx, rx) = channel();
  58. ///
  59. /// // "shorthand" syntax
  60. ///
  61. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  62. ///
  63. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  64. ///
  65. /// assert_eq!(meas.key, "test");
  66. /// assert_eq!(meas.tags.get("color"), Some(&"red"));
  67. /// assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1)));
  68. ///
  69. /// // alternate syntax ...
  70. ///
  71. /// measure!(tx, test,
  72. /// tag [ one => "a" ],
  73. /// tag [ two => "b" ],
  74. /// int [ three => 2 ],
  75. /// float [ four => 1.2345 ],
  76. /// string [ five => String::from("d") ],
  77. /// bool [ six => true ],
  78. /// int [ seven => { 1 + 2 } ],
  79. /// time [ 1 ]
  80. /// );
  81. ///
  82. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  83. ///
  84. /// assert_eq!(meas.key, "test");
  85. /// assert_eq!(meas.tags.get("one"), Some(&"a"));
  86. /// assert_eq!(meas.tags.get("two"), Some(&"b"));
  87. /// assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  88. /// assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  89. /// assert_eq!(meas.timestamp, Some(1));
  90. ///
  91. /// // use the @make_meas flag to skip sending a measurement, instead merely
  92. /// // creating it.
  93. ///
  94. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  95. ///
  96. /// // each variant also has shorthand aliases
  97. ///
  98. /// let meas: OwnedMeasurement =
  99. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  100. /// }
  101. /// ```
  102. ///
  103. #[macro_export]
  104. macro_rules! measure {
  105. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  106. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  107. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  108. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  109. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  110. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  111. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  112. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  113. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  114. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  115. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  116. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  117. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  118. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  119. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  120. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  121. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  122. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  123. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  124. (@count_tags) => {0usize};
  125. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  126. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  127. (@count_fields) => {0usize};
  128. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  129. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  130. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  131. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  132. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  133. };
  134. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  135. let n_tags = measure!(@count_tags $($t)*);
  136. let n_fields = measure!(@count_fields $($t)*);
  137. let mut meas =
  138. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  139. $(
  140. measure!(@kv $t, meas, $($tail)*);
  141. )*
  142. meas
  143. }};
  144. ($m:tt, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  145. measure!($m, $name, $($t [ $($tail)* ] ),+)
  146. };
  147. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  148. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  149. let _ = $m.send(measurement);
  150. }};
  151. }
  152. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  153. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  154. /// measurements have accumulated.
  155. ///
  156. pub struct InfluxWriter {
  157. host: &'static str,
  158. db: &'static str,
  159. tx: Sender<OwnedMeasurement>,
  160. kill_switch: Sender<()>,
  161. thread: Option<thread::JoinHandle<()>>,
  162. }
  163. impl Default for InfluxWriter {
  164. fn default() -> Self {
  165. InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log", BUFFER_SIZE)
  166. }
  167. }
  168. impl InfluxWriter {
  169. /// Sends the `OwnedMeasurement` to the serialization thread.
  170. ///
  171. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<OwnedMeasurement>> {
  172. self.tx.send(m)
  173. }
  174. pub fn tx(&self) -> Sender<OwnedMeasurement> {
  175. self.tx.clone()
  176. }
  177. pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u8) -> Self {
  178. let (kill_switch, terminate) = channel();
  179. let (tx, rx) = channel();
  180. let logger = file_logger(log_path, Severity::Info);
  181. let thread = thread::spawn(move || {
  182. info!(logger, "initializing url";
  183. "DB_HOST" => host,
  184. "DB_NAME" => db);
  185. let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse");
  186. let client = Client::new();
  187. info!(logger, "initializing buffers");
  188. let mut meas_buf = String::with_capacity(32 * 32 * 32);
  189. let mut buf = String::with_capacity(32 * 32 * 32);
  190. let mut count = 0;
  191. let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
  192. trace!(logger, "appending serialized measurement to buffer";
  193. "prev" => prev,
  194. "buf.len()" => buf.len());
  195. match prev {
  196. 0 => {
  197. buf.push_str(s);
  198. 1
  199. }
  200. n if n < buffer_size => {
  201. buf.push_str("\n");
  202. buf.push_str(s);
  203. n + 1
  204. }
  205. _ => {
  206. buf.push_str("\n");
  207. if s.len() > 0 {
  208. buf.push_str(s);
  209. }
  210. trace!(logger, "sending buffer to influx";
  211. "buf.len()" => buf.len());
  212. #[cfg(not(test))]
  213. {
  214. let resp = client.post(url.clone())
  215. .body(buf.as_str())
  216. .send();
  217. match resp {
  218. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  219. trace!(logger, "server responded ok: 204 NoContent");
  220. }
  221. Ok(mut resp) => {
  222. let mut server_resp = String::with_capacity(1024);
  223. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  224. error!(logger, "influx server error";
  225. "status" => resp.status.to_string(),
  226. "body" => server_resp);
  227. }
  228. Err(why) => {
  229. error!(logger, "http request failed: {:?}", why);
  230. }
  231. }
  232. }
  233. buf.clear();
  234. 0
  235. }
  236. }
  237. };
  238. let mut rcvd_msg = false;
  239. loop {
  240. rcvd_msg = false;
  241. rx.recv_timeout(Duration::from_millis(10))
  242. .map(|mut meas: OwnedMeasurement| {
  243. // if we didn't set the timestamp, it would end up
  244. // being whenever we accumulated `BUFFER_SIZE` messages,
  245. // which might be some period of time after we received
  246. // the message.
  247. //
  248. if meas.timestamp.is_none() {
  249. meas.timestamp = Some(now());
  250. }
  251. trace!(logger, "rcvd new OwnedMeasurement"; "count" => count);
  252. serialize_owned(&meas, &mut meas_buf);
  253. count = next(count, &meas_buf, &mut buf);
  254. meas_buf.clear();
  255. rcvd_msg = true;
  256. });
  257. let end = terminate.try_recv()
  258. .map(|_| {
  259. let _ = next(::std::u8::MAX, "", &mut buf);
  260. true
  261. }).unwrap_or(false);
  262. if end { break }
  263. #[cfg(feature = "no-thrash")]
  264. {
  265. if !rcvd_msg {
  266. thread::sleep(Duration::new(0, 5000));
  267. }
  268. }
  269. }
  270. crit!(logger, "goodbye");
  271. });
  272. InfluxWriter {
  273. host,
  274. db,
  275. tx,
  276. kill_switch,
  277. thread: Some(thread)
  278. }
  279. }
  280. }
  281. impl Drop for InfluxWriter {
  282. fn drop(&mut self) {
  283. let _ = self.kill_switch.send(()).unwrap();
  284. if let Some(thread) = self.thread.take() {
  285. let _ = thread.join();
  286. }
  287. }
  288. }
  289. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  290. let socket = ctx.socket(zmq::PULL)?;
  291. socket.bind(WRITER_ADDR)?;
  292. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  293. Ok(socket)
  294. }
  295. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  296. let socket = ctx.socket(zmq::PUSH)?;
  297. socket.connect(WRITER_ADDR)?;
  298. socket.set_sndhwm(ZMQ_SND_HWM)?;
  299. Ok(socket)
  300. }
  301. /// This removes offending things rather than escaping them.
  302. ///
  303. fn escape_tag(s: &str) -> String {
  304. s.replace(" ", "")
  305. .replace(",", "")
  306. .replace("\"", "")
  307. }
  308. fn escape(s: &str) -> String {
  309. s.replace(" ", "\\ ")
  310. .replace(",", "\\,")
  311. }
  312. fn as_string(s: &str) -> String {
  313. // the second replace removes double escapes
  314. //
  315. format!("\"{}\"", s.replace("\"", "\\\"")
  316. .replace(r#"\\""#, r#"\""#))
  317. }
  318. #[test]
  319. fn it_checks_as_string_does_not_double_escape() {
  320. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  321. let escaped = as_string(&raw);
  322. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  323. }
  324. fn as_integer(i: &i64) -> String {
  325. format!("{}i", i)
  326. }
  327. fn as_float(f: &f64) -> String {
  328. f.to_string()
  329. }
  330. fn as_boolean(b: &bool) -> &str {
  331. if *b { "t" } else { "f" }
  332. }
  333. pub fn now() -> i64 {
  334. nanos(Utc::now()) as i64
  335. }
  336. /// Serialize the measurement into influx line protocol
  337. /// and append to the buffer.
  338. ///
  339. /// # Examples
  340. ///
  341. /// ```
  342. /// extern crate influent;
  343. /// extern crate logging;
  344. ///
  345. /// use influent::measurement::{Measurement, Value};
  346. /// use std::string::String;
  347. /// use logging::influx::serialize;
  348. ///
  349. /// fn main() {
  350. /// let mut buf = String::new();
  351. /// let mut m = Measurement::new("test");
  352. /// m.add_field("x", Value::Integer(1));
  353. /// serialize(&m, &mut buf);
  354. /// }
  355. ///
  356. /// ```
  357. ///
  358. pub fn serialize(measurement: &Measurement, line: &mut String) {
  359. line.push_str(&escape(measurement.key));
  360. for (tag, value) in measurement.tags.iter() {
  361. line.push_str(",");
  362. line.push_str(&escape(tag));
  363. line.push_str("=");
  364. line.push_str(&escape(value));
  365. }
  366. let mut was_spaced = false;
  367. for (field, value) in measurement.fields.iter() {
  368. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  369. line.push_str(&escape(field));
  370. line.push_str("=");
  371. match value {
  372. &Value::String(ref s) => line.push_str(&as_string(s)),
  373. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  374. &Value::Float(ref f) => line.push_str(&as_float(f)),
  375. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  376. };
  377. }
  378. match measurement.timestamp {
  379. Some(t) => {
  380. line.push_str(" ");
  381. line.push_str(&t.to_string());
  382. }
  383. _ => {}
  384. }
  385. }
  386. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  387. ///
  388. /// The serialized measurement is appended to the end of the string without
  389. /// any regard for what exited in it previously.
  390. ///
  391. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  392. line.push_str(&escape_tag(measurement.key));
  393. let add_tag = |line: &mut String, key: &str, value: &str| {
  394. line.push_str(",");
  395. line.push_str(&escape_tag(key));
  396. line.push_str("=");
  397. line.push_str(&escape(value));
  398. };
  399. for (key, value) in measurement.tags.iter() {
  400. add_tag(line, key, value);
  401. }
  402. let mut fields = measurement.fields.iter();
  403. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  404. if is_first { line.push_str(" "); } else { line.push_str(","); }
  405. line.push_str(&escape_tag(key));
  406. line.push_str("=");
  407. match *value {
  408. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  409. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  410. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  411. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  412. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  413. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  414. };
  415. };
  416. let mut fields = measurement.fields.iter();
  417. // first time separate from tags with space
  418. //
  419. fields.next().map(|kv| {
  420. add_field(line, kv.0, kv.1, true);
  421. });
  422. // then seperate the rest w/ comma
  423. //
  424. for kv in fields {
  425. add_field(line, kv.0, kv.1, false);
  426. }
  427. if let Some(t) = measurement.timestamp {
  428. line.push_str(" ");
  429. line.push_str(&t.to_string());
  430. }
  431. }
  432. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  433. thread::spawn(move || {
  434. let _ = fs::create_dir("/tmp/mm");
  435. let ctx = zmq::Context::new();
  436. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  437. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  438. let client = Client::new();
  439. let mut buf = String::with_capacity(4096);
  440. let mut server_resp = String::with_capacity(4096);
  441. let mut count = 0;
  442. loop {
  443. if let Ok(bytes) = socket.recv_bytes(0) {
  444. if let Ok(msg) = String::from_utf8(bytes) {
  445. count = match count {
  446. 0 => {
  447. buf.push_str(&msg);
  448. 1
  449. }
  450. n @ 1...40 => {
  451. buf.push_str("\n");
  452. buf.push_str(&msg);
  453. n + 1
  454. }
  455. _ => {
  456. buf.push_str("\n");
  457. buf.push_str(&msg);
  458. match client.post(url.clone())
  459. .body(&buf)
  460. .send() {
  461. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  462. Ok(mut resp) => {
  463. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  464. let _ = warnings.send(
  465. Warning::Error(
  466. format!("Influx server: {}", server_resp)));
  467. server_resp.clear();
  468. }
  469. Err(why) => {
  470. let _ = warnings.send(
  471. Warning::Error(
  472. format!("Influx write error: {}", why)));
  473. }
  474. }
  475. buf.clear();
  476. 0
  477. }
  478. }
  479. }
  480. }
  481. }
  482. })
  483. }
  484. #[derive(Debug, Clone, PartialEq)]
  485. pub enum OwnedValue {
  486. String(String),
  487. Float(f64),
  488. Integer(i64),
  489. Boolean(bool),
  490. D128(d128),
  491. Uuid(Uuid),
  492. }
  493. #[derive(Clone, Debug)]
  494. pub struct OwnedMeasurement {
  495. pub key: &'static str,
  496. pub timestamp: Option<i64>,
  497. pub fields: Map<&'static str, OwnedValue>,
  498. pub tags: Map<&'static str, &'static str>,
  499. //pub n_tags: usize,
  500. //pub n_fields: usize,
  501. //pub string_tags: HashMap<&'static str, String>
  502. }
  503. impl OwnedMeasurement {
  504. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  505. OwnedMeasurement {
  506. key,
  507. timestamp: None,
  508. tags: new_map(n_tags),
  509. fields: new_map(n_fields),
  510. //n_tags,
  511. //n_fields,
  512. //string_tags: HashMap::new()
  513. }
  514. }
  515. pub fn new(key: &'static str) -> Self {
  516. OwnedMeasurement::with_capacity(key, 4, 4)
  517. }
  518. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  519. self.tags.insert(key, value);
  520. self
  521. }
  522. // pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self {
  523. // self.string_tags.insert(key, value);
  524. // self
  525. // }
  526. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  527. self.fields.insert(key, value);
  528. self
  529. }
  530. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  531. self.timestamp = Some(timestamp);
  532. self
  533. }
  534. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  535. *self.tags.entry(key).or_insert(value) = value;
  536. self
  537. }
  538. }
  539. #[cfg(test)]
  540. mod tests {
  541. use super::*;
  542. use test::{black_box, Bencher};
  543. #[test]
  544. fn it_uses_measure_macro_parenthesis_syntax() {
  545. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  546. assert_eq!(m.key, "test");
  547. assert_eq!(m.tags.get("a"), Some(&"b"));
  548. assert_eq!(m.fields.get("n"), Some(&OwnedValue::Integer(1)));
  549. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  550. assert_eq!(m.timestamp, Some(1));
  551. }
  552. #[bench]
  553. fn influx_writer_send_basic(b: &mut Bencher) {
  554. let m = InfluxWriter::default();
  555. b.iter(|| {
  556. measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]);
  557. });
  558. }
  559. #[bench]
  560. fn influx_writer_send_price(b: &mut Bencher) {
  561. let m = InfluxWriter::default();
  562. b.iter(|| {
  563. measure!(m, test,
  564. tag[ticker; t!(xmr-btc).to_str()],
  565. tag[exchange; "plnx"],
  566. d128[bid; d128::zero()],
  567. d128[ask; d128::zero()],
  568. );
  569. });
  570. }
  571. #[test]
  572. fn it_checks_color_tag_error_in_non_doctest() {
  573. let (tx, rx) = channel();
  574. measure!(tx, test, tag[color;"red"], int[n;1]);
  575. let meas: OwnedMeasurement = rx.recv().unwrap();
  576. assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
  577. }
  578. #[test]
  579. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  580. let meas = measure!(@make_meas test_measurement,
  581. tag [ one => "a" ],
  582. tag [ two => "b" ],
  583. int [ three => 2 ],
  584. float [ four => 1.2345 ],
  585. string [ five => String::from("d") ],
  586. bool [ six => true ],
  587. int [ seven => { 1 + 2 } ],
  588. time [ 1 ]
  589. );
  590. assert_eq!(meas.key, "test_measurement");
  591. assert_eq!(meas.tags.get("one"), Some(&"a"));
  592. assert_eq!(meas.tags.get("two"), Some(&"b"));
  593. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  594. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  595. assert_eq!(meas.timestamp, Some(1));
  596. }
  597. #[test]
  598. fn it_uses_the_measure_macro() {
  599. let (tx, rx) = channel();
  600. measure!(tx, test_measurement,
  601. tag [ one => "a" ],
  602. tag [ two => "b" ],
  603. int [ three => 2 ],
  604. float [ four => 1.2345 ],
  605. string [ five => String::from("d") ],
  606. bool [ six => true ],
  607. int [ seven => { 1 + 2 } ],
  608. time [ 1 ]
  609. );
  610. thread::sleep_ms(10);
  611. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  612. assert_eq!(meas.key, "test_measurement");
  613. assert_eq!(meas.tags.get("one"), Some(&"a"));
  614. assert_eq!(meas.tags.get("two"), Some(&"b"));
  615. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  616. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  617. assert_eq!(meas.timestamp, Some(1));
  618. }
  619. #[test]
  620. fn it_uses_measure_macro_for_d128_and_uuid() {
  621. let (tx, rx) = channel();
  622. let u = Uuid::new_v4();
  623. let d = d128::zero();
  624. let t = now();
  625. measure!(tx, test_measurement,
  626. tag[one; "a"],
  627. d128[two; d],
  628. uuid[three; u],
  629. time[t]
  630. );
  631. thread::sleep_ms(10);
  632. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  633. assert_eq!(meas.key, "test_measurement");
  634. assert_eq!(meas.tags.get("one"), Some(&"a"));
  635. assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero())));
  636. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u)));
  637. assert_eq!(meas.timestamp, Some(t));
  638. }
  639. #[test]
  640. fn it_uses_the_measure_macro_alt_syntax() {
  641. let (tx, rx) = channel();
  642. measure!(tx, test_measurement,
  643. tag[one; "a"],
  644. tag[two; "b"],
  645. int[three; 2],
  646. float[four; 1.2345],
  647. string[five; String::from("d")],
  648. bool [ six => true ],
  649. int[seven; { 1 + 2 }],
  650. time[1]
  651. );
  652. thread::sleep_ms(10);
  653. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  654. assert_eq!(meas.key, "test_measurement");
  655. assert_eq!(meas.tags.get("one"), Some(&"a"));
  656. assert_eq!(meas.tags.get("two"), Some(&"b"));
  657. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  658. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  659. assert_eq!(meas.timestamp, Some(1));
  660. }
  661. #[test]
  662. fn it_checks_that_fields_are_separated_correctly() {
  663. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  664. assert_eq!(m.key, "test");
  665. assert_eq!(m.tags.get("a"), Some(&"one"));
  666. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  667. let mut buf = String::new();
  668. serialize_owned(&m, &mut buf);
  669. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  670. }
  671. #[test]
  672. fn try_to_break_measure_macro() {
  673. let (tx, _) = channel();
  674. measure!(tx, one, tag[x=>"y"], int[n;1]);
  675. measure!(tx, one, tag[x;"y"], int[n;1],);
  676. struct A {
  677. pub one: i32,
  678. pub two: i32,
  679. }
  680. struct B {
  681. pub a: A
  682. }
  683. let b = B { a: A { one: 1, two: 2 } };
  684. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  685. assert_eq!(m.fields.get("a"), Some(&OwnedValue::Integer(1)));
  686. }
  687. #[bench]
  688. fn measure_macro_small(b: &mut Bencher) {
  689. let (tx, rx) = channel();
  690. let listener = thread::spawn(move || {
  691. loop { if rx.recv().is_err() { break } }
  692. });
  693. b.iter(|| {
  694. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  695. });
  696. }
  697. #[bench]
  698. fn measure_macro_medium(b: &mut Bencher) {
  699. let (tx, rx) = channel();
  700. let listener = thread::spawn(move || {
  701. loop { if rx.recv().is_err() { break } }
  702. });
  703. b.iter(|| {
  704. measure!(tx, test,
  705. tag[color; "red"],
  706. tag[mood => "playful"],
  707. tag [ ticker => "xmr_btc" ],
  708. float[ price => 1.2345 ],
  709. float[ amount => 56.323],
  710. int[n; 1],
  711. time[now()]
  712. );
  713. });
  714. }
  715. #[test]
  716. #[ignore]
  717. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  718. let ctx = zmq::Context::new();
  719. let socket = push(&ctx).unwrap();
  720. let (tx, rx) = channel();
  721. let w = writer(tx.clone());
  722. let mut buf = String::with_capacity(4096);
  723. let mut meas = Measurement::new("rust_test");
  724. meas.add_tag("a", "t");
  725. meas.add_field("c", Value::Float(1.23456));
  726. let now = now();
  727. meas.set_timestamp(now);
  728. serialize(&meas, &mut buf);
  729. socket.send_str(&buf, 0);
  730. drop(w);
  731. }
  732. #[test]
  733. fn it_serializes_a_measurement_in_place() {
  734. let mut buf = String::with_capacity(4096);
  735. let mut meas = Measurement::new("rust_test");
  736. meas.add_tag("a", "b");
  737. meas.add_field("c", Value::Float(1.0));
  738. let now = now();
  739. meas.set_timestamp(now);
  740. serialize(&meas, &mut buf);
  741. let ans = format!("rust_test,a=b c=1 {}", now);
  742. assert_eq!(buf, ans);
  743. }
  744. #[test]
  745. fn it_serializes_a_hard_to_serialize_message() {
  746. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  747. let mut buf = String::new();
  748. let mut server_resp = String::new();
  749. let mut m = Measurement::new("rust_test");
  750. m.add_field("s", Value::String(&raw));
  751. let now = now();
  752. m.set_timestamp(now);
  753. serialize(&m, &mut buf);
  754. println!("{}", buf);
  755. buf.push_str("\n");
  756. let buf_copy = buf.clone();
  757. buf.push_str(&buf_copy);
  758. println!("{}", buf);
  759. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  760. let client = Client::new();
  761. match client.post(url.clone())
  762. .body(&buf)
  763. .send() {
  764. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  765. Ok(mut resp) => {
  766. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  767. panic!("{}", server_resp);
  768. }
  769. Err(why) => {
  770. panic!(why)
  771. }
  772. }
  773. }
  774. #[bench]
  775. fn serialize_owned_longer(b: &mut Bencher) {
  776. let mut buf = String::with_capacity(1024);
  777. let m =
  778. OwnedMeasurement::new("test")
  779. .add_tag("one", "a")
  780. .add_tag("two", "b")
  781. .add_tag("ticker", "xmr_btc")
  782. .add_tag("exchange", "plnx")
  783. .add_tag("side", "bid")
  784. .add_field("three", OwnedValue::Float(1.2345))
  785. .add_field("four", OwnedValue::Integer(57))
  786. .add_field("five", OwnedValue::Boolean(true))
  787. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  788. .set_timestamp(now());
  789. b.iter(|| {
  790. serialize_owned(&m, &mut buf);
  791. buf.clear()
  792. });
  793. }
  794. #[bench]
  795. fn serialize_owned_simple(b: &mut Bencher) {
  796. let mut buf = String::with_capacity(1024);
  797. let m =
  798. OwnedMeasurement::new("test")
  799. .add_tag("one", "a")
  800. .add_tag("two", "b")
  801. .add_field("three", OwnedValue::Float(1.2345))
  802. .add_field("four", OwnedValue::Integer(57))
  803. .set_timestamp(now());
  804. b.iter(|| {
  805. serialize_owned(&m, &mut buf);
  806. buf.clear()
  807. });
  808. }
  809. #[test]
  810. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  811. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  812. let mut buf = String::new();
  813. let mut server_resp = String::new();
  814. let mut m = OwnedMeasurement::new("rust_test")
  815. .add_field("s", OwnedValue::String(raw.to_string()))
  816. .set_timestamp(now());
  817. serialize_owned(&m, &mut buf);
  818. println!("{}", buf);
  819. buf.push_str("\n");
  820. let buf_copy = buf.clone();
  821. buf.push_str(&buf_copy);
  822. println!("{}", buf);
  823. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  824. let client = Client::new();
  825. match client.post(url.clone())
  826. .body(&buf)
  827. .send() {
  828. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  829. Ok(mut resp) => {
  830. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  831. panic!("{}", server_resp);
  832. }
  833. Err(why) => {
  834. panic!(why)
  835. }
  836. }
  837. }
  838. // macro_rules! make_measurement {
  839. // (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  840. // (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  841. // (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  842. // (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  843. // (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) };
  844. // (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) };
  845. // (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  846. // (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) };
  847. //
  848. // (@count_tags) => {0usize};
  849. // (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  850. // (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  851. //
  852. // (@count_fields) => {0usize};
  853. // (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  854. // (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  855. // (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  856. //
  857. // ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  858. // let n_tags = measure!(@count_tags $($t)*);
  859. // let n_fields = measure!(@count_fields $($t)*);
  860. // let mut meas =
  861. // $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  862. // $(
  863. // measure!(@kv $t, meas, $($tail)*);
  864. // )*
  865. // //let _ = $m.send(meas);
  866. // meas
  867. // }};
  868. // }
  869. //
  870. // #[test]
  871. // fn it_checks_n_tags_is_correct() {
  872. // let (tx, _): (Sender<OwnedMeasurement>, Receiver<OwnedMeasurement>) = channel();
  873. // assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1);
  874. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2);
  875. // assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0);
  876. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0);
  877. //
  878. // let m4 =
  879. // make_measurement!(tx, test,
  880. // tag[a;"b"],
  881. // tag[c;"d"],
  882. // int[n; 1],
  883. // tag[e;"f"],
  884. // float[x; 1.234],
  885. // tag[g;"h"],
  886. // time[1],
  887. // );
  888. // assert_eq!(m4.n_tags, 4);
  889. // assert_eq!(m4.n_fields, 2);
  890. // }
  891. }