You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

486 lines
15KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::iter::FromIterator;
  4. use std::io::{Write, Read};
  5. use std::sync::mpsc::{Sender, Receiver, channel};
  6. use std::thread;
  7. use std::collections::HashMap;
  8. use std::fs::{self, OpenOptions};
  9. use hyper::status::StatusCode;
  10. use hyper::client::response::Response;
  11. use hyper::Url;
  12. use hyper::client::Client;
  13. use influent::measurement::{Measurement, Value};
  14. use zmq;
  15. use chrono::{DateTime, Utc, TimeZone};
  16. use super::nanos;
  17. use warnings::Warning;
  18. const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx";
  19. //const WRITER_ADDR: &'static str = "tcp://127.0.0.1:17853";
  20. const DB_NAME: &'static str = "mm";
  21. //const DB_HOST: &'static str = "http://localhost:8086/write";
  22. const DB_HOST: &'static str = "http://harrison.0ptimus.internal:8086/write";
  23. const ZMQ_RCV_HWM: i32 = 0;
  24. const ZMQ_SND_HWM: i32 = 0;
  25. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  26. let socket = ctx.socket(zmq::PULL)?;
  27. socket.bind(WRITER_ADDR)?;
  28. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  29. Ok(socket)
  30. }
  31. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  32. let socket = ctx.socket(zmq::PUSH)?;
  33. socket.connect(WRITER_ADDR)?;
  34. socket.set_sndhwm(ZMQ_SND_HWM)?;
  35. Ok(socket)
  36. }
  37. fn escape(s: &str) -> String {
  38. s.replace(" ", "\\ ")
  39. .replace(",", "\\,")
  40. }
  41. fn as_string(s: &str) -> String {
  42. // the second replace removes double escapes
  43. //
  44. format!("\"{}\"", s.replace("\"", "\\\"")
  45. .replace(r#"\\""#, r#"\""#))
  46. }
  47. #[test]
  48. fn it_checks_as_string_does_not_double_escape() {
  49. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  50. let escaped = as_string(&raw);
  51. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  52. }
  53. fn as_integer(i: &i64) -> String {
  54. format!("{}i", i)
  55. }
  56. fn as_float(f: &f64) -> String {
  57. f.to_string()
  58. }
  59. fn as_boolean(b: &bool) -> &str {
  60. if *b { "t" } else { "f" }
  61. }
  62. pub fn now() -> i64 {
  63. nanos(Utc::now()) as i64
  64. }
  65. /// Serialize the measurement into influx line protocol
  66. /// and append to the buffer.
  67. ///
  68. /// # Examples
  69. ///
  70. /// ```
  71. /// extern crate influent;
  72. /// extern crate logging;
  73. ///
  74. /// use influent::measurement::{Measurement, Value};
  75. /// use std::string::String;
  76. /// use logging::influx::serialize;
  77. ///
  78. /// fn main() {
  79. /// let mut buf = String::new();
  80. /// let mut m = Measurement::new("test");
  81. /// m.add_field("x", Value::Integer(1));
  82. /// serialize(&m, &mut buf);
  83. /// }
  84. ///
  85. /// ```
  86. ///
  87. pub fn serialize(measurement: &Measurement, line: &mut String) {
  88. line.push_str(&escape(measurement.key));
  89. for (tag, value) in measurement.tags.iter() {
  90. line.push_str(",");
  91. line.push_str(&escape(tag));
  92. line.push_str("=");
  93. line.push_str(&escape(value));
  94. }
  95. let mut was_spaced = false;
  96. for (field, value) in measurement.fields.iter() {
  97. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  98. line.push_str(&escape(field));
  99. line.push_str("=");
  100. match value {
  101. &Value::String(ref s) => line.push_str(&as_string(s)),
  102. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  103. &Value::Float(ref f) => line.push_str(&as_float(f)),
  104. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  105. };
  106. }
  107. match measurement.timestamp {
  108. Some(t) => {
  109. line.push_str(" ");
  110. line.push_str(&t.to_string());
  111. }
  112. _ => {}
  113. }
  114. }
  115. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  116. line.push_str(&escape(measurement.key));
  117. let add_tag = |line: &mut String, key: &str, value: &str| {
  118. line.push_str(",");
  119. line.push_str(&escape(key));
  120. line.push_str("=");
  121. line.push_str(&escape(value));
  122. };
  123. for (key, value) in measurement.tags.iter() {
  124. add_tag(line, key, value);
  125. }
  126. for (key, value) in measurement.string_tags.iter() {
  127. add_tag(line, key, value);
  128. }
  129. let mut was_spaced = false;
  130. for (field, value) in measurement.fields.iter() {
  131. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  132. line.push_str(&escape(field));
  133. line.push_str("=");
  134. match value {
  135. &OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  136. &OwnedValue::Integer(ref i) => line.push_str(&as_integer(i)),
  137. &OwnedValue::Float(ref f) => line.push_str(&as_float(f)),
  138. &OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b))
  139. };
  140. }
  141. match measurement.timestamp {
  142. Some(t) => {
  143. line.push_str(" ");
  144. line.push_str(&t.to_string());
  145. }
  146. _ => {}
  147. }
  148. }
  149. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  150. thread::spawn(move || {
  151. let _ = fs::create_dir("/tmp/mm");
  152. let ctx = zmq::Context::new();
  153. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  154. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  155. let client = Client::new();
  156. let mut buf = String::with_capacity(4096);
  157. let mut server_resp = String::with_capacity(4096);
  158. let mut count = 0;
  159. loop {
  160. if let Ok(bytes) = socket.recv_bytes(0) {
  161. if let Ok(msg) = String::from_utf8(bytes) {
  162. count = match count {
  163. 0 => {
  164. buf.push_str(&msg);
  165. 1
  166. }
  167. n @ 1...40 => {
  168. buf.push_str("\n");
  169. buf.push_str(&msg);
  170. n + 1
  171. }
  172. _ => {
  173. buf.push_str("\n");
  174. buf.push_str(&msg);
  175. match client.post(url.clone())
  176. .body(&buf)
  177. .send() {
  178. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  179. Ok(mut resp) => {
  180. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  181. warnings.send(
  182. Warning::Error(
  183. format!("Influx server: {}", server_resp)));
  184. server_resp.clear();
  185. }
  186. Err(why) => {
  187. warnings.send(
  188. Warning::Error(
  189. format!("Influx write error: {}", why)));
  190. }
  191. }
  192. buf.clear();
  193. 0
  194. }
  195. }
  196. }
  197. }
  198. }
  199. })
  200. }
  201. #[derive(Debug, Clone, PartialEq)]
  202. pub enum OwnedValue {
  203. String(String),
  204. Float(f64),
  205. Integer(i64),
  206. Boolean(bool)
  207. }
  208. pub struct OwnedMeasurement {
  209. pub key: &'static str,
  210. pub timestamp: Option<i64>,
  211. pub fields: HashMap<&'static str, OwnedValue>,
  212. pub tags: HashMap<&'static str, &'static str>,
  213. pub string_tags: HashMap<&'static str, String>
  214. }
  215. impl OwnedMeasurement {
  216. pub fn new(key: &'static str) -> Self {
  217. OwnedMeasurement {
  218. key,
  219. timestamp: None,
  220. fields: HashMap::new(),
  221. tags: HashMap::new(),
  222. string_tags: HashMap::new()
  223. }
  224. }
  225. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  226. self.tags.insert(key, value);
  227. self
  228. }
  229. pub fn add_string_tag(mut self, key: &'static str, value: String) -> Self {
  230. self.string_tags.insert(key, value);
  231. self
  232. }
  233. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  234. self.fields.insert(key, value);
  235. self
  236. }
  237. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  238. self.timestamp = Some(timestamp);
  239. self
  240. }
  241. }
  242. pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
  243. (d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64
  244. }
  245. //pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) }
  246. /// exactly like `writer`, but also returns a `Sender<Measurement>` and accepts
  247. /// incoming `Measurement`s that way *in addition* to the old socket/`String`
  248. /// method
  249. ///
  250. pub fn writer_str_or_meas(warnings: Sender<Warning>) -> (thread::JoinHandle<()>, Sender<OwnedMeasurement>) {
  251. let (tx, rx) = channel();
  252. let thread = thread::spawn(move || {
  253. let _ = fs::create_dir("/tmp/mm");
  254. let ctx = zmq::Context::new();
  255. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  256. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  257. let client = Client::new();
  258. let mut meas_buf = String::with_capacity(4096);
  259. let mut buf = String::with_capacity(4096);
  260. let mut server_resp = String::with_capacity(4096);
  261. let mut count = 0;
  262. let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
  263. match prev {
  264. 0 => {
  265. buf.push_str(s);
  266. 1
  267. }
  268. n @ 1...40 => {
  269. buf.push_str("\n");
  270. buf.push_str(s);
  271. n + 1
  272. }
  273. _ => {
  274. buf.push_str("\n");
  275. buf.push_str(s);
  276. match client.post(url.clone())
  277. .body(buf.as_str())
  278. .send() {
  279. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  280. Ok(mut resp) => {
  281. let mut server_resp = String::with_capacity(1024);
  282. server_resp.push_str(&format!("sent at {}:\n", Utc::now()));
  283. server_resp.push_str(&buf);
  284. server_resp.push_str("\nreceived:\n");
  285. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  286. OpenOptions::new()
  287. .create(true)
  288. .append(true)
  289. .open("/home/jstrong/src/market-maker/influx-errors.txt")
  290. .map_err(|e| {
  291. warnings.send(Warning::Error(format!("failed to save influx error: {}", e)));
  292. }).map(|mut file| {
  293. write!(file, "{}", server_resp);
  294. });
  295. server_resp.truncate(120);
  296. warnings.send(
  297. Warning::Error(
  298. format!("Influx server: {}", server_resp)));
  299. }
  300. Err(why) => {
  301. warnings.send(
  302. Warning::Error(
  303. format!("Influx write error: {}", why)));
  304. }
  305. }
  306. buf.clear();
  307. 0
  308. }
  309. }
  310. };
  311. loop {
  312. rx.try_recv()
  313. .map(|meas| {
  314. serialize_owned(&meas, &mut meas_buf);
  315. count = next(count, &meas_buf, &mut buf);
  316. meas_buf.clear();
  317. });
  318. socket.recv_bytes(0).ok()
  319. .and_then(|bytes| {
  320. String::from_utf8(bytes).ok()
  321. }).map(|s| {
  322. count = next(count, &s, &mut buf);
  323. });
  324. }
  325. });
  326. (thread, tx)
  327. }
  328. mod tests {
  329. use super::*;
  330. #[test]
  331. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  332. let ctx = zmq::Context::new();
  333. let socket = push(&ctx).unwrap();
  334. let (tx, rx) = channel();
  335. let w = writer(tx.clone());
  336. let mut buf = String::with_capacity(4096);
  337. let mut meas = Measurement::new("rust_test");
  338. meas.add_tag("a", "t");
  339. meas.add_field("c", Value::Float(1.23456));
  340. let now = now();
  341. meas.set_timestamp(now);
  342. serialize(&meas, &mut buf);
  343. socket.send_str(&buf, 0);
  344. drop(w);
  345. }
  346. #[test]
  347. fn it_serializes_a_measurement_in_place() {
  348. let mut buf = String::with_capacity(4096);
  349. let mut meas = Measurement::new("rust_test");
  350. meas.add_tag("a", "b");
  351. meas.add_field("c", Value::Float(1.0));
  352. let now = now();
  353. meas.set_timestamp(now);
  354. serialize(&meas, &mut buf);
  355. let ans = format!("rust_test,a=b c=1 {}", now);
  356. assert_eq!(buf, ans);
  357. }
  358. #[test]
  359. fn it_serializes_a_hard_to_serialize_message() {
  360. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  361. let mut buf = String::new();
  362. let mut server_resp = String::new();
  363. let mut m = Measurement::new("rust_test");
  364. m.add_field("s", Value::String(&raw));
  365. let now = now();
  366. m.set_timestamp(now);
  367. serialize(&m, &mut buf);
  368. println!("{}", buf);
  369. buf.push_str("\n");
  370. let buf_copy = buf.clone();
  371. buf.push_str(&buf_copy);
  372. println!("{}", buf);
  373. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  374. let client = Client::new();
  375. match client.post(url.clone())
  376. .body(&buf)
  377. .send() {
  378. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  379. Ok(mut resp) => {
  380. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  381. panic!("{}", server_resp);
  382. }
  383. Err(why) => {
  384. panic!(why)
  385. }
  386. }
  387. }
  388. #[test]
  389. fn it_serializes_a_hard_to_serialize_message_from_owned() {
  390. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  391. let mut buf = String::new();
  392. let mut server_resp = String::new();
  393. let mut m = OwnedMeasurement::new("rust_test")
  394. .add_field("s", OwnedValue::String(raw.to_string()))
  395. .set_timestamp(now());
  396. serialize_owned(&m, &mut buf);
  397. println!("{}", buf);
  398. buf.push_str("\n");
  399. let buf_copy = buf.clone();
  400. buf.push_str(&buf_copy);
  401. println!("{}", buf);
  402. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  403. let client = Client::new();
  404. match client.post(url.clone())
  405. .body(&buf)
  406. .send() {
  407. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  408. Ok(mut resp) => {
  409. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  410. panic!("{}", server_resp);
  411. }
  412. Err(why) => {
  413. panic!(why)
  414. }
  415. }
  416. }
  417. }