You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

606 lines
19KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::iter::FromIterator;
  4. use std::io::{Write, Read};
  5. use std::sync::mpsc::{Sender, Receiver, channel};
  6. use std::thread;
  7. use std::collections::HashMap;
  8. use std::fs::{self, OpenOptions};
  9. use std::time::Duration;
  10. use hyper::status::StatusCode;
  11. use hyper::client::response::Response;
  12. use hyper::Url;
  13. use hyper::client::Client;
  14. use influent::measurement::{Measurement, Value};
  15. use zmq;
  16. use chrono::{DateTime, Utc, TimeZone};
  17. use sloggers::types::Severity;
  18. use shuteye;
  19. use super::{nanos, file_logger};
  20. use warnings::Warning;
  /// ZeroMQ endpoint: `pull` binds to it and `push` connects to it.
  const WRITER_ADDR: &str = "ipc:///tmp/mm/influx";
  // Alternative TCP endpoint, kept for reference:
  //const WRITER_ADDR: &str = "tcp://127.0.0.1:17853";

  /// Database name, sent as the `db` query parameter of the write URL.
  const DB_NAME: &str = "mm";

  /// Base URL that serialized batches are POSTed to.
  const DB_HOST: &str = "http://washington.0ptimus.internal:8086/write";
  // Alternative host, kept for reference:
  //const DB_HOST: &str = "http://harrison.0ptimus.internal:8086/write";

  /// Receive high-water mark applied to the `PULL` socket in `pull`.
  const ZMQ_RCV_HWM: i32 = 0;

  /// Send high-water mark applied to the `PUSH` socket in `push`.
  const ZMQ_SND_HWM: i32 = 0;
  28. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  29. let socket = ctx.socket(zmq::PULL)?;
  30. socket.bind(WRITER_ADDR)?;
  31. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  32. Ok(socket)
  33. }
  34. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  35. let socket = ctx.socket(zmq::PUSH)?;
  36. socket.connect(WRITER_ADDR)?;
  37. socket.set_sndhwm(ZMQ_SND_HWM)?;
  38. Ok(socket)
  39. }
  /// Returns a copy of `s` in which every space and every comma is preceded
  /// by a backslash. All other characters are copied through unchanged.
  fn escape(s: &str) -> String {
      let mut escaped = String::with_capacity(s.len());
      for c in s.chars() {
          match c {
              ' ' | ',' => {
                  escaped.push('\\');
                  escaped.push(c);
              }
              _ => escaped.push(c),
          }
      }
      escaped
  }
  /// Renders `s` as a double-quoted string value.
  ///
  /// Every `"` is backslash-escaped. A quote that was already escaped in the
  /// input would come out of that step as `\\"`, so a second pass collapses
  /// each `\\"` back to `\"`.
  fn as_string(s: &str) -> String {
      let quotes_escaped = s.replace("\"", "\\\"");
      let collapsed = quotes_escaped.replace(r#"\\""#, r#"\""#);

      let mut out = String::with_capacity(collapsed.len() + 2);
      out.push('"');
      out.push_str(&collapsed);
      out.push('"');
      out
  }
  /// An input whose quotes are already backslash-escaped must come back
  /// unchanged apart from the surrounding quotes.
  #[test]
  fn it_checks_as_string_does_not_double_escape() {
      let already_escaped = "this is \\\"an escaped string\\\" so it's problematic";
      let expected = format!("\"{}\"", already_escaped);
      assert_eq!(as_string(already_escaped), expected);
  }
  /// Renders `i` in decimal followed by the `i` suffix, e.g. `57i`.
  fn as_integer(i: &i64) -> String {
      let mut out = i.to_string();
      out.push('i');
      out
  }
  /// Renders `f` using its `Display` implementation (so `1.0` becomes `1`).
  fn as_float(f: &f64) -> String {
      f64::to_string(f)
  }
  /// Renders `b` as `t` (true) or `f` (false).
  fn as_boolean(b: &bool) -> &str {
      match *b {
          true => "t",
          false => "f",
      }
  }
  65. pub fn now() -> i64 {
  66. nanos(Utc::now()) as i64
  67. }
  68. /// Serialize the measurement into influx line protocol
  69. /// and append to the buffer.
  70. ///
  71. /// # Examples
  72. ///
  73. /// ```
  74. /// extern crate influent;
  75. /// extern crate logging;
  76. ///
  77. /// use influent::measurement::{Measurement, Value};
  78. /// use std::string::String;
  79. /// use logging::influx::serialize;
  80. ///
  81. /// fn main() {
  82. /// let mut buf = String::new();
  83. /// let mut m = Measurement::new("test");
  84. /// m.add_field("x", Value::Integer(1));
  85. /// serialize(&m, &mut buf);
  86. /// }
  87. ///
  88. /// ```
  89. ///
  90. pub fn serialize(measurement: &Measurement, line: &mut String) {
  91. line.push_str(&escape(measurement.key));
  92. for (tag, value) in measurement.tags.iter() {
  93. line.push_str(",");
  94. line.push_str(&escape(tag));
  95. line.push_str("=");
  96. line.push_str(&escape(value));
  97. }
  98. let mut was_spaced = false;
  99. for (field, value) in measurement.fields.iter() {
  100. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  101. line.push_str(&escape(field));
  102. line.push_str("=");
  103. match value {
  104. &Value::String(ref s) => line.push_str(&as_string(s)),
  105. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  106. &Value::Float(ref f) => line.push_str(&as_float(f)),
  107. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  108. };
  109. }
  110. match measurement.timestamp {
  111. Some(t) => {
  112. line.push_str(" ");
  113. line.push_str(&t.to_string());
  114. }
  115. _ => {}
  116. }
  117. }
  118. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  119. line.push_str(&escape(measurement.key));
  120. let add_tag = |line: &mut String, key: &str, value: &str| {
  121. line.push_str(",");
  122. line.push_str(&escape(key));
  123. line.push_str("=");
  124. line.push_str(&escape(value));
  125. };
  126. for (key, value) in measurement.tags.iter() {
  127. add_tag(line, key, value);
  128. }
  129. for (key, value) in measurement.string_tags.iter() {
  130. add_tag(line, key, value);
  131. }
  132. //let mut was_spaced = false;
  133. let mut fields = measurement.fields.iter();
  134. // first time separate from tags with space
  135. //
  136. fields.next().map(|kv| {
  137. line.push_str(" ");
  138. line.push_str(&escape(kv.0));
  139. line.push_str("=");
  140. match kv.1 {
  141. &OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  142. &OwnedValue::Integer(ref i) => line.push_str(&as_integer(i)),
  143. &OwnedValue::Float(ref f) => line.push_str(&as_float(f)),
  144. &OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b))
  145. };
  146. });
  147. //for (field, value) in measurement.fields.iter() {
  148. // subsequent times seperate with comma
  149. for kv in fields {
  150. //line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  151. //line.push_str(&escape(field));
  152. line.push_str(",");
  153. line.push_str(&escape(kv.0));
  154. line.push_str("=");
  155. match kv.1 {
  156. &OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  157. &OwnedValue::Integer(ref i) => line.push_str(&as_integer(i)),
  158. &OwnedValue::Float(ref f) => line.push_str(&as_float(f)),
  159. &OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b))
  160. };
  161. }
  162. //match measurement.timestamp {
  163. if let Some(t) = measurement.timestamp {
  164. line.push_str(" ");
  165. line.push_str(&t.to_string());
  166. }
  167. }
  168. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  169. thread::spawn(move || {
  170. let _ = fs::create_dir("/tmp/mm");
  171. let ctx = zmq::Context::new();
  172. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  173. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  174. let client = Client::new();
  175. let mut buf = String::with_capacity(4096);
  176. let mut server_resp = String::with_capacity(4096);
  177. let mut count = 0;
  178. loop {
  179. if let Ok(bytes) = socket.recv_bytes(0) {
  180. if let Ok(msg) = String::from_utf8(bytes) {
  181. count = match count {
  182. 0 => {
  183. buf.push_str(&msg);
  184. 1
  185. }
  186. n @ 1...40 => {
  187. buf.push_str("\n");
  188. buf.push_str(&msg);
  189. n + 1
  190. }
  191. _ => {
  192. buf.push_str("\n");
  193. buf.push_str(&msg);
  194. match client.post(url.clone())
  195. .body(&buf)
  196. .send() {
  197. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  198. Ok(mut resp) => {
  199. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  200. warnings.send(
  201. Warning::Error(
  202. format!("Influx server: {}", server_resp)));
  203. server_resp.clear();
  204. }
  205. Err(why) => {
  206. warnings.send(
  207. Warning::Error(
  208. format!("Influx write error: {}", why)));
  209. }
  210. }
  211. buf.clear();
  212. 0
  213. }
  214. }
  215. }
  216. }
  217. }
  218. })
  219. }
  /// A field value that owns its data, for use in an `OwnedMeasurement`.
  #[derive(Debug, Clone, PartialEq)]
  pub enum OwnedValue {
      /// Text; serialized as a double-quoted, quote-escaped string.
      String(String),
      /// Floating point number; serialized via `Display`.
      Float(f64),
      /// Integer; serialized with a trailing `i`.
      Integer(i64),
      /// Boolean; serialized as `t` or `f`.
      Boolean(bool),
  }
  227. #[derive(Clone)]
  228. pub struct OwnedMeasurement {
  229. pub key: &'static str,
  230. pub timestamp: Option<i64>,
  231. pub fields: HashMap<&'static str, OwnedValue>,
  232. pub tags: HashMap<&'static str, &'static str>,
  233. pub string_tags: HashMap<&'static str, String>
  234. }
  235. impl OwnedMeasurement {
  236. pub fn new(key: &'static str) -> Self {
  237. OwnedMeasurement {
  238. key,
  239. timestamp: None,
  240. fields: HashMap::new(),
  241. tags: HashMap::new(),
  242. string_tags: HashMap::new()
  243. }
  244. }
  245. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  246. self.tags.insert(key, value);
  247. self
  248. }
  249. pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self {
  250. self.string_tags.insert(key, value);
  251. self
  252. }
  253. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  254. self.fields.insert(key, value);
  255. self
  256. }
  257. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  258. self.timestamp = Some(timestamp);
  259. self
  260. }
  261. pub fn set_tag(mut self, key: &'static str, value: &'static str) -> Self {
  262. *self.tags.entry(key).or_insert(value) = value;
  263. self
  264. }
  265. }
  /// Converts `d` to a whole number of nanoseconds.
  ///
  /// Durations too long to fit in an `i64` saturate at `i64::MAX` rather
  /// than overflowing the intermediate `u64` multiplication (a panic in debug
  /// builds, wrap-around in release) or wrapping to a negative value in the
  /// final cast.
  pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
      let total = d.as_secs()
          .saturating_mul(1_000_000_000_u64)
          .saturating_add(d.subsec_nanos() as u64);

      if total > ::std::i64::MAX as u64 {
          ::std::i64::MAX
      } else {
          total as i64
      }
  }
  269. //pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) }
  /// Handle to a background influx writer thread.
  ///
  /// The thread started by `InfluxWriter::new` accepts `OwnedMeasurement`s
  /// over the returned channel *in addition* to pre-serialized `String`s
  /// arriving on the zmq socket, which is the method `writer` uses. Dropping
  /// the handle signals the thread over `kill_switch` and joins it.
  pub struct InfluxWriter {
      /// Signals the writer thread to flush and exit.
      kill_switch: Sender<()>,
      /// Join handle of the writer thread; taken (left `None`) on drop.
      thread: Option<thread::JoinHandle<()>>,
  }
  278. impl InfluxWriter {
  279. pub fn new(log_path: &str, warnings: Sender<Warning>) -> (Self, Sender<OwnedMeasurement>) {
  280. let (kill_switch, terminate) = channel();
  281. let (tx, rx) = channel();
  282. let logger = file_logger(log_path, Severity::Info);
  283. let thread = thread::spawn(move || {
  284. info!(logger, "initializing zmq");
  285. let _ = fs::create_dir("/tmp/mm");
  286. let ctx = zmq::Context::new();
  287. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  288. info!(logger, "initializing url";
  289. "DB_HOST" => DB_HOST,
  290. "DB_NAME" => DB_NAME);
  291. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  292. let client = Client::new();
  293. info!(logger, "initializing buffers");
  294. let mut meas_buf = String::with_capacity(4096);
  295. let mut buf = String::with_capacity(4096);
  296. let mut server_resp = String::with_capacity(4096);
  297. let mut count = 0;
  298. let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
  299. debug!(logger, "appending serialized measurement to buffer";
  300. "prev" => prev,
  301. "buf.len()" => buf.len());
  302. match prev {
  303. 0 => {
  304. buf.push_str(s);
  305. 1
  306. }
  307. n @ 1...80 => {
  308. buf.push_str("\n");
  309. buf.push_str(s);
  310. n + 1
  311. }
  312. _ => {
  313. buf.push_str("\n");
  314. if s.len() > 0 {
  315. buf.push_str(s);
  316. }
  317. debug!(logger, "sending buffer to influx";
  318. "buf.len()" => buf.len());
  319. let resp = client.post(url.clone())
  320. .body(buf.as_str())
  321. .send();
  322. match resp {
  323. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  324. debug!(logger, "server responded ok: 204 NoContent");
  325. }
  326. Ok(mut resp) => {
  327. let mut server_resp = String::with_capacity(1024);
  328. //server_resp.push_str(&format!("sent at {}:\n", Utc::now()));
  329. //server_resp.push_str(&buf);
  330. //server_resp.push_str("\nreceived:\n");
  331. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  332. error!(logger, "influx server error";
  333. "status" => resp.status.to_string(),
  334. "body" => server_resp);
  335. }
  336. Err(why) => {
  337. error!(logger, "http request failed: {:?}", why);
  338. // warnings.send(
  339. // Warning::Error(
  340. // format!("Influx write error: {}", why)));
  341. }
  342. }
  343. buf.clear();
  344. 0
  345. }
  346. }
  347. };
  348. let mut rcvd_msg = false;
  349. loop {
  350. rcvd_msg = false;
  351. rx.try_recv()
  352. .map(|meas| {
  353. debug!(logger, "rcvd new OwnedMeasurement";
  354. "count" => count);
  355. serialize_owned(&meas, &mut meas_buf);
  356. count = next(count, &meas_buf, &mut buf);
  357. meas_buf.clear();
  358. rcvd_msg = true;
  359. });
  360. socket.recv_bytes(zmq::DONTWAIT).ok()
  361. .and_then(|bytes| {
  362. String::from_utf8(bytes).ok()
  363. }).map(|s| {
  364. debug!(logger, "rcvd new serialized";
  365. "count" => count);
  366. count = next(count, &s, &mut buf);
  367. rcvd_msg = true;
  368. });
  369. let end = terminate.try_recv()
  370. .map(|_| {
  371. let _ = next(::std::u8::MAX, "", &mut buf);
  372. true
  373. }).unwrap_or(false);
  374. if end { break }
  375. if !rcvd_msg {
  376. #[cfg(feature = "no-thrash")]
  377. shuteye::sleep(Duration::new(0, 5000));
  378. }
  379. }
  380. crit!(logger, "goodbye");
  381. });
  382. let writer = InfluxWriter {
  383. kill_switch,
  384. thread: Some(thread)
  385. };
  386. (writer, tx)
  387. }
  388. }
  389. impl Drop for InfluxWriter {
  390. fn drop(&mut self) {
  391. self.kill_switch.send(());
  392. if let Some(thread) = self.thread.take() {
  393. let _ = thread.join();
  394. }
  395. }
  396. }
  #[cfg(test)]
  mod tests {
      use super::*;
      // NOTE(review): assumes the crate root declares `extern crate test`,
      // which the `#[bench]` functions below already require.
      use test::Bencher;

      /// POSTs `body` to the configured influx endpoint and panics unless the
      /// server answers `204 No Content`.
      fn post_or_panic(body: &str) {
          let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")])
              .expect("influx writer url should parse");
          let client = Client::new();
          match client.post(url)
                      .body(body)
                      .send() {
              Ok(Response { status, .. }) if status == StatusCode::NoContent => {}

              Ok(mut resp) => {
                  let mut server_resp = String::new();
                  // A read failure still ends in the panic below, with
                  // whatever part of the body was read.
                  let _ = resp.read_to_string(&mut server_resp);
                  panic!("{}", server_resp);
              }

              Err(why) => panic!("{}", why),
          }
      }

      #[test]
      #[ignore]
      fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
          let ctx = zmq::Context::new();
          let socket = push(&ctx).unwrap();
          // `_rx` is kept alive so the writer's warning channel stays open.
          let (tx, _rx) = channel();
          let w = writer(tx.clone());
          let mut buf = String::with_capacity(4096);
          let mut meas = Measurement::new("rust_test");
          meas.add_tag("a", "t");
          meas.add_field("c", Value::Float(1.23456));
          let ts = now();
          meas.set_timestamp(ts);
          serialize(&meas, &mut buf);
          socket.send_str(&buf, 0).unwrap();
          drop(w);
      }

      #[test]
      fn it_serializes_a_measurement_in_place() {
          let mut buf = String::with_capacity(4096);
          let mut meas = Measurement::new("rust_test");
          meas.add_tag("a", "b");
          meas.add_field("c", Value::Float(1.0));
          let ts = now();
          meas.set_timestamp(ts);
          serialize(&meas, &mut buf);
          let ans = format!("rust_test,a=b c=1 {}", ts);
          assert_eq!(buf, ans);
      }

      // NOTE(review): this test POSTs to `DB_HOST` and so needs network
      // access to that server; unlike the writer-thread test above it is not
      // marked `#[ignore]`.
      #[test]
      fn it_serializes_a_hard_to_serialize_message() {
          let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
          let mut buf = String::new();
          let mut m = Measurement::new("rust_test");
          m.add_field("s", Value::String(raw));
          let ts = now();
          m.set_timestamp(ts);
          serialize(&m, &mut buf);
          println!("{}", buf);
          // Send the same line twice to exercise a multi-line body.
          buf.push_str("\n");
          let buf_copy = buf.clone();
          buf.push_str(&buf_copy);
          println!("{}", buf);
          post_or_panic(&buf);
      }

      #[bench]
      fn serialize_owned_longer(b: &mut Bencher) {
          let mut buf = String::with_capacity(1024);
          let m =
              OwnedMeasurement::new("test")
                  .add_tag("one", "a")
                  .add_tag("two", "b")
                  .add_tag("ticker", "xmr_btc")
                  .add_tag("exchange", "plnx")
                  .add_tag("side", "bid")
                  .add_field("three", OwnedValue::Float(1.2345))
                  .add_field("four", OwnedValue::Integer(57))
                  .add_field("five", OwnedValue::Boolean(true))
                  .add_field("six", OwnedValue::String(String::from("abcdefghijklmnopqrstuvwxyz")))
                  .set_timestamp(now());
          b.iter(|| {
              serialize_owned(&m, &mut buf);
              buf.clear()
          });
      }

      #[bench]
      fn serialize_owned_simple(b: &mut Bencher) {
          let mut buf = String::with_capacity(1024);
          let m =
              OwnedMeasurement::new("test")
                  .add_tag("one", "a")
                  .add_tag("two", "b")
                  .add_field("three", OwnedValue::Float(1.2345))
                  .add_field("four", OwnedValue::Integer(57))
                  .set_timestamp(now());
          b.iter(|| {
              serialize_owned(&m, &mut buf);
              buf.clear()
          });
      }

      // NOTE(review): this test POSTs to `DB_HOST` and so needs network
      // access to that server; unlike the writer-thread test above it is not
      // marked `#[ignore]`.
      #[test]
      fn it_serializes_a_hard_to_serialize_message_from_owned() {
          let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
          let mut buf = String::new();
          let m = OwnedMeasurement::new("rust_test")
              .add_field("s", OwnedValue::String(raw.to_string()))
              .set_timestamp(now());
          serialize_owned(&m, &mut buf);
          println!("{}", buf);
          // Send the same line twice to exercise a multi-line body.
          buf.push_str("\n");
          let buf_copy = buf.clone();
          buf.push_str(&buf_copy);
          println!("{}", buf);
          post_or_panic(&buf);
      }
  }