You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1011 lines
35KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::iter::FromIterator;
  4. use std::io::{Write, Read};
  5. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  6. use std::thread;
  7. use std::fs::{self, OpenOptions};
  8. use std::time::Duration;
  9. use std::hash::{Hash, BuildHasherDefault};
  10. use hyper::status::StatusCode;
  11. use hyper::client::response::Response;
  12. use hyper::Url;
  13. use hyper::client::Client;
  14. use influent::measurement::{Measurement, Value};
  15. use zmq;
  16. use chrono::{DateTime, Utc, TimeZone};
  17. use sloggers::types::Severity;
  18. use ordermap::OrderMap;
  19. use fnv::FnvHasher;
  20. use decimal::d128;
  21. use uuid::Uuid;
  22. use money::Ticker;
  23. use super::{nanos, file_logger};
  24. use warnings::Warning;
  /// ZeroMQ endpoint that `pull` binds to and `push` connects to.
  const WRITER_ADDR: &str = "ipc:///tmp/mm/influx";
  //const WRITER_ADDR: &str = "tcp://127.0.0.1:17853";

  /// Database name sent as the `db` query parameter by `writer`.
  const DB_NAME: &str = "mm";

  /// Full url of the influx write endpoint used by `writer`.
  const DB_HOST: &str = "http://washington.0ptimus.internal:8086/write";
  //const DB_HOST: &str = "http://harrison.0ptimus.internal:8086/write";

  /// Receive high-water mark applied to the pull socket.
  /// NOTE(review): ZeroMQ documents 0 as "no limit" -- confirm that is intended.
  const ZMQ_RCV_HWM: i32 = 0;

  /// Send high-water mark applied to the push socket (see note on `ZMQ_RCV_HWM`).
  const ZMQ_SND_HWM: i32 = 0;

  /// Batch size used by `InfluxWriter::default`.
  const BUFFER_SIZE: u8 = 80;
  33. pub use super::{dur_nanos, dt_nanos};
  34. pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;
  35. pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
  36. Map::with_capacity_and_hasher(capacity, Default::default())
  37. }
  /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
  ///
  /// The macro both creates an `OwnedMeasurement` from the supplied tags and
  /// values, as well as sends it with the `Sender` (the result of `send` is
  /// discarded).
  ///
  /// Every entry has the shape `kind[key <sep> value]` or `kind(key <sep> value)`,
  /// where `<sep>` is one of `=>`, `;` or `,`. The key is turned into a string with
  /// `stringify!`. Recognised kinds (long form / shorthand):
  ///
  /// | kind              | effect                              |
  /// |-------------------|-------------------------------------|
  /// | `tag` / `t`       | `add_tag(key, value)`               |
  /// | `int` / `i`       | `OwnedValue::Integer(value as i64)` |
  /// | `float` / `f`     | `OwnedValue::Float(value as f64)`   |
  /// | `string` / `s`    | `OwnedValue::String(value)`         |
  /// | `d128` / `d`      | `OwnedValue::D128(value)`           |
  /// | `uuid` / `u`      | `OwnedValue::Uuid(value)`           |
  /// | `bool` / `b`      | `OwnedValue::Boolean(value as bool)`|
  /// | `time` / `tm`     | `set_timestamp(value as i64)`       |
  ///
  /// Benchmarks show around 600ns for a small measurement and 1us for a medium-sized
  /// measurement (see `tests` mod).
  ///
  /// # Examples
  ///
  /// ```
  /// #[macro_use] extern crate logging;
  /// extern crate decimal;
  ///
  /// use std::sync::mpsc::channel;
  /// use decimal::d128;
  /// use logging::influx::*;
  ///
  /// fn main() {
  ///     let (tx, rx) = channel();
  ///
  ///     // "shorthand" syntax
  ///
  ///     measure!(tx, test, tag[color;"red"], int[n;1]);
  ///
  ///     let meas: OwnedMeasurement = rx.recv().unwrap();
  ///
  ///     assert_eq!(meas.key, "test");
  ///     assert_eq!(meas.tags.get("color"), Some(&"red"));
  ///     assert_eq!(meas.fields.get("n"), Some(&OwnedValue::Integer(1)));
  ///
  ///     // alternate syntax ...
  ///
  ///     measure!(tx, test,
  ///         tag    [ one   => "a"               ],
  ///         tag    [ two   => "b"               ],
  ///         int    [ three => 2                 ],
  ///         float  [ four  => 1.2345            ],
  ///         string [ five  => String::from("d") ],
  ///         bool   [ six   => true              ],
  ///         int    [ seven => { 1 + 2 }         ],
  ///         time   [ 1                          ]
  ///     );
  ///
  ///     let meas: OwnedMeasurement = rx.recv().unwrap();
  ///
  ///     assert_eq!(meas.key, "test");
  ///     assert_eq!(meas.tags.get("one"), Some(&"a"));
  ///     assert_eq!(meas.tags.get("two"), Some(&"b"));
  ///     assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
  ///     assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
  ///     assert_eq!(meas.timestamp, Some(1));
  ///
  ///     // use the @make_meas flag to skip sending a measurement, instead merely
  ///     // creating it.
  ///
  ///     let meas: OwnedMeasurement = measure!(@make_meas meas_only, tag[color; "red"], int[n; 1]);
  ///
  ///     // each variant also has shorthand aliases
  ///
  ///     let meas: OwnedMeasurement =
  ///         measure!(@make_meas abcd, t[color; "red"], i[n; 1], d[price; d128::zero()]);
  /// }
  /// ```
  ///
  #[macro_export]
  macro_rules! measure {
      // @kv: split one entry into key and value. The three separators are
      // interchangeable; the timestamp kinds take a lone expression instead.
      (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
      (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
      (@kv $t:tt, $meas:ident, $k:tt, $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
      (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };
      (@kv tm, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };

      // @ea: apply one key/value pair to the measurement according to its kind.
      (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
      (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
      (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
      (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v as i64)) };
      (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
      (@ea f, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v as f64)) };
      (@ea string, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
      (@ea s, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::String($v)) };
      (@ea d128, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
      (@ea d, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::D128($v)) };
      (@ea uuid, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
      (@ea u, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Uuid($v)) };
      (@ea bool, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };
      (@ea b, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Boolean($v as bool)) };

      // @count_tags: number of tag entries, used to pre-size the tag map.
      // Both the long form and the `t` shorthand count.
      (@count_tags) => {0usize};
      (@count_tags tag $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
      (@count_tags t $($tail:tt)*) => {1usize + measure!(@count_tags $($tail)*)};
      (@count_tags $t:tt $($tail:tt)*) => {0usize + measure!(@count_tags $($tail)*)};

      // @count_fields: number of field entries, used to pre-size the field map.
      // Tags and timestamps (in either spelling) are not fields.
      (@count_fields) => {0usize};
      (@count_fields tag $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
      (@count_fields t $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
      (@count_fields time $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
      (@count_fields tm $($tail:tt)*) => {0usize + measure!(@count_fields $($tail)*)};
      (@count_fields $t:tt $($tail:tt)*) => {1usize + measure!(@count_fields $($tail)*)};

      // @make_meas: build the measurement without sending it. The parenthesis
      // form is rewritten to the bracket form.
      (@make_meas $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
          measure!(@make_meas $name, $( $t [ $($tail)* ] ),*)
      };
      (@make_meas $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
          let n_tags = measure!(@count_tags $($t)*);
          let n_fields = measure!(@count_fields $($t)*);
          let mut meas =
              $crate::influx::OwnedMeasurement::with_capacity(stringify!($name), n_tags, n_fields);
          $(
              measure!(@kv $t, meas, $($tail)*);
          )*
          meas
      }};

      // Entry points: build the measurement and hand it to `$m.send(..)`.
      ($m:tt, $name:tt, $( $t:tt ( $($tail:tt)* ) ),+ $(,)*) => {
          measure!($m, $name, $($t [ $($tail)* ] ),+)
      };
      ($m:tt, $name:tt, $( $t:tt [ $($tail:tt)* ] ),+ $(,)*) => {{
          let measurement = measure!(@make_meas $name, $( $t [ $($tail)* ] ),*);
          let _ = $m.send(measurement);
      }};
  }
  151. /// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
  152. /// it receives (over a SPSC channel) and inserts to influxdb via http when `BUFFER_SIZE`
  153. /// measurements have accumulated.
  154. ///
  155. pub struct InfluxWriter {
  156. host: &'static str,
  157. db: &'static str,
  158. tx: Sender<OwnedMeasurement>,
  159. kill_switch: Sender<()>,
  160. thread: Option<thread::JoinHandle<()>>,
  161. }
  162. impl Default for InfluxWriter {
  163. fn default() -> Self {
  164. InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log", BUFFER_SIZE)
  165. }
  166. }
  167. impl InfluxWriter {
  168. /// Sends the `OwnedMeasurement` to the serialization thread.
  169. ///
  170. pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<OwnedMeasurement>> {
  171. self.tx.send(m)
  172. }
  173. pub fn tx(&self) -> Sender<OwnedMeasurement> {
  174. self.tx.clone()
  175. }
  176. pub fn new(host: &'static str, db: &'static str, log_path: &str, buffer_size: u8) -> Self {
  177. let (kill_switch, terminate) = channel();
  178. let (tx, rx) = channel();
  179. let logger = file_logger(log_path, Severity::Info);
  180. let thread = thread::spawn(move || {
  181. info!(logger, "initializing url";
  182. "DB_HOST" => host,
  183. "DB_NAME" => db);
  184. let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse");
  185. let client = Client::new();
  186. info!(logger, "initializing buffers");
  187. let mut meas_buf = String::with_capacity(32 * 32 * 32);
  188. let mut buf = String::with_capacity(32 * 32 * 32);
  189. let mut count = 0;
  190. let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
  191. trace!(logger, "appending serialized measurement to buffer";
  192. "prev" => prev,
  193. "buf.len()" => buf.len());
  194. match prev {
  195. 0 => {
  196. buf.push_str(s);
  197. 1
  198. }
  199. n if n < buffer_size => {
  200. buf.push_str("\n");
  201. buf.push_str(s);
  202. n + 1
  203. }
  204. _ => {
  205. buf.push_str("\n");
  206. if s.len() > 0 {
  207. buf.push_str(s);
  208. }
  209. trace!(logger, "sending buffer to influx";
  210. "buf.len()" => buf.len());
  211. #[cfg(not(test))]
  212. {
  213. let resp = client.post(url.clone())
  214. .body(buf.as_str())
  215. .send();
  216. match resp {
  217. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  218. trace!(logger, "server responded ok: 204 NoContent");
  219. }
  220. Ok(mut resp) => {
  221. let mut server_resp = String::with_capacity(1024);
  222. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  223. error!(logger, "influx server error";
  224. "status" => resp.status.to_string(),
  225. "body" => server_resp);
  226. }
  227. Err(why) => {
  228. error!(logger, "http request failed: {:?}", why);
  229. }
  230. }
  231. }
  232. buf.clear();
  233. 0
  234. }
  235. }
  236. };
  237. let mut rcvd_msg = false;
  238. loop {
  239. rcvd_msg = false;
  240. rx.recv_timeout(Duration::from_millis(10))
  241. .map(|mut meas: OwnedMeasurement| {
  242. // if we didn't set the timestamp, it would end up
  243. // being whenever we accumulated `BUFFER_SIZE` messages,
  244. // which might be some period of time after we received
  245. // the message.
  246. //
  247. if meas.timestamp.is_none() {
  248. meas.timestamp = Some(now());
  249. }
  250. trace!(logger, "rcvd new OwnedMeasurement"; "count" => count);
  251. serialize_owned(&meas, &mut meas_buf);
  252. count = next(count, &meas_buf, &mut buf);
  253. meas_buf.clear();
  254. rcvd_msg = true;
  255. });
  256. let end = terminate.try_recv()
  257. .map(|_| {
  258. let _ = next(::std::u8::MAX, "", &mut buf);
  259. true
  260. }).unwrap_or(false);
  261. if end { break }
  262. #[cfg(feature = "no-thrash")]
  263. {
  264. if !rcvd_msg {
  265. thread::sleep(Duration::new(0, 5000));
  266. }
  267. }
  268. }
  269. crit!(logger, "goodbye");
  270. });
  271. InfluxWriter {
  272. host,
  273. db,
  274. tx,
  275. kill_switch,
  276. thread: Some(thread)
  277. }
  278. }
  279. }
  280. impl Drop for InfluxWriter {
  281. fn drop(&mut self) {
  282. let _ = self.kill_switch.send(()).unwrap();
  283. if let Some(thread) = self.thread.take() {
  284. let _ = thread.join();
  285. }
  286. }
  287. }
  288. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  289. let socket = ctx.socket(zmq::PULL)?;
  290. socket.bind(WRITER_ADDR)?;
  291. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  292. Ok(socket)
  293. }
  294. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  295. let socket = ctx.socket(zmq::PUSH)?;
  296. socket.connect(WRITER_ADDR)?;
  297. socket.set_sndhwm(ZMQ_SND_HWM)?;
  298. Ok(socket)
  299. }
  /// Strips spaces, commas and double quotes from `s`.
  ///
  /// Nothing is escaped here: the offending characters are simply dropped.
  ///
  fn escape_tag(s: &str) -> String {
      s.chars()
          .filter(|&c| c != ' ' && c != ',' && c != '"')
          .collect()
  }
  /// Returns a copy of `s` with a backslash inserted in front of every space
  /// and every comma. All other characters are copied unchanged.
  fn escape(s: &str) -> String {
      let mut escaped = String::with_capacity(s.len());
      for c in s.chars() {
          if c == ' ' || c == ',' {
              escaped.push('\\');
          }
          escaped.push(c);
      }
      escaped
  }
  /// Wraps `s` in double quotes, backslash-escaping the double quotes inside it.
  ///
  /// A quote that already follows a backslash is left alone, so input that was
  /// escaped beforehand does not get escaped twice. Backslashes themselves are
  /// never escaped.
  fn as_string(s: &str) -> String {
      let mut quoted = String::with_capacity(s.len() + 2);
      quoted.push('"');
      let mut after_backslash = false;
      for c in s.chars() {
          if c == '"' && !after_backslash {
              quoted.push('\\');
          }
          quoted.push(c);
          after_backslash = c == '\\';
      }
      quoted.push('"');
      quoted
  }
  /// Quotes that are already backslash-escaped must come out of `as_string`
  /// unchanged (apart from the surrounding quotes).
  #[test]
  fn it_checks_as_string_does_not_double_escape() {
      let raw = "this is \\\"an escaped string\\\" so it's problematic";
      let expected = format!("\"{}\"", raw);
      assert_eq!(as_string(raw), expected);
  }
  /// Formats `i` in decimal followed by an `i` suffix, e.g. `42i`.
  fn as_integer(i: &i64) -> String {
      let mut repr = i.to_string();
      repr.push('i');
      repr
  }
  /// Formats `f` with its `Display` implementation.
  fn as_float(f: &f64) -> String {
      format!("{}", f)
  }
  /// Maps `true` to `"t"` and `false` to `"f"`.
  fn as_boolean(b: &bool) -> &str {
      match *b {
          true => "t",
          false => "f",
      }
  }
  332. pub fn now() -> i64 {
  333. nanos(Utc::now()) as i64
  334. }
  335. /// Serialize the measurement into influx line protocol
  336. /// and append to the buffer.
  337. ///
  338. /// # Examples
  339. ///
  340. /// ```
  341. /// extern crate influent;
  342. /// extern crate logging;
  343. ///
  344. /// use influent::measurement::{Measurement, Value};
  345. /// use std::string::String;
  346. /// use logging::influx::serialize;
  347. ///
  348. /// fn main() {
  349. /// let mut buf = String::new();
  350. /// let mut m = Measurement::new("test");
  351. /// m.add_field("x", Value::Integer(1));
  352. /// serialize(&m, &mut buf);
  353. /// }
  354. ///
  355. /// ```
  356. ///
  357. pub fn serialize(measurement: &Measurement, line: &mut String) {
  358. line.push_str(&escape(measurement.key));
  359. for (tag, value) in measurement.tags.iter() {
  360. line.push_str(",");
  361. line.push_str(&escape(tag));
  362. line.push_str("=");
  363. line.push_str(&escape(value));
  364. }
  365. let mut was_spaced = false;
  366. for (field, value) in measurement.fields.iter() {
  367. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  368. line.push_str(&escape(field));
  369. line.push_str("=");
  370. match value {
  371. &Value::String(ref s) => line.push_str(&as_string(s)),
  372. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  373. &Value::Float(ref f) => line.push_str(&as_float(f)),
  374. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  375. };
  376. }
  377. match measurement.timestamp {
  378. Some(t) => {
  379. line.push_str(" ");
  380. line.push_str(&t.to_string());
  381. }
  382. _ => {}
  383. }
  384. }
  385. /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
  386. ///
  387. /// The serialized measurement is appended to the end of the string without
  388. /// any regard for what exited in it previously.
  389. ///
  390. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  391. line.push_str(&escape_tag(measurement.key));
  392. let add_tag = |line: &mut String, key: &str, value: &str| {
  393. line.push_str(",");
  394. line.push_str(&escape_tag(key));
  395. line.push_str("=");
  396. line.push_str(&escape(value));
  397. };
  398. for (key, value) in measurement.tags.iter() {
  399. add_tag(line, key, value);
  400. }
  401. let mut fields = measurement.fields.iter();
  402. let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| {
  403. if is_first { line.push_str(" "); } else { line.push_str(","); }
  404. line.push_str(&escape_tag(key));
  405. line.push_str("=");
  406. match *value {
  407. OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  408. OwnedValue::Integer(ref i) => line.push_str(&format!("{}i", i)),
  409. OwnedValue::Float(ref f) => line.push_str(&format!("{}", f)),
  410. OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b)),
  411. OwnedValue::D128(ref d) => line.push_str(&format!("{}", d)),
  412. OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", &u.to_string()[..8])),
  413. };
  414. };
  415. let mut fields = measurement.fields.iter();
  416. // first time separate from tags with space
  417. //
  418. fields.next().map(|kv| {
  419. add_field(line, kv.0, kv.1, true);
  420. });
  421. // then seperate the rest w/ comma
  422. //
  423. for kv in fields {
  424. add_field(line, kv.0, kv.1, false);
  425. }
  426. if let Some(t) = measurement.timestamp {
  427. line.push_str(" ");
  428. line.push_str(&t.to_string());
  429. }
  430. }
  431. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  432. thread::spawn(move || {
  433. let _ = fs::create_dir("/tmp/mm");
  434. let ctx = zmq::Context::new();
  435. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  436. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  437. let client = Client::new();
  438. let mut buf = String::with_capacity(4096);
  439. let mut server_resp = String::with_capacity(4096);
  440. let mut count = 0;
  441. loop {
  442. if let Ok(bytes) = socket.recv_bytes(0) {
  443. if let Ok(msg) = String::from_utf8(bytes) {
  444. count = match count {
  445. 0 => {
  446. buf.push_str(&msg);
  447. 1
  448. }
  449. n @ 1...40 => {
  450. buf.push_str("\n");
  451. buf.push_str(&msg);
  452. n + 1
  453. }
  454. _ => {
  455. buf.push_str("\n");
  456. buf.push_str(&msg);
  457. match client.post(url.clone())
  458. .body(&buf)
  459. .send() {
  460. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  461. Ok(mut resp) => {
  462. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  463. let _ = warnings.send(
  464. Warning::Error(
  465. format!("Influx server: {}", server_resp)));
  466. server_resp.clear();
  467. }
  468. Err(why) => {
  469. let _ = warnings.send(
  470. Warning::Error(
  471. format!("Influx write error: {}", why)));
  472. }
  473. }
  474. buf.clear();
  475. 0
  476. }
  477. }
  478. }
  479. }
  480. }
  481. })
  482. }
  483. #[derive(Debug, Clone, PartialEq)]
  484. pub enum OwnedValue {
  485. String(String),
  486. Float(f64),
  487. Integer(i64),
  488. Boolean(bool),
  489. D128(d128),
  490. Uuid(Uuid),
  491. }
  492. #[derive(Clone, Debug)]
  493. pub struct OwnedMeasurement {
  494. pub key: &'static str,
  495. pub timestamp: Option<i64>,
  496. pub fields: Map<&'static str, OwnedValue>,
  497. pub tags: Map<&'static str, &'static str>,
  498. //pub n_tags: usize,
  499. //pub n_fields: usize,
  500. //pub string_tags: HashMap<&'static str, String>
  501. }
  502. impl OwnedMeasurement {
  503. pub fn with_capacity(key: &'static str, n_tags: usize, n_fields: usize) -> Self {
  504. OwnedMeasurement {
  505. key,
  506. timestamp: None,
  507. tags: new_map(n_tags),
  508. fields: new_map(n_fields),
  509. //n_tags,
  510. //n_fields,
  511. //string_tags: HashMap::new()
  512. }
  513. }
  514. pub fn new(key: &'static str) -> Self {
  515. OwnedMeasurement::with_capacity(key, 4, 4)
  516. }
  517. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  518. self.tags.insert(key, value);
  519. self
  520. }
  521. // pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self {
  522. // self.string_tags.insert(key, value);
  523. // self
  524. // }
  525. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  526. self.fields.insert(key, value);
  527. self
  528. }
  529. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  530. self.timestamp = Some(timestamp);
  531. self
  532. }
  533. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  534. *self.tags.entry(key).or_insert(value) = value;
  535. self
  536. }
  537. }
  /// Unit tests and benchmarks for the `measure!` macro, the serializers and
  /// the writers.
  ///
  /// Tests that talk to the influx server at `DB_HOST` are marked `#[ignore]`
  /// so the default test run does not depend on the network; run them with
  /// `cargo test -- --ignored`.
  #[cfg(test)]
  mod tests {
      use super::*;
      use test::{black_box, Bencher};

      #[test]
      fn it_uses_measure_macro_parenthesis_syntax() {
          let m = measure!(@make_meas test, t(a,"b"), i(n,1), f(x,1.1), tm(1));
          assert_eq!(m.key, "test");
          assert_eq!(m.tags.get("a"), Some(&"b"));
          assert_eq!(m.fields.get("n"), Some(&OwnedValue::Integer(1)));
          assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
          assert_eq!(m.timestamp, Some(1));
      }

      #[bench]
      fn influx_writer_send_basic(b: &mut Bencher) {
          let m = InfluxWriter::default();
          b.iter(|| {
              measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]);
          });
      }

      #[bench]
      fn influx_writer_send_price(b: &mut Bencher) {
          let m = InfluxWriter::default();
          b.iter(|| {
              measure!(m, test,
                  tag[ticker; t!(xmr-btc).to_str()],
                  tag[exchange; "plnx"],
                  d128[bid; d128::zero()],
                  d128[ask; d128::zero()],
              );
          });
      }

      #[test]
      fn it_checks_color_tag_error_in_non_doctest() {
          let (tx, rx) = channel();
          measure!(tx, test, tag[color;"red"], int[n;1]);
          let meas: OwnedMeasurement = rx.recv().unwrap();
          assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
      }

      #[test]
      fn it_uses_the_make_meas_pattern_of_the_measure_macro() {
          let meas = measure!(@make_meas test_measurement,
              tag [ one => "a" ],
              tag [ two => "b" ],
              int [ three => 2 ],
              float [ four => 1.2345 ],
              string [ five => String::from("d") ],
              bool [ six => true ],
              int [ seven => { 1 + 2 } ],
              time [ 1 ]
          );
          assert_eq!(meas.key, "test_measurement");
          assert_eq!(meas.tags.get("one"), Some(&"a"));
          assert_eq!(meas.tags.get("two"), Some(&"b"));
          assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
          assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
          assert_eq!(meas.timestamp, Some(1));
      }

      #[test]
      fn it_uses_the_measure_macro() {
          let (tx, rx) = channel();
          measure!(tx, test_measurement,
              tag [ one => "a" ],
              tag [ two => "b" ],
              int [ three => 2 ],
              float [ four => 1.2345 ],
              string [ five => String::from("d") ],
              bool [ six => true ],
              int [ seven => { 1 + 2 } ],
              time [ 1 ]
          );
          // the send above already completed, so a blocking recv returns at once
          let meas: OwnedMeasurement = rx.recv().unwrap();
          assert_eq!(meas.key, "test_measurement");
          assert_eq!(meas.tags.get("one"), Some(&"a"));
          assert_eq!(meas.tags.get("two"), Some(&"b"));
          assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
          assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
          assert_eq!(meas.timestamp, Some(1));
      }

      #[test]
      fn it_uses_measure_macro_for_d128_and_uuid() {
          let (tx, rx) = channel();
          let u = Uuid::new_v4();
          let d = d128::zero();
          let t = now();
          measure!(tx, test_measurement,
              tag[one; "a"],
              d128[two; d],
              uuid[three; u],
              time[t]
          );
          let meas: OwnedMeasurement = rx.recv().unwrap();
          assert_eq!(meas.key, "test_measurement");
          assert_eq!(meas.tags.get("one"), Some(&"a"));
          assert_eq!(meas.fields.get("two"), Some(&OwnedValue::D128(d128::zero())));
          assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Uuid(u)));
          assert_eq!(meas.timestamp, Some(t));
      }

      #[test]
      fn it_uses_the_measure_macro_alt_syntax() {
          let (tx, rx) = channel();
          measure!(tx, test_measurement,
              tag[one; "a"],
              tag[two; "b"],
              int[three; 2],
              float[four; 1.2345],
              string[five; String::from("d")],
              bool [ six => true ],
              int[seven; { 1 + 2 }],
              time[1]
          );
          let meas: OwnedMeasurement = rx.recv().unwrap();
          assert_eq!(meas.key, "test_measurement");
          assert_eq!(meas.tags.get("one"), Some(&"a"));
          assert_eq!(meas.tags.get("two"), Some(&"b"));
          assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
          assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
          assert_eq!(meas.timestamp, Some(1));
      }

      #[test]
      fn it_checks_that_fields_are_separated_correctly() {
          let m = measure!(@make_meas test, t[a; "one"], t[b; "two"], f[x; 1.1], f[y; -1.1]);
          assert_eq!(m.key, "test");
          assert_eq!(m.tags.get("a"), Some(&"one"));
          assert_eq!(m.fields.get("x"), Some(&OwnedValue::Float(1.1)));
          let mut buf = String::new();
          serialize_owned(&m, &mut buf);
          assert!(buf.contains("b=two x=1.1,y=-1.1"), "buf = {}", buf);
      }

      #[test]
      fn try_to_break_measure_macro() {
          let (tx, _) = channel();
          measure!(tx, one, tag[x=>"y"], int[n;1]);
          measure!(tx, one, tag[x;"y"], int[n;1],);

          struct A {
              pub one: i32,
              pub two: i32,
          }
          struct B {
              pub a: A
          }
          let b = B { a: A { one: 1, two: 2 } };
          let m = measure!(@make_meas test, t(name, "a"), i(a, b.a.one));
          assert_eq!(m.fields.get("a"), Some(&OwnedValue::Integer(1)));
      }

      #[bench]
      fn measure_macro_small(b: &mut Bencher) {
          let (tx, rx) = channel();
          // consume the measurements so the channel does not grow unbounded
          let _listener = thread::spawn(move || {
              loop { if rx.recv().is_err() { break } }
          });
          b.iter(|| {
              measure!(tx, test, tag[color; "red"], int[n; 1], time[now()]);
          });
      }

      #[bench]
      fn measure_macro_medium(b: &mut Bencher) {
          let (tx, rx) = channel();
          // consume the measurements so the channel does not grow unbounded
          let _listener = thread::spawn(move || {
              loop { if rx.recv().is_err() { break } }
          });
          b.iter(|| {
              measure!(tx, test,
                  tag[color; "red"],
                  tag[mood => "playful"],
                  tag [ ticker => "xmr_btc" ],
                  float[ price => 1.2345 ],
                  float[ amount => 56.323],
                  int[n; 1],
                  time[now()]
              );
          });
      }

      #[test]
      #[ignore]
      fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
          let ctx = zmq::Context::new();
          let socket = push(&ctx).unwrap();
          let (tx, _rx) = channel();
          let w = writer(tx.clone());
          let mut buf = String::with_capacity(4096);
          let mut meas = Measurement::new("rust_test");
          meas.add_tag("a", "t");
          meas.add_field("c", Value::Float(1.23456));
          let now = now();
          meas.set_timestamp(now);
          serialize(&meas, &mut buf);
          socket.send_str(&buf, 0).unwrap();
          drop(w);
      }

      #[test]
      fn it_serializes_a_measurement_in_place() {
          let mut buf = String::with_capacity(4096);
          let mut meas = Measurement::new("rust_test");
          meas.add_tag("a", "b");
          meas.add_field("c", Value::Float(1.0));
          let now = now();
          meas.set_timestamp(now);
          serialize(&meas, &mut buf);
          let ans = format!("rust_test,a=b c=1 {}", now);
          assert_eq!(buf, ans);
      }

      // Posts to the influx server at `DB_HOST`, hence ignored by default.
      #[test]
      #[ignore]
      fn it_serializes_a_hard_to_serialize_message() {
          let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
          let mut buf = String::new();
          let mut server_resp = String::new();
          let mut m = Measurement::new("rust_test");
          m.add_field("s", Value::String(raw));
          let now = now();
          m.set_timestamp(now);
          serialize(&m, &mut buf);
          println!("{}", buf);
          buf.push_str("\n");
          let buf_copy = buf.clone();
          buf.push_str(&buf_copy);
          println!("{}", buf);
          let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")])
              .expect("influx writer url should parse");
          let client = Client::new();
          match client.post(url.clone())
              .body(&buf)
              .send()
          {
              Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
              Ok(mut resp) => {
                  let _ = resp.read_to_string(&mut server_resp);
                  panic!("{}", server_resp);
              }
              Err(why) => {
                  panic!("http request failed: {:?}", why)
              }
          }
      }

      #[bench]
      fn serialize_owned_longer(b: &mut Bencher) {
          let mut buf = String::with_capacity(1024);
          let m =
              OwnedMeasurement::new("test")
                  .add_tag("one", "a")
                  .add_tag("two", "b")
                  .add_tag("ticker", "xmr_btc")
                  .add_tag("exchange", "plnx")
                  .add_tag("side", "bid")
                  .add_field("three", OwnedValue::Float(1.2345))
                  .add_field("four", OwnedValue::Integer(57))
                  .add_field("five", OwnedValue::Boolean(true))
                  .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
                  .set_timestamp(now());
          b.iter(|| {
              serialize_owned(&m, &mut buf);
              buf.clear()
          });
      }

      #[bench]
      fn serialize_owned_simple(b: &mut Bencher) {
          let mut buf = String::with_capacity(1024);
          let m =
              OwnedMeasurement::new("test")
                  .add_tag("one", "a")
                  .add_tag("two", "b")
                  .add_field("three", OwnedValue::Float(1.2345))
                  .add_field("four", OwnedValue::Integer(57))
                  .set_timestamp(now());
          b.iter(|| {
              serialize_owned(&m, &mut buf);
              buf.clear()
          });
      }

      // Posts to the influx server at `DB_HOST`, hence ignored by default.
      #[test]
      #[ignore]
      fn it_serializes_a_hard_to_serialize_message_from_owned() {
          let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
          let mut buf = String::new();
          let mut server_resp = String::new();
          let m = OwnedMeasurement::new("rust_test")
              .add_field("s", OwnedValue::String(raw.to_string()))
              .set_timestamp(now());
          serialize_owned(&m, &mut buf);
          println!("{}", buf);
          buf.push_str("\n");
          let buf_copy = buf.clone();
          buf.push_str(&buf_copy);
          println!("{}", buf);
          let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")])
              .expect("influx writer url should parse");
          let client = Client::new();
          match client.post(url.clone())
              .body(&buf)
              .send()
          {
              Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
              Ok(mut resp) => {
                  let _ = resp.read_to_string(&mut server_resp);
                  panic!("{}", server_resp);
              }
              Err(why) => {
                  panic!("http request failed: {:?}", why)
              }
          }
      }
  }