You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

influx.rs 17KB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::iter::FromIterator;
  4. use std::io::{Write, Read};
  5. use std::sync::mpsc::{Sender, Receiver, channel};
  6. use std::thread;
  7. use std::collections::HashMap;
  8. use std::fs::{self, OpenOptions};
  9. use std::time::Duration;
  10. use hyper::status::StatusCode;
  11. use hyper::client::response::Response;
  12. use hyper::Url;
  13. use hyper::client::Client;
  14. use influent::measurement::{Measurement, Value};
  15. use zmq;
  16. use chrono::{DateTime, Utc, TimeZone};
  17. use super::{nanos, file_logger};
  18. use warnings::Warning;
  /// ZeroMQ endpoint shared by `pull` (which binds it) and `push` (which
  /// connects to it).
  // TCP alternative that was previously used: "tcp://127.0.0.1:17853"
  const WRITER_ADDR: &str = "ipc:///tmp/mm/influx";

  /// Database name sent as the `db` query parameter of every write request.
  const DB_NAME: &str = "mm";

  /// Base URL of the InfluxDB HTTP write endpoint.
  // Local alternative that was previously used: "http://localhost:8086/write"
  const DB_HOST: &str = "http://harrison.0ptimus.internal:8086/write";

  /// Receive high-water mark applied to the PULL socket created by `pull`.
  const ZMQ_RCV_HWM: i32 = 0;

  /// Send high-water mark applied to the PUSH sockets created by `push`.
  const ZMQ_SND_HWM: i32 = 0;
  26. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  27. let socket = ctx.socket(zmq::PULL)?;
  28. socket.bind(WRITER_ADDR)?;
  29. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  30. Ok(socket)
  31. }
  32. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  33. let socket = ctx.socket(zmq::PUSH)?;
  34. socket.connect(WRITER_ADDR)?;
  35. socket.set_sndhwm(ZMQ_SND_HWM)?;
  36. Ok(socket)
  37. }
  /// Returns a copy of `s` in which every space and every comma is preceded
  /// by a backslash.
  fn escape(s: &str) -> String {
      let mut escaped = String::with_capacity(s.len());
      for ch in s.chars() {
          if ch == ' ' || ch == ',' {
              escaped.push('\\');
          }
          escaped.push(ch);
      }
      escaped
  }
  /// Formats `s` as a double-quoted string field value.
  ///
  /// Every `"` is backslash-escaped. A quote that was already preceded by a
  /// backslash in the input would end up as `\\"` after that step, so such
  /// sequences are collapsed back to `\"` to avoid double escaping.
  fn as_string(s: &str) -> String {
      let quotes_escaped = s.replace('"', "\\\"");
      let collapsed = quotes_escaped.replace("\\\\\"", "\\\"");

      let mut quoted = String::with_capacity(collapsed.len() + 2);
      quoted.push('"');
      quoted.push_str(&collapsed);
      quoted.push('"');
      quoted
  }
  /// Quotes that are already backslash-escaped in the input must come out of
  /// `as_string` unchanged (only the surrounding quotes are added).
  #[test]
  fn it_checks_as_string_does_not_double_escape() {
      let raw = "this is \\\"an escaped string\\\" so it's problematic";
      let expected = format!("\"{}\"", raw);
      let actual = as_string(raw);
      assert_eq!(actual, expected);
  }
  /// Formats an integer field value: the decimal digits followed by an `i`
  /// suffix.
  fn as_integer(i: &i64) -> String {
      let mut text = i.to_string();
      text.push('i');
      text
  }
  /// Formats a float field value using `f64`'s `Display` implementation.
  fn as_float(f: &f64) -> String {
      format!("{}", f)
  }
  /// Formats a boolean field value as `t` (true) or `f` (false).
  fn as_boolean(b: &bool) -> &str {
      match *b {
          true => "t",
          false => "f",
      }
  }
  63. pub fn now() -> i64 {
  64. nanos(Utc::now()) as i64
  65. }
  66. /// Serialize the measurement into influx line protocol
  67. /// and append to the buffer.
  68. ///
  69. /// # Examples
  70. ///
  71. /// ```
  72. /// extern crate influent;
  73. /// extern crate logging;
  74. ///
  75. /// use influent::measurement::{Measurement, Value};
  76. /// use std::string::String;
  77. /// use logging::influx::serialize;
  78. ///
  79. /// fn main() {
  80. /// let mut buf = String::new();
  81. /// let mut m = Measurement::new("test");
  82. /// m.add_field("x", Value::Integer(1));
  83. /// serialize(&m, &mut buf);
  84. /// }
  85. ///
  86. /// ```
  87. ///
  88. pub fn serialize(measurement: &Measurement, line: &mut String) {
  89. line.push_str(&escape(measurement.key));
  90. for (tag, value) in measurement.tags.iter() {
  91. line.push_str(",");
  92. line.push_str(&escape(tag));
  93. line.push_str("=");
  94. line.push_str(&escape(value));
  95. }
  96. let mut was_spaced = false;
  97. for (field, value) in measurement.fields.iter() {
  98. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  99. line.push_str(&escape(field));
  100. line.push_str("=");
  101. match value {
  102. &Value::String(ref s) => line.push_str(&as_string(s)),
  103. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  104. &Value::Float(ref f) => line.push_str(&as_float(f)),
  105. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  106. };
  107. }
  108. match measurement.timestamp {
  109. Some(t) => {
  110. line.push_str(" ");
  111. line.push_str(&t.to_string());
  112. }
  113. _ => {}
  114. }
  115. }
  116. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  117. line.push_str(&escape(measurement.key));
  118. let add_tag = |line: &mut String, key: &str, value: &str| {
  119. line.push_str(",");
  120. line.push_str(&escape(key));
  121. line.push_str("=");
  122. line.push_str(&escape(value));
  123. };
  124. for (key, value) in measurement.tags.iter() {
  125. add_tag(line, key, value);
  126. }
  127. for (key, value) in measurement.string_tags.iter() {
  128. add_tag(line, key, value);
  129. }
  130. let mut was_spaced = false;
  131. for (field, value) in measurement.fields.iter() {
  132. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  133. line.push_str(&escape(field));
  134. line.push_str("=");
  135. match value {
  136. &OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  137. &OwnedValue::Integer(ref i) => line.push_str(&as_integer(i)),
  138. &OwnedValue::Float(ref f) => line.push_str(&as_float(f)),
  139. &OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b))
  140. };
  141. }
  142. match measurement.timestamp {
  143. Some(t) => {
  144. line.push_str(" ");
  145. line.push_str(&t.to_string());
  146. }
  147. _ => {}
  148. }
  149. }
  150. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  151. thread::spawn(move || {
  152. let _ = fs::create_dir("/tmp/mm");
  153. let ctx = zmq::Context::new();
  154. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  155. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  156. let client = Client::new();
  157. let mut buf = String::with_capacity(4096);
  158. let mut server_resp = String::with_capacity(4096);
  159. let mut count = 0;
  160. loop {
  161. if let Ok(bytes) = socket.recv_bytes(0) {
  162. if let Ok(msg) = String::from_utf8(bytes) {
  163. count = match count {
  164. 0 => {
  165. buf.push_str(&msg);
  166. 1
  167. }
  168. n @ 1...40 => {
  169. buf.push_str("\n");
  170. buf.push_str(&msg);
  171. n + 1
  172. }
  173. _ => {
  174. buf.push_str("\n");
  175. buf.push_str(&msg);
  176. match client.post(url.clone())
  177. .body(&buf)
  178. .send() {
  179. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  180. Ok(mut resp) => {
  181. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  182. warnings.send(
  183. Warning::Error(
  184. format!("Influx server: {}", server_resp)));
  185. server_resp.clear();
  186. }
  187. Err(why) => {
  188. warnings.send(
  189. Warning::Error(
  190. format!("Influx write error: {}", why)));
  191. }
  192. }
  193. buf.clear();
  194. 0
  195. }
  196. }
  197. }
  198. }
  199. }
  200. })
  201. }
  /// A field value that owns its data, for use with `OwnedMeasurement`.
  #[derive(Debug, Clone, PartialEq)]
  pub enum OwnedValue {
      /// A string field value.
      String(String),
      /// A floating point field value.
      Float(f64),
      /// A signed integer field value.
      Integer(i64),
      /// A boolean field value.
      Boolean(bool),
  }

  /// A measurement whose field values and string tags are owned, so that it
  /// can be sent across a channel to the writer thread.
  ///
  /// Built with the chained `add_*` / `set_timestamp` methods, each of which
  /// consumes and returns the measurement.
  #[derive(Debug, Clone, PartialEq)]
  pub struct OwnedMeasurement {
      /// Measurement name.
      pub key: &'static str,
      /// Optional timestamp, written verbatim at the end of the serialized line.
      pub timestamp: Option<i64>,
      /// Field values by field key.
      pub fields: HashMap<&'static str, OwnedValue>,
      /// Tags whose values are static strings.
      pub tags: HashMap<&'static str, &'static str>,
      /// Tags whose values are owned strings.
      pub string_tags: HashMap<&'static str, String>,
  }

  impl OwnedMeasurement {
      /// Creates an empty measurement named `key`, with no timestamp, fields
      /// or tags.
      pub fn new(key: &'static str) -> Self {
          OwnedMeasurement {
              key,
              timestamp: None,
              fields: HashMap::new(),
              tags: HashMap::new(),
              string_tags: HashMap::new(),
          }
      }

      /// Sets the static tag `key` to `value`, replacing any previous value.
      pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
          self.tags.insert(key, value);
          self
      }

      /// Sets the owned-string tag `key` to `value`, replacing any previous
      /// value.
      pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self {
          self.string_tags.insert(key, value);
          self
      }

      /// Sets the field `key` to `value`, replacing any previous value.
      pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
          self.fields.insert(key, value);
          self
      }

      /// Sets the timestamp of the measurement.
      pub fn set_timestamp(mut self, timestamp: i64) -> Self {
          self.timestamp = Some(timestamp);
          self
      }
  }
  /// Converts a `Duration` to a whole number of nanoseconds.
  ///
  /// Durations too long to be represented saturate at `i64::MAX` rather than
  /// overflowing (which would panic in debug builds and wrap to a meaningless,
  /// possibly negative value in release builds).
  pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
      let total = d
          .as_secs()
          .saturating_mul(1_000_000_000_u64)
          .saturating_add(u64::from(d.subsec_nanos()));
      if total > ::std::i64::MAX as u64 {
          ::std::i64::MAX
      } else {
          total as i64
      }
  }
  246. //pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) }
  247. /// exactly like `writer`, but also returns a `Sender<Measurement>` and accepts
  248. /// incoming `Measurement`s that way *in addition* to the old socket/`String`
  249. /// method
  250. ///
  251. pub fn writer_str_or_meas(warnings: Sender<Warning>) -> (thread::JoinHandle<()>, Sender<OwnedMeasurement>) {
  252. let (tx, rx) = channel();
  253. let thread = thread::spawn(move || {
  254. let logger = file_logger("var/log/influx.log");
  255. info!(logger, "initializing zmq");
  256. let _ = fs::create_dir("/tmp/mm");
  257. let ctx = zmq::Context::new();
  258. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  259. info!(logger, "initializing url";
  260. "DB_HOST" => DB_HOST,
  261. "DB_NAME" => DB_NAME);
  262. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  263. let client = Client::new();
  264. info!(logger, "initializing buffers");
  265. let mut meas_buf = String::with_capacity(4096);
  266. let mut buf = String::with_capacity(4096);
  267. let mut server_resp = String::with_capacity(4096);
  268. let mut count = 0;
  269. let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
  270. debug!(logger, "appending serialized measurement to buffer";
  271. "prev" => prev,
  272. "buf.len()" => buf.len());
  273. match prev {
  274. 0 => {
  275. buf.push_str(s);
  276. 1
  277. }
  278. n @ 1...40 => {
  279. buf.push_str("\n");
  280. buf.push_str(s);
  281. n + 1
  282. }
  283. _ => {
  284. buf.push_str("\n");
  285. buf.push_str(s);
  286. debug!(logger, "sending buffer to influx";
  287. "buf.len()" => buf.len());
  288. let resp = client.post(url.clone())
  289. .body(buf.as_str())
  290. .send();
  291. match resp {
  292. Ok(Response { status, .. }) if status == StatusCode::NoContent => {
  293. debug!(logger, "server responded ok: 204 NoContent");
  294. }
  295. Ok(mut resp) => {
  296. let mut server_resp = String::with_capacity(1024);
  297. //server_resp.push_str(&format!("sent at {}:\n", Utc::now()));
  298. //server_resp.push_str(&buf);
  299. //server_resp.push_str("\nreceived:\n");
  300. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  301. error!(logger, "influx server error";
  302. "status" => resp.status.to_string(),
  303. "body" => server_resp);
  304. // OpenOptions::new()
  305. // .create(true)
  306. // .append(true)
  307. // .open("/home/jstrong/src/market-maker/influx-errors.txt")
  308. // .map_err(|e| {
  309. // warnings.send(Warning::Error(format!("failed to save influx error: {}", e)));
  310. // }).map(|mut file| {
  311. // write!(file, "{}", server_resp);
  312. // });
  313. // server_resp.truncate(120);
  314. // warnings.send(
  315. // Warning::Error(
  316. // format!("Influx server: {}", server_resp)));
  317. }
  318. Err(why) => {
  319. error!(logger, "http request failed: {:?}", why);
  320. // warnings.send(
  321. // Warning::Error(
  322. // format!("Influx write error: {}", why)));
  323. }
  324. }
  325. buf.clear();
  326. 0
  327. }
  328. }
  329. };
  330. let mut rcvd_msg = false;
  331. loop {
  332. rcvd_msg = false;
  333. rx.try_recv()
  334. .map(|meas| {
  335. debug!(logger, "rcvd new OwnedMeasurement";
  336. "count" => count);
  337. serialize_owned(&meas, &mut meas_buf);
  338. count = next(count, &meas_buf, &mut buf);
  339. meas_buf.clear();
  340. rcvd_msg = true;
  341. });
  342. socket.recv_bytes(zmq::DONTWAIT).ok()
  343. .and_then(|bytes| {
  344. String::from_utf8(bytes).ok()
  345. }).map(|s| {
  346. debug!(logger, "rcvd new serialized";
  347. "count" => count);
  348. count = next(count, &s, &mut buf);
  349. rcvd_msg = true;
  350. });
  351. if !rcvd_msg {
  352. thread::sleep(Duration::from_millis(1) / 10);
  353. }
  354. }
  355. crit!(logger, "goodbye");
  356. });
  357. (thread, tx)
  358. }
  // Only compiled for test builds, so the module (and its glob import) does
  // not leak into regular builds.
  #[cfg(test)]
  mod tests {
      use super::*;

      /// A message containing nested, unescaped double quotes.
      const HARD_MESSAGE: &'static str = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;

      /// Posts `body` to the influx write endpoint and panics unless the
      /// server answers `204 No Content`.
      fn post_and_expect_no_content(body: &str) {
          let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")])
              .expect("influx writer url should parse");
          let client = Client::new();
          match client.post(url.clone()).body(body).send() {
              Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
              Ok(mut resp) => {
                  let mut server_resp = String::new();
                  // A failed read simply leaves the panic message incomplete.
                  let _ = resp.read_to_string(&mut server_resp);
                  panic!("{}", server_resp);
              }
              Err(why) => panic!("{}", why),
          }
      }

      /// Appends a newline to `line` and then a second copy of the result, so
      /// that the request body contains two lines.
      fn duplicate_line(line: &mut String) {
          line.push_str("\n");
          let copy = line.clone();
          line.push_str(&copy);
      }

      #[test]
      fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
          let ctx = zmq::Context::new();
          let socket = push(&ctx).unwrap();
          // The receiver is kept alive for the duration of the test so that
          // the writer's warning sends do not fail.
          let (tx, _rx) = channel();
          let w = writer(tx.clone());
          let mut buf = String::with_capacity(4096);
          let mut meas = Measurement::new("rust_test");
          meas.add_tag("a", "t");
          meas.add_field("c", Value::Float(1.23456));
          let ts = now();
          meas.set_timestamp(ts);
          serialize(&meas, &mut buf);
          socket.send_str(&buf, 0).expect("failed to send serialized measurement");
          // Dropping the handle detaches the writer thread.
          drop(w);
      }

      #[test]
      fn it_serializes_a_measurement_in_place() {
          let mut buf = String::with_capacity(4096);
          let mut meas = Measurement::new("rust_test");
          meas.add_tag("a", "b");
          meas.add_field("c", Value::Float(1.0));
          let ts = now();
          meas.set_timestamp(ts);
          serialize(&meas, &mut buf);
          let ans = format!("rust_test,a=b c=1 {}", ts);
          assert_eq!(buf, ans);
      }

      #[test]
      fn it_serializes_a_hard_to_serialize_message() {
          let raw = HARD_MESSAGE;
          let mut buf = String::new();
          let mut m = Measurement::new("rust_test");
          m.add_field("s", Value::String(&raw));
          let ts = now();
          m.set_timestamp(ts);
          serialize(&m, &mut buf);
          println!("{}", buf);
          duplicate_line(&mut buf);
          println!("{}", buf);
          post_and_expect_no_content(&buf);
      }

      #[test]
      fn it_serializes_a_hard_to_serialize_message_from_owned() {
          let raw = HARD_MESSAGE;
          let mut buf = String::new();
          let m = OwnedMeasurement::new("rust_test")
              .add_field("s", OwnedValue::String(raw.to_string()))
              .set_timestamp(now());
          serialize_owned(&m, &mut buf);
          println!("{}", buf);
          duplicate_line(&mut buf);
          println!("{}", buf);
          post_and_expect_no_content(&buf);
      }
  }