You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1056 lines
36KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::iter::FromIterator;
  4. use std::io::{Write, Read};
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. use std::fs::{self, OpenOptions};
  8. use std::time::Duration;
  9. use std::hash::{Hash, BuildHasherDefault};
  10. use hyper::status::StatusCode;
  11. use hyper::client::response::Response;
  12. use hyper::Url;
  13. use hyper::client::Client;
  14. use influent::measurement::{Measurement, Value};
  15. use zmq;
  16. use chrono::{DateTime, Utc, TimeZone};
  17. use sloggers::types::Severity;
  18. use ordermap::OrderMap;
  19. use fnv::FnvHasher;
  20. use decimal::d128;
  21. use uuid::Uuid;
  22. use money::Ticker;
  23. use super::{nanos, file_logger};
  24. use warnings::Warning;
  25. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  26. //const WRITER_ADDR: &'static str = "tcp://127.0.0.1:17853";
  27. const DB_NAME: &'static str = "mm";
  28. const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write";
  29. //const DB_HOST: &'static str = "http://harrison.0ptimus.internal:8086/write";
  30. const ZMQ_RCV_HWM: i32 = 0;
  31. const ZMQ_SND_HWM: i32 = 0;
  32. const BUFFER_SIZE: u8 = 80;
  33. pub use super::{dur_nanos, dt_nanos};
  34. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  35. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  36. Map::with_capacity_and_hasher(capacity, Default::default())
  37. }
  38. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  39. ///
  40. /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  41. /// values, as well as sends it with the `Sender`.
  42. ///
  43. /// Benchmarks show around 600ns for a small measurement and 1u for a medium-sized
  44. /// measurement (see `tests` mod).
  45. ///
  46. /// # Examples
  47. ///
  48. /// ```
  49. /// #[macro_use] extern crate logging;
  50. /// extern crate decimal;
  51. ///
  52. /// use std::sync::mpsc::channel;
  53. /// use decimal::d128;
  54. /// use logging::influx::*;
  55. ///
  56. /// fn main() {
  57. /// let (tx, rx) = channel();
  58. ///
  59. /// // "shorthand" syntax
  60. ///
  61. /// measure!(tx, test, tag[color;"red"], int[n;1]);
  62. ///
  63. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  64. ///
  65. /// assert_eq!(meas.key, "test");
  66. /// assert_eq!(meas.tags.get("color"), Some(&"red"));
  67. /// assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1)));
  68. ///
  69. /// // alternate syntax ...
  70. ///
  71. /// measure!(tx, test,
  72. /// tag [ one => "a" ],
  73. /// tag [ two => "b" ],
  74. /// int [ three => 2 ],
  75. /// float [ four => 1.2345 ],
  76. /// string [ five => String::from("d") ],
  77. /// bool [ six => true ],
  78. /// int [ seven => { 1 + 2 } ],
  79. /// time [ 1 ]
  80. /// );
  81. ///
  82. /// let meas: OwnedMeasurement = rx.recv().unwrap();
  83. ///
  84. /// assert_eq!(meas.key, "test");
  85. /// assert_eq!(meas.tags.get("one"), Some(&"a"));
  86. /// assert_eq!(meas.tags.get("two"), Some(&"b"));
  87. /// assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  88. /// assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  89. /// assert_eq!(meas.timestamp, Some(1));
  90. ///
  91. /// // use the @make_meas flag to skip sending a measurement, instead merely
  92. /// // creating it.
  93. ///
  94. /// let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  95. ///
  96. /// // each variant also has shorthand aliases
  97. ///
  98. /// let meas: OwnedMeasurement =
  99. /// measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  100. /// }
  101. /// ```
  102. ///
  103. #[macro_export]
  104. macro_rules! measure {
  105. (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  106. (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  107. (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  108. (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  109. (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  110. (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) };
  111. (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  112. (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  113. (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  114. (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
  115. (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  116. (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
  117. (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  118. (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  119. (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  120. (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
  121. (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  122. (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
  123. (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  124. (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
  125. (@as_expr $e:expr) => {$e};
  126. (@count_tags) => {0usize};
  127. (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  128. (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  129. (@count_fields) => {0usize};
  130. (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  131. (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  132. (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  133. (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  134. measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
  135. };
  136. (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  137. let n_tags = measure!(@count_tags $($t)*);
  138. let n_fields = measure!(@count_fields $($t)*);
  139. let mut meas =
  140. $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  141. $(
  142. measure!(@kv $t, meas, $($tail)*);
  143. )*
  144. meas
  145. }};
  146. ($m:expr, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
  147. measure!($m, $name, $($t [ $($tail)* ] ),+)
  148. };
  149. ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  150. let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
  151. let _ = $m.send(measurement);
  152. }};
  153. }
  154. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  155. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  156. /// measurements have accumulated.
  157. ///
  158. pub struct InfluxWriter {
  159. host: &'static str,
  160. db: &'static str,
  161. tx: Sender<OwnedMeasurement>,
  162. kill_switch: Sender<()>,
  163. thread: Option<thread::JoinHandle<()>>,
  164. }
  165. impl Default for InfluxWriter {
  166. fn default() -> Self {
  167. InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log", BUFFER_SIZE)
  168. }
  169. }
  170. impl Clone for InfluxWriter {
  171. fn clone(&self) -> Self {
  172. let (tx, _) = channel();
  173. InfluxWriter {
  174. host: self.host,
  175. db: self.db,
  176. tx: self.tx.clone(),
  177. kill_switch: tx,
  178. thread: None,
  179. }
  180. }
  181. }
  182. impl InfluxWriter {
  183. /// Sends the `OwnedMeasurement` to the serialization thread.
  184. ///
  185. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<OwnedMeasurement>> {
  186. self.tx.send(m)
  187. }
  188. pub fn tx(&self) -> Sender<OwnedMeasurement> {
  189. self.tx.clone()
  190. }
  191. pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u8) -> Self {
  192. let (kill_switch, terminate) = channel();
  193. let (tx, rx) = channel();
  194. let logger = file_logger(log_path, Severity::Info);
  195. let thread = thread::spawn(move || {
  196. info!(logger, "initializing url";
  197. "DB_HOST" => host,
  198. "DB_NAME" => db);
  199. let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse");
  200. let client = Client::new();
  201. info!(logger, "initializing buffers");
  202. let mut meas_buf = String::with_capacity(32 * 32 * 32);
  203. let mut buf = String::with_capacity(32 * 32 * 32);
  204. let mut count = 0;
  205. let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
  206. trace!(logger, "appending serialized measurement to buffer";
  207. "prev" => prev,
  208. "buf.len()" => buf.len());
  209. match prev {
  210. 0 if buffer_size > 0 => {
  211. buf.push_str(s);
  212. 1
  213. }
  214. n if n < buffer_size => {
  215. buf.push_str("\n");
  216. buf.push_str(s);
  217. n + 1
  218. }
  219. _ => {
  220. buf.push_str("\n");
  221. if s.len() > 0 {
  222. buf.push_str(s);
  223. }
  224. trace!(logger, "sending buffer to influx";
  225. "buf.len()" => buf.len());
  226. #[cfg(not(test))]
  227. {
  228. let resp = client.post(url.clone())
  229. .body(buf.as_str())
  230. .send();
  231. match resp {
  232. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  233. trace!(logger, "server responded ok: 204 NoContent");
  234. }
  235. Ok(mut resp) => {
  236. let mut server_resp = String::with_capacity(1024);
  237. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  238. error!(logger, "influx server error";
  239. "status" => resp.status.to_string(),
  240. "body" => server_resp);
  241. }
  242. Err(why) => {
  243. error!(logger, "http request failed: {:?}", why);
  244. }
  245. }
  246. }
  247. buf.clear();
  248. 0
  249. }
  250. }
  251. };
  252. let mut rcvd_msg = false;
  253. loop {
  254. rcvd_msg = false;
  255. rx.recv_timeout(Duration::from_millis(10))
  256. .map(|mut meas: OwnedMeasurement| {
  257. // if we didn't set the timestamp, it would end up
  258. // being whenever we accumulated `BUFFER_SIZE` messages,
  259. // which might be some period of time after we received
  260. // the message.
  261. //
  262. if meas.timestamp.is_none() {
  263. meas.timestamp = Some(now());
  264. }
  265. trace!(logger, "rcvd new OwnedMeasurement"; "count" => count);
  266. serialize_owned(&meas, &mut meas_buf);
  267. count = next(count, &meas_buf, &mut buf);
  268. meas_buf.clear();
  269. rcvd_msg = true;
  270. });
  271. let end = terminate.try_recv()
  272. .map(|_| {
  273. let _ = next(::std::u8::MAX, "", &mut buf);
  274. true
  275. }).unwrap_or(false);
  276. if end { break }
  277. #[cfg(feature = "no-thrash")]
  278. {
  279. if !rcvd_msg {
  280. thread::sleep(Duration::new(0, 5000));
  281. }
  282. }
  283. }
  284. crit!(logger, "goodbye");
  285. });
  286. InfluxWriter {
  287. host,
  288. db,
  289. tx,
  290. kill_switch,
  291. thread: Some(thread)
  292. }
  293. }
  294. }
  295. impl Drop for InfluxWriter {
  296. fn drop(&mut self) {
  297. let _ = self.kill_switch.send(()).unwrap();
  298. if let Some(thread) = self.thread.take() {
  299. let _ = thread.join();
  300. }
  301. }
  302. }
  303. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  304. let socket = ctx.socket(zmq::PULL)?;
  305. socket.bind(WRITER_ADDR)?;
  306. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  307. Ok(socket)
  308. }
  309. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  310. let socket = ctx.socket(zmq::PUSH)?;
  311. socket.connect(WRITER_ADDR)?;
  312. socket.set_sndhwm(ZMQ_SND_HWM)?;
  313. Ok(socket)
  314. }
  315. /// This removes offending things rather than escaping them.
  316. ///
  317. fn escape_tag(s: &str) -> String {
  318. s.replace(" ", "")
  319. .replace(",", "")
  320. .replace("\"", "")
  321. }
  322. fn escape(s: &str) -> String {
  323. s.replace(" ", "\\ ")
  324. .replace(",", "\\,")
  325. }
  326. fn as_string(s: &str) -> String {
  327. // the second replace removes double escapes
  328. //
  329. format!("\"{}\"", s.replace("\"", "\\\"")
  330. .replace(r#"\\""#, r#"\""#))
  331. }
  332. #[test]
  333. fn it_checks_as_string_does_not_double_escape() {
  334. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  335. let escaped = as_string(&raw);
  336. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  337. }
  338. fn as_integer(i: &i64) -> String {
  339. format!("{}i", i)
  340. }
  341. fn as_float(f: &f64) -> String {
  342. f.to_string()
  343. }
  344. fn as_boolean(b: &bool) -> &str {
  345. if *b { "t" } else { "f" }
  346. }
  347. pub fn now() -> i64 {
  348. nanos(Utc::now()) as i64
  349. }
  350. /// Serialize the measurement into influx line protocol
  351. /// and append to the buffer.
  352. ///
  353. /// # Examples
  354. ///
  355. /// ```
  356. /// extern crate influent;
  357. /// extern crate logging;
  358. ///
  359. /// use influent::measurement::{Measurement, Value};
  360. /// use std::string::String;
  361. /// use logging::influx::serialize;
  362. ///
  363. /// fn main() {
  364. /// let mut buf = String::new();
  365. /// let mut m = Measurement::new("test");
  366. /// m.add_field("x", Value::Integer(1));
  367. /// serialize(&m, &mut buf);
  368. /// }
  369. ///
  370. /// ```
  371. ///
  372. pub fn serialize(measurement: &Measurement, line: &mut String) {
  373. line.push_str(&escape(measurement.key));
  374. for (tag, value) in measurement.tags.iter() {
  375. line.push_str(",");
  376. line.push_str(&escape(tag));
  377. line.push_str("=");
  378. line.push_str(&escape(value));
  379. }
  380. let mut was_spaced = false;
  381. for (field, value) in measurement.fields.iter() {
  382. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  383. line.push_str(&escape(field));
  384. line.push_str("=");
  385. match value {
  386. &Value::String(ref s) => line.push_str(&as_string(s)),
  387. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  388. &Value::Float(ref f) => line.push_str(&as_float(f)),
  389. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  390. };
  391. }
  392. match measurement.timestamp {
  393. Some(t) => {
  394. line.push_str(" ");
  395. line.push_str(&t.to_string());
  396. }
  397. _ => {}
  398. }
  399. }
  400. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  401. ///
  402. /// The serialized measurement is appended to the end of the string without
  403. /// any regard for what exited in it previously.
  404. ///
  405. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  406. line.push_str(&escape_tag(measurement.key));
  407. let add_tag = |line: &mut String, key: &str, value: &str| {
  408. line.push_str(",");
  409. line.push_str(&escape_tag(key));
  410. line.push_str("=");
  411. line.push_str(&escape(value));
  412. };
  413. for (key, value) in measurement.tags.iter() {
  414. add_tag(line, key, value);
  415. }
  416. let mut fields = measurement.fields.iter();
  417. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  418. if is_first { line.push_str(" "); } else { line.push_str(","); }
  419. line.push_str(&escape_tag(key));
  420. line.push_str("=");
  421. match *value {
  422. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  423. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  424. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  425. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  426. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  427. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  428. };
  429. };
  430. let mut fields = measurement.fields.iter();
  431. // first time separate from tags with space
  432. //
  433. fields.next().map(|kv| {
  434. add_field(line, kv.0, kv.1, true);
  435. });
  436. // then seperate the rest w/ comma
  437. //
  438. for kv in fields {
  439. add_field(line, kv.0, kv.1, false);
  440. }
  441. if let Some(t) = measurement.timestamp {
  442. line.push_str(" ");
  443. line.push_str(&t.to_string());
  444. }
  445. }
  446. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  447. thread::spawn(move || {
  448. let _ = fs::create_dir("/tmp/mm");
  449. let ctx = zmq::Context::new();
  450. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  451. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  452. let client = Client::new();
  453. let mut buf = String::with_capacity(4096);
  454. let mut server_resp = String::with_capacity(4096);
  455. let mut count = 0;
  456. loop {
  457. if let Ok(bytes) = socket.recv_bytes(0) {
  458. if let Ok(msg) = String::from_utf8(bytes) {
  459. count = match count {
  460. 0 => {
  461. buf.push_str(&msg);
  462. 1
  463. }
  464. n @ 1...40 => {
  465. buf.push_str("\n");
  466. buf.push_str(&msg);
  467. n + 1
  468. }
  469. _ => {
  470. buf.push_str("\n");
  471. buf.push_str(&msg);
  472. match client.post(url.clone())
  473. .body(&buf)
  474. .send() {
  475. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  476. Ok(mut resp) => {
  477. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  478. let _ = warnings.send(
  479. Warning::Error(
  480. format!("Influx server: {}", server_resp)));
  481. server_resp.clear();
  482. }
  483. Err(why) => {
  484. let _ = warnings.send(
  485. Warning::Error(
  486. format!("Influx write error: {}", why)));
  487. }
  488. }
  489. buf.clear();
  490. 0
  491. }
  492. }
  493. }
  494. }
  495. }
  496. })
  497. }
  498. #[derive(Debug, Clone, PartialEq)]
  499. pub enum OwnedValue {
  500. String(String),
  501. Float(f64),
  502. Integer(i64),
  503. Boolean(bool),
  504. D128(d128),
  505. Uuid(Uuid),
  506. }
  507. #[derive(Clone, Debug)]
  508. pub struct OwnedMeasurement {
  509. pub key: &'static str,
  510. pub timestamp: Option<i64>,
  511. pub fields: Map<&'static str, OwnedValue>,
  512. pub tags: Map<&'static str, &'static str>,
  513. //pub n_tags: usize,
  514. //pub n_fields: usize,
  515. //pub string_tags: HashMap<&'static str, String>
  516. }
  517. impl OwnedMeasurement {
  518. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  519. OwnedMeasurement {
  520. key,
  521. timestamp: None,
  522. tags: new_map(n_tags),
  523. fields: new_map(n_fields),
  524. //n_tags,
  525. //n_fields,
  526. //string_tags: HashMap::new()
  527. }
  528. }
  529. pub fn new(key: &'static str) -> Self {
  530. OwnedMeasurement::with_capacity(key, 4, 4)
  531. }
  532. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  533. self.tags.insert(key, value);
  534. self
  535. }
  536. // pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self {
  537. // self.string_tags.insert(key, value);
  538. // self
  539. // }
  540. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  541. self.fields.insert(key, value);
  542. self
  543. }
  544. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  545. self.timestamp = Some(timestamp);
  546. self
  547. }
  548. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  549. *self.tags.entry(key).or_insert(value) = value;
  550. self
  551. }
  552. }
  553. #[cfg(test)]
  554. mod tests {
  555. use super::*;
  556. use test::{black_box, Bencher};
  557. #[test]
  558. fn it_uses_the_new_tag_k_only_shortcut() {
  559. let tag_value = "one";
  560. let color = "red";
  561. let time = now();
  562. let m = measure!(@make_meas test, t(color), t(tag_value), tm(time));
  563. assert_eq!(m.tags.get("color"), Some(&"red"));
  564. assert_eq!(m.tags.get("tag_value"), Some(&"one"));
  565. assert_eq!(m.timestamp, Some(time));
  566. }
  567. #[test]
  568. fn it_uses_measure_macro_parenthesis_syntax() {
  569. let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
  570. assert_eq!(m.key, "test");
  571. assert_eq!(m.tags.get("a"), Some(&"b"));
  572. assert_eq!(m.fields.get("n"), Some(&OwnedValue::Integer(1)));
  573. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  574. assert_eq!(m.timestamp, Some(1));
  575. }
  576. #[test]
  577. fn it_uses_measure_macro_on_a_self_attribute() {
  578. struct A {
  579. pub influx: InfluxWriter,
  580. }
  581. impl A {
  582. fn f(&self) {
  583. measure!(self.influx, test, t(color, "red"), i(n, 1));
  584. }
  585. }
  586. let a = A { influx: InfluxWriter::default() };
  587. a.f();
  588. }
  589. #[bench]
  590. fn influx_writer_send_basic(b: &mut Bencher) {
  591. let m = InfluxWriter::default();
  592. b.iter(|| {
  593. measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]);
  594. });
  595. }
  596. #[bench]
  597. fn influx_writer_send_price(b: &mut Bencher) {
  598. let m = InfluxWriter::default();
  599. b.iter(|| {
  600. measure!(m, test,
  601. tag[ticker; t!(xmr-btc).to_str()],
  602. tag[exchange; "plnx"],
  603. d128[bid; d128::zero()],
  604. d128[ask; d128::zero()],
  605. );
  606. });
  607. }
  608. #[test]
  609. fn it_checks_color_tag_error_in_non_doctest() {
  610. let (tx, rx) = channel();
  611. measure!(tx, test, tag[color;"red"], int[n;1]);
  612. let meas: OwnedMeasurement = rx.recv().unwrap();
  613. assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
  614. }
  615. #[test]
  616. fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
  617. let meas = measure!(@make_meas test_measurement,
  618. tag [ one => "a" ],
  619. tag [ two => "b" ],
  620. int [ three => 2 ],
  621. float [ four => 1.2345 ],
  622. string [ five => String::from("d") ],
  623. bool [ six => true ],
  624. int [ seven => { 1 + 2 } ],
  625. time [ 1 ]
  626. );
  627. assert_eq!(meas.key, "test_measurement");
  628. assert_eq!(meas.tags.get("one"), Some(&"a"));
  629. assert_eq!(meas.tags.get("two"), Some(&"b"));
  630. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  631. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  632. assert_eq!(meas.timestamp, Some(1));
  633. }
  634. #[test]
  635. fn it_uses_the_measure_macro() {
  636. let (tx, rx) = channel();
  637. measure!(tx, test_measurement,
  638. tag [ one => "a" ],
  639. tag [ two => "b" ],
  640. int [ three => 2 ],
  641. float [ four => 1.2345 ],
  642. string [ five => String::from("d") ],
  643. bool [ six => true ],
  644. int [ seven => { 1 + 2 } ],
  645. time [ 1 ]
  646. );
  647. thread::sleep_ms(10);
  648. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  649. assert_eq!(meas.key, "test_measurement");
  650. assert_eq!(meas.tags.get("one"), Some(&"a"));
  651. assert_eq!(meas.tags.get("two"), Some(&"b"));
  652. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  653. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  654. assert_eq!(meas.timestamp, Some(1));
  655. }
  656. #[test]
  657. fn it_uses_measure_macro_for_d128_and_uuid() {
  658. let (tx, rx) = channel();
  659. let u = Uuid::new_v4();
  660. let d = d128::zero();
  661. let t = now();
  662. measure!(tx, test_measurement,
  663. tag[one; "a"],
  664. d128[two; d],
  665. uuid[three; u],
  666. time[t]
  667. );
  668. thread::sleep_ms(10);
  669. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  670. assert_eq!(meas.key, "test_measurement");
  671. assert_eq!(meas.tags.get("one"), Some(&"a"));
  672. assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero())));
  673. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u)));
  674. assert_eq!(meas.timestamp, Some(t));
  675. }
  676. #[test]
  677. fn it_uses_the_measure_macro_alt_syntax() {
  678. let (tx, rx) = channel();
  679. measure!(tx, test_measurement,
  680. tag[one; "a"],
  681. tag[two; "b"],
  682. int[three; 2],
  683. float[four; 1.2345],
  684. string[five; String::from("d")],
  685. bool [ six => true ],
  686. int[seven; { 1 + 2 }],
  687. time[1]
  688. );
  689. thread::sleep_ms(10);
  690. let meas: OwnedMeasurement = rx.try_recv().unwrap();
  691. assert_eq!(meas.key, "test_measurement");
  692. assert_eq!(meas.tags.get("one"), Some(&"a"));
  693. assert_eq!(meas.tags.get("two"), Some(&"b"));
  694. assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  695. assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  696. assert_eq!(meas.timestamp, Some(1));
  697. }
  698. #[test]
  699. fn it_checks_that_fields_are_separated_correctly() {
  700. let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
  701. assert_eq!(m.key, "test");
  702. assert_eq!(m.tags.get("a"), Some(&"one"));
  703. assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
  704. let mut buf = String::new();
  705. serialize_owned(&m, &mut buf);
  706. assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
  707. }
  708. #[test]
  709. fn try_to_break_measure_macro() {
  710. let (tx, _) = channel();
  711. measure!(tx, one, tag[x=>"y"], int[n;1]);
  712. measure!(tx, one, tag[x;"y"], int[n;1],);
  713. struct A {
  714. pub one: i32,
  715. pub two: i32,
  716. }
  717. struct B {
  718. pub a: A
  719. }
  720. let b = B { a: A { one: 1, two: 2 } };
  721. let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
  722. assert_eq!(m.fields.get("a"), Some(&OwnedValue::Integer(1)));
  723. }
  724. #[bench]
  725. fn measure_macro_small(b: &mut Bencher) {
  726. let (tx, rx) = channel();
  727. let listener = thread::spawn(move || {
  728. loop { if rx.recv().is_err() { break } }
  729. });
  730. b.iter(|| {
  731. measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
  732. });
  733. }
  734. #[bench]
  735. fn measure_macro_medium(b: &mut Bencher) {
  736. let (tx, rx) = channel();
  737. let listener = thread::spawn(move || {
  738. loop { if rx.recv().is_err() { break } }
  739. });
  740. b.iter(|| {
  741. measure!(tx, test,
  742. tag[color; "red"],
  743. tag[mood => "playful"],
  744. tag [ ticker => "xmr_btc" ],
  745. float[ price => 1.2345 ],
  746. float[ amount => 56.323],
  747. int[n; 1],
  748. time[now()]
  749. );
  750. });
  751. }
  752. #[test]
  753. #[ignore]
  754. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  755. let ctx = zmq::Context::new();
  756. let socket = push(&ctx).unwrap();
  757. let (tx, rx) = channel();
  758. let w = writer(tx.clone());
  759. let mut buf = String::with_capacity(4096);
  760. let mut meas = Measurement::new("rust_test");
  761. meas.add_tag("a", "t");
  762. meas.add_field("c", Value::Float(1.23456));
  763. let now = now();
  764. meas.set_timestamp(now);
  765. serialize(&meas, &mut buf);
  766. socket.send_str(&buf, 0);
  767. drop(w);
  768. }
  769. #[test]
  770. fn it_serializes_a_measurement_in_place() {
  771. let mut buf = String::with_capacity(4096);
  772. let mut meas = Measurement::new("rust_test");
  773. meas.add_tag("a", "b");
  774. meas.add_field("c", Value::Float(1.0));
  775. let now = now();
  776. meas.set_timestamp(now);
  777. serialize(&meas, &mut buf);
  778. let ans = format!("rust_test,a=b c=1 {}", now);
  779. assert_eq!(buf, ans);
  780. }
  781. #[test]
  782. fn it_serializes_a_hard_to_serialize_message() {
  783. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  784. let mut buf = String::new();
  785. let mut server_resp = String::new();
  786. let mut m = Measurement::new("rust_test");
  787. m.add_field("s", Value::String(&raw));
  788. let now = now();
  789. m.set_timestamp(now);
  790. serialize(&m, &mut buf);
  791. println!("{}", buf);
  792. buf.push_str("\n");
  793. let buf_copy = buf.clone();
  794. buf.push_str(&buf_copy);
  795. println!("{}", buf);
  796. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  797. let client = Client::new();
  798. match client.post(url.clone())
  799. .body(&buf)
  800. .send() {
  801. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  802. Ok(mut resp) => {
  803. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  804. panic!("{}", server_resp);
  805. }
  806. Err(why) => {
  807. panic!(why)
  808. }
  809. }
  810. }
  811. #[bench]
  812. fn serialize_owned_longer(b: &mut Bencher) {
  813. let mut buf = String::with_capacity(1024);
  814. let m =
  815. OwnedMeasurement::new("test")
  816. .add_tag("one", "a")
  817. .add_tag("two", "b")
  818. .add_tag("ticker", "xmr_btc")
  819. .add_tag("exchange", "plnx")
  820. .add_tag("side", "bid")
  821. .add_field("three", OwnedValue::Float(1.2345))
  822. .add_field("four", OwnedValue::Integer(57))
  823. .add_field("five", OwnedValue::Boolean(true))
  824. .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
  825. .set_timestamp(now());
  826. b.iter(|| {
  827. serialize_owned(&m, &mut buf);
  828. buf.clear()
  829. });
  830. }
  831. #[bench]
  832. fn serialize_owned_simple(b: &mut Bencher) {
  833. let mut buf = String::with_capacity(1024);
  834. let m =
  835. OwnedMeasurement::new("test")
  836. .add_tag("one", "a")
  837. .add_tag("two", "b")
  838. .add_field("three", OwnedValue::Float(1.2345))
  839. .add_field("four", OwnedValue::Integer(57))
  840. .set_timestamp(now());
  841. b.iter(|| {
  842. serialize_owned(&m, &mut buf);
  843. buf.clear()
  844. });
  845. }
  846. #[test]
  847. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  848. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  849. let mut buf = String::new();
  850. let mut server_resp = String::new();
  851. let mut m = OwnedMeasurement::new("rust_test")
  852. .add_field("s", OwnedValue::String(raw.to_string()))
  853. .set_timestamp(now());
  854. serialize_owned(&m, &mut buf);
  855. println!("{}", buf);
  856. buf.push_str("\n");
  857. let buf_copy = buf.clone();
  858. buf.push_str(&buf_copy);
  859. println!("{}", buf);
  860. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  861. let client = Client::new();
  862. match client.post(url.clone())
  863. .body(&buf)
  864. .send() {
  865. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  866. Ok(mut resp) => {
  867. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  868. panic!("{}", server_resp);
  869. }
  870. Err(why) => {
  871. panic!(why)
  872. }
  873. }
  874. }
  875. // macro_rules! make_measurement {
  876. // (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  877. // (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
  878. // (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
  879. // (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
  880. // (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) };
  881. // (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) };
  882. // (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
  883. // (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v)) };
  884. //
  885. // (@count_tags) => {0usize};
  886. // (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
  887. // (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};
  888. //
  889. // (@count_fields) => {0usize};
  890. // (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  891. // (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
  892. // (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};
  893. //
  894. // ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
  895. // let n_tags = measure!(@count_tags $($t)*);
  896. // let n_fields = measure!(@count_fields $($t)*);
  897. // let mut meas =
  898. // $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
  899. // $(
  900. // measure!(@kv $t, meas, $($tail)*);
  901. // )*
  902. // //let _ = $m.send(meas);
  903. // meas
  904. // }};
  905. // }
  906. //
  907. // #[test]
  908. // fn it_checks_n_tags_is_correct() {
  909. // let (tx, _): (Sender<OwnedMeasurement>, Receiver<OwnedMeasurement>) = channel();
  910. // assert_eq!(make_measurement!(tx, test, tag[a;"b"]).n_tags, 1);
  911. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_tags, 2);
  912. // assert_eq!(make_measurement!(tx, test, int[a;1]).n_tags, 0);
  913. // assert_eq!(make_measurement!(tx, test, tag[a;"b"], tag[c;"d"]).n_fields, 0);
  914. //
  915. // let m4 =
  916. // make_measurement!(tx, test,
  917. // tag[a;"b"],
  918. // tag[c;"d"],
  919. // int[n; 1],
  920. // tag[e;"f"],
  921. // float[x; 1.234],
  922. // tag[g;"h"],
  923. // time[1],
  924. // );
  925. // assert_eq!(m4.n_tags, 4);
  926. // assert_eq!(m4.n_fields, 2);
  927. // }
  928. }