You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

543 lines
17KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::iter::FromIterator;
  4. use std::io::{Write, Read};
  5. use std::sync::mpsc::{Sender, Receiver, channel};
  6. use std::thread;
  7. use std::collections::HashMap;
  8. use std::fs::{self, OpenOptions};
  9. use std::time::Duration;
  10. use hyper::status::StatusCode;
  11. use hyper::client::response::Response;
  12. use hyper::Url;
  13. use hyper::client::Client;
  14. use influent::measurement::{Measurement, Value};
  15. use zmq;
  16. use chrono::{DateTime, Utc, TimeZone};
  17. use sloggers::types::Severity;
  18. use super::{nanos, file_logger};
  19. use warnings::Warning;
  20. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  21. //const WRITER_ADDR: &'static str = "tcp://127.0.0.1:17853";
  22. const DB_NAME: &'static str = "mm";
  23. const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write";
  24. //const DB_HOST: &'static str = "http://harrison.0ptimus.internal:8086/write";
  25. const ZMQ_RCV_HWM: i32 = 0;
  26. const ZMQ_SND_HWM: i32 = 0;
  27. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  28. let socket = ctx.socket(zmq::PULL)?;
  29. socket.bind(WRITER_ADDR)?;
  30. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  31. Ok(socket)
  32. }
  33. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  34. let socket = ctx.socket(zmq::PUSH)?;
  35. socket.connect(WRITER_ADDR)?;
  36. socket.set_sndhwm(ZMQ_SND_HWM)?;
  37. Ok(socket)
  38. }
  39. fn escape(s: &str) -> String {
  40. s.replace(" ", "\\ ")
  41. .replace(",", "\\,")
  42. }
  43. fn as_string(s: &str) -> String {
  44. // the second replace removes double escapes
  45. //
  46. format!("\"{}\"", s.replace("\"", "\\\"")
  47. .replace(r#"\\""#, r#"\""#))
  48. }
  49. #[test]
  50. fn it_checks_as_string_does_not_double_escape() {
  51. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  52. let escaped = as_string(&raw);
  53. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  54. }
  55. fn as_integer(i: &i64) -> String {
  56. format!("{}i", i)
  57. }
  58. fn as_float(f: &f64) -> String {
  59. f.to_string()
  60. }
  61. fn as_boolean(b: &bool) -> &str {
  62. if *b { "t" } else { "f" }
  63. }
  64. pub fn now() -> i64 {
  65. nanos(Utc::now()) as i64
  66. }
  67. /// Serialize the measurement into influx line protocol
  68. /// and append to the buffer.
  69. ///
  70. /// # Examples
  71. ///
  72. /// ```
  73. /// extern crate influent;
  74. /// extern crate logging;
  75. ///
  76. /// use influent::measurement::{Measurement, Value};
  77. /// use std::string::String;
  78. /// use logging::influx::serialize;
  79. ///
  80. /// fn main() {
  81. /// let mut buf = String::new();
  82. /// let mut m = Measurement::new("test");
  83. /// m.add_field("x", Value::Integer(1));
  84. /// serialize(&m, &mut buf);
  85. /// }
  86. ///
  87. /// ```
  88. ///
  89. pub fn serialize(measurement: &Measurement, line: &mut String) {
  90. line.push_str(&escape(measurement.key));
  91. for (tag, value) in measurement.tags.iter() {
  92. line.push_str(",");
  93. line.push_str(&escape(tag));
  94. line.push_str("=");
  95. line.push_str(&escape(value));
  96. }
  97. let mut was_spaced = false;
  98. for (field, value) in measurement.fields.iter() {
  99. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  100. line.push_str(&escape(field));
  101. line.push_str("=");
  102. match value {
  103. &Value::String(ref s) => line.push_str(&as_string(s)),
  104. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  105. &Value::Float(ref f) => line.push_str(&as_float(f)),
  106. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  107. };
  108. }
  109. match measurement.timestamp {
  110. Some(t) => {
  111. line.push_str(" ");
  112. line.push_str(&t.to_string());
  113. }
  114. _ => {}
  115. }
  116. }
  117. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  118. line.push_str(&escape(measurement.key));
  119. let add_tag = |line: &mut String, key: &str, value: &str| {
  120. line.push_str(",");
  121. line.push_str(&escape(key));
  122. line.push_str("=");
  123. line.push_str(&escape(value));
  124. };
  125. for (key, value) in measurement.tags.iter() {
  126. add_tag(line, key, value);
  127. }
  128. for (key, value) in measurement.string_tags.iter() {
  129. add_tag(line, key, value);
  130. }
  131. let mut was_spaced = false;
  132. for (field, value) in measurement.fields.iter() {
  133. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  134. line.push_str(&escape(field));
  135. line.push_str("=");
  136. match value {
  137. &OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  138. &OwnedValue::Integer(ref i) => line.push_str(&as_integer(i)),
  139. &OwnedValue::Float(ref f) => line.push_str(&as_float(f)),
  140. &OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b))
  141. };
  142. }
  143. match measurement.timestamp {
  144. Some(t) => {
  145. line.push_str(" ");
  146. line.push_str(&t.to_string());
  147. }
  148. _ => {}
  149. }
  150. }
  151. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  152. thread::spawn(move || {
  153. let _ = fs::create_dir("/tmp/mm");
  154. let ctx = zmq::Context::new();
  155. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  156. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  157. let client = Client::new();
  158. let mut buf = String::with_capacity(4096);
  159. let mut server_resp = String::with_capacity(4096);
  160. let mut count = 0;
  161. loop {
  162. if let Ok(bytes) = socket.recv_bytes(0) {
  163. if let Ok(msg) = String::from_utf8(bytes) {
  164. count = match count {
  165. 0 => {
  166. buf.push_str(&msg);
  167. 1
  168. }
  169. n @ 1...40 => {
  170. buf.push_str("\n");
  171. buf.push_str(&msg);
  172. n + 1
  173. }
  174. _ => {
  175. buf.push_str("\n");
  176. buf.push_str(&msg);
  177. match client.post(url.clone())
  178. .body(&buf)
  179. .send() {
  180. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  181. Ok(mut resp) => {
  182. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  183. warnings.send(
  184. Warning::Error(
  185. format!("Influx server: {}", server_resp)));
  186. server_resp.clear();
  187. }
  188. Err(why) => {
  189. warnings.send(
  190. Warning::Error(
  191. format!("Influx write error: {}", why)));
  192. }
  193. }
  194. buf.clear();
  195. 0
  196. }
  197. }
  198. }
  199. }
  200. }
  201. })
  202. }
  203. #[derive(Debug, Clone, PartialEq)]
  204. pub enum OwnedValue {
  205. String(String),
  206. Float(f64),
  207. Integer(i64),
  208. Boolean(bool)
  209. }
  210. pub struct OwnedMeasurement {
  211. pub key: &'static str,
  212. pub timestamp: Option<i64>,
  213. pub fields: HashMap<&'static str, OwnedValue>,
  214. pub tags: HashMap<&'static str, &'static str>,
  215. pub string_tags: HashMap<&'static str, String>
  216. }
  217. impl OwnedMeasurement {
  218. pub fn new(key: &'static str) -> Self {
  219. OwnedMeasurement {
  220. key,
  221. timestamp: None,
  222. fields: HashMap::new(),
  223. tags: HashMap::new(),
  224. string_tags: HashMap::new()
  225. }
  226. }
  227. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  228. self.tags.insert(key, value);
  229. self
  230. }
  231. pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self {
  232. self.string_tags.insert(key, value);
  233. self
  234. }
  235. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  236. self.fields.insert(key, value);
  237. self
  238. }
  239. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  240. self.timestamp = Some(timestamp);
  241. self
  242. }
  243. }
  244. pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
  245. (d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64
  246. }
  247. //pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) }
  248. /// exactly like `writer`, but also returns a `Sender<Measurement>` and accepts
  249. /// incoming `Measurement`s that way *in addition* to the old socket/`String`
  250. /// method
  251. ///
  252. pub struct InfluxWriter {
  253. kill_switch: Sender<()>,
  254. thread: Option<thread::JoinHandle<()>>,
  255. }
  256. impl InfluxWriter {
  257. pub fn new(log_path: &str, warnings: Sender<Warning>) -> (Self, Sender<OwnedMeasurement>) {
  258. let (kill_switch, terminate) = channel();
  259. let (tx, rx) = channel();
  260. let logger = file_logger(log_path, Severity::Info);
  261. let thread = thread::spawn(move || {
  262. info!(logger, "initializing zmq");
  263. let _ = fs::create_dir("/tmp/mm");
  264. let ctx = zmq::Context::new();
  265. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  266. info!(logger, "initializing url";
  267. "DB_HOST" => DB_HOST,
  268. "DB_NAME" => DB_NAME);
  269. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  270. let client = Client::new();
  271. info!(logger, "initializing buffers");
  272. let mut meas_buf = String::with_capacity(4096);
  273. let mut buf = String::with_capacity(4096);
  274. let mut server_resp = String::with_capacity(4096);
  275. let mut count = 0;
  276. let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
  277. debug!(logger, "appending serialized measurement to buffer";
  278. "prev" => prev,
  279. "buf.len()" => buf.len());
  280. match prev {
  281. 0 => {
  282. buf.push_str(s);
  283. 1
  284. }
  285. n @ 1...80 => {
  286. buf.push_str("\n");
  287. buf.push_str(s);
  288. n + 1
  289. }
  290. _ => {
  291. buf.push_str("\n");
  292. if s.len() > 0 {
  293. buf.push_str(s);
  294. }
  295. debug!(logger, "sending buffer to influx";
  296. "buf.len()" => buf.len());
  297. let resp = client.post(url.clone())
  298. .body(buf.as_str())
  299. .send();
  300. match resp {
  301. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  302. debug!(logger, "server responded ok: 204 NoContent");
  303. }
  304. Ok(mut resp) => {
  305. let mut server_resp = String::with_capacity(1024);
  306. //server_resp.push_str(&format!("sent at {}:\n", Utc::now()));
  307. //server_resp.push_str(&buf);
  308. //server_resp.push_str("\nreceived:\n");
  309. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  310. error!(logger, "influx server error";
  311. "status" => resp.status.to_string(),
  312. "body" => server_resp);
  313. }
  314. Err(why) => {
  315. error!(logger, "http request failed: {:?}", why);
  316. // warnings.send(
  317. // Warning::Error(
  318. // format!("Influx write error: {}", why)));
  319. }
  320. }
  321. buf.clear();
  322. 0
  323. }
  324. }
  325. };
  326. let mut rcvd_msg = false;
  327. loop {
  328. rcvd_msg = false;
  329. rx.try_recv()
  330. .map(|meas| {
  331. debug!(logger, "rcvd new OwnedMeasurement";
  332. "count" => count);
  333. serialize_owned(&meas, &mut meas_buf);
  334. count = next(count, &meas_buf, &mut buf);
  335. meas_buf.clear();
  336. rcvd_msg = true;
  337. });
  338. socket.recv_bytes(zmq::DONTWAIT).ok()
  339. .and_then(|bytes| {
  340. String::from_utf8(bytes).ok()
  341. }).map(|s| {
  342. debug!(logger, "rcvd new serialized";
  343. "count" => count);
  344. count = next(count, &s, &mut buf);
  345. rcvd_msg = true;
  346. });
  347. let end = terminate.try_recv()
  348. .map(|_| {
  349. let _ = next(::std::u8::MAX, "", &mut buf);
  350. true
  351. }).unwrap_or(false);
  352. if end { break }
  353. if !rcvd_msg {
  354. #[cfg(feature = "no-thrash")]
  355. thread::sleep(Duration::from_millis(1) / 10);
  356. }
  357. }
  358. crit!(logger, "goodbye");
  359. });
  360. let writer = InfluxWriter {
  361. kill_switch,
  362. thread: Some(thread)
  363. };
  364. (writer, tx)
  365. }
  366. }
  367. impl Drop for InfluxWriter {
  368. fn drop(&mut self) {
  369. self.kill_switch.send(());
  370. if let Some(thread) = self.thread.take() {
  371. let _ = thread.join();
  372. }
  373. }
  374. }
  375. mod tests {
  376. use super::*;
  377. #[test]
  378. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  379. let ctx = zmq::Context::new();
  380. let socket = push(&ctx).unwrap();
  381. let (tx, rx) = channel();
  382. let w = writer(tx.clone());
  383. let mut buf = String::with_capacity(4096);
  384. let mut meas = Measurement::new("rust_test");
  385. meas.add_tag("a", "t");
  386. meas.add_field("c", Value::Float(1.23456));
  387. let now = now();
  388. meas.set_timestamp(now);
  389. serialize(&meas, &mut buf);
  390. socket.send_str(&buf, 0);
  391. drop(w);
  392. }
  393. #[test]
  394. fn it_serializes_a_measurement_in_place() {
  395. let mut buf = String::with_capacity(4096);
  396. let mut meas = Measurement::new("rust_test");
  397. meas.add_tag("a", "b");
  398. meas.add_field("c", Value::Float(1.0));
  399. let now = now();
  400. meas.set_timestamp(now);
  401. serialize(&meas, &mut buf);
  402. let ans = format!("rust_test,a=b c=1 {}", now);
  403. assert_eq!(buf, ans);
  404. }
  405. #[test]
  406. fn it_serializes_a_hard_to_serialize_message() {
  407. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  408. let mut buf = String::new();
  409. let mut server_resp = String::new();
  410. let mut m = Measurement::new("rust_test");
  411. m.add_field("s", Value::String(&raw));
  412. let now = now();
  413. m.set_timestamp(now);
  414. serialize(&m, &mut buf);
  415. println!("{}", buf);
  416. buf.push_str("\n");
  417. let buf_copy = buf.clone();
  418. buf.push_str(&buf_copy);
  419. println!("{}", buf);
  420. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  421. let client = Client::new();
  422. match client.post(url.clone())
  423. .body(&buf)
  424. .send() {
  425. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  426. Ok(mut resp) => {
  427. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  428. panic!("{}", server_resp);
  429. }
  430. Err(why) => {
  431. panic!(why)
  432. }
  433. }
  434. }
  435. #[test]
  436. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  437. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  438. let mut buf = String::new();
  439. let mut server_resp = String::new();
  440. let mut m = OwnedMeasurement::new("rust_test")
  441. .add_field("s", OwnedValue::String(raw.to_string()))
  442. .set_timestamp(now());
  443. serialize_owned(&m, &mut buf);
  444. println!("{}", buf);
  445. buf.push_str("\n");
  446. let buf_copy = buf.clone();
  447. buf.push_str(&buf_copy);
  448. println!("{}", buf);
  449. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  450. let client = Client::new();
  451. match client.post(url.clone())
  452. .body(&buf)
  453. .send() {
  454. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  455. Ok(mut resp) => {
  456. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  457. panic!("{}", server_resp);
  458. }
  459. Err(why) => {
  460. panic!(why)
  461. }
  462. }
  463. }
  464. }