You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

470 lines
15KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::iter::FromIterator;
  4. use std::io::{Write, Read};
  5. use std::sync::mpsc::{Sender, Receiver, channel};
  6. use std::thread;
  7. use std::collections::HashMap;
  8. use std::fs::{self, OpenOptions};
  9. use hyper::status::StatusCode;
  10. use hyper::client::response::Response;
  11. use hyper::Url;
  12. use hyper::client::Client;
  13. use influent::measurement::{Measurement, Value};
  14. use zmq;
  15. use chrono::{DateTime, Utc, TimeZone};
  16. use super::nanos;
  17. use warnings::Warning;
  /// ZeroMQ endpoint shared by `pull` (bind side) and `push` (connect side).
  const WRITER_ADDR: &str = "ipc:///tmp/mm/influx";
  // TCP alternative kept for reference:
  //const WRITER_ADDR: &str = "tcp://127.0.0.1:17853";
  /// Database name, sent as the `db` query parameter of the write URL.
  const DB_NAME: &str = "mm";
  /// HTTP endpoint that batched writes are POSTed to.
  const DB_HOST: &str = "http://localhost:8086/write";
  /// Receive high-water mark applied to the PULL socket.
  const ZMQ_RCV_HWM: i32 = 0;
  /// Send high-water mark applied to the PUSH socket.
  const ZMQ_SND_HWM: i32 = 0;
  24. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  25. let socket = ctx.socket(zmq::PULL)?;
  26. socket.bind(WRITER_ADDR)?;
  27. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  28. Ok(socket)
  29. }
  30. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  31. let socket = ctx.socket(zmq::PUSH)?;
  32. socket.connect(WRITER_ADDR)?;
  33. socket.set_sndhwm(ZMQ_SND_HWM)?;
  34. Ok(socket)
  35. }
  /// Returns a copy of `s` in which every space and every comma is preceded
  /// by a backslash. All other characters are copied through unchanged.
  fn escape(s: &str) -> String {
      let mut escaped = String::with_capacity(s.len());
      for ch in s.chars() {
          if ch == ' ' || ch == ',' {
              escaped.push('\\');
          }
          escaped.push(ch);
      }
      escaped
  }
  /// Wraps `s` in double quotes for use as a string field value.
  ///
  /// A `"` inside `s` gets a backslash in front of it unless the character
  /// immediately before it is already a backslash, so quotes that arrive
  /// pre-escaped are not escaped a second time.
  fn as_string(s: &str) -> String {
      let mut quoted = String::with_capacity(s.len() + 2);
      quoted.push('"');
      let mut after_backslash = false;
      for ch in s.chars() {
          if ch == '"' && !after_backslash {
              quoted.push('\\');
          }
          quoted.push(ch);
          after_backslash = ch == '\\';
      }
      quoted.push('"');
      quoted
  }
  /// Quotes that are already backslash-escaped in the input must come out
  /// unchanged (only the surrounding quotes are added).
  #[test]
  fn it_checks_as_string_does_not_double_escape() {
      let input = "this is \\\"an escaped string\\\" so it's problematic";
      let expected = format!("\"{}\"", input);
      assert_eq!(as_string(input), expected);
  }
  /// Formats an integer field value: the decimal digits followed by `i`.
  fn as_integer(i: &i64) -> String {
      let mut text = i.to_string();
      text.push('i');
      text
  }
  /// Formats a float field value using `f64`'s `Display` implementation.
  fn as_float(f: &f64) -> String {
      format!("{}", *f)
  }
  /// Formats a boolean field value as `t` (true) or `f` (false).
  fn as_boolean(b: &bool) -> &str {
      match *b {
          true => "t",
          false => "f",
      }
  }
  61. pub fn now() -> i64 {
  62. nanos(Utc::now()) as i64
  63. }
  64. /// Serialize the measurement into influx line protocol
  65. /// and append to the buffer.
  66. ///
  67. /// # Examples
  68. ///
  69. /// ```
  70. /// extern crate influent;
  71. /// extern crate logging;
  72. ///
  73. /// use influent::measurement::{Measurement, Value};
  74. /// use std::string::String;
  75. /// use logging::influx::serialize;
  76. ///
  77. /// fn main() {
  78. /// let mut buf = String::new();
  79. /// let mut m = Measurement::new("test");
  80. /// m.add_field("x", Value::Integer(1));
  81. /// serialize(&m, &mut buf);
  82. /// }
  83. ///
  84. /// ```
  85. ///
  86. pub fn serialize(measurement: &Measurement, line: &mut String) {
  87. line.push_str(&escape(measurement.key));
  88. for (tag, value) in measurement.tags.iter() {
  89. line.push_str(",");
  90. line.push_str(&escape(tag));
  91. line.push_str("=");
  92. line.push_str(&escape(value));
  93. }
  94. let mut was_spaced = false;
  95. for (field, value) in measurement.fields.iter() {
  96. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  97. line.push_str(&escape(field));
  98. line.push_str("=");
  99. match value {
  100. &Value::String(ref s) => line.push_str(&as_string(s)),
  101. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  102. &Value::Float(ref f) => line.push_str(&as_float(f)),
  103. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  104. };
  105. }
  106. match measurement.timestamp {
  107. Some(t) => {
  108. line.push_str(" ");
  109. line.push_str(&t.to_string());
  110. }
  111. _ => {}
  112. }
  113. }
  114. pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
  115. line.push_str(&escape(measurement.key));
  116. for (tag, value) in measurement.tags.iter() {
  117. line.push_str(",");
  118. line.push_str(&escape(tag));
  119. line.push_str("=");
  120. line.push_str(&escape(value));
  121. }
  122. let mut was_spaced = false;
  123. for (field, value) in measurement.fields.iter() {
  124. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  125. line.push_str(&escape(field));
  126. line.push_str("=");
  127. match value {
  128. &OwnedValue::String(ref s) => line.push_str(&as_string(s)),
  129. &OwnedValue::Integer(ref i) => line.push_str(&as_integer(i)),
  130. &OwnedValue::Float(ref f) => line.push_str(&as_float(f)),
  131. &OwnedValue::Boolean(ref b) => line.push_str(as_boolean(b))
  132. };
  133. }
  134. match measurement.timestamp {
  135. Some(t) => {
  136. line.push_str(" ");
  137. line.push_str(&t.to_string());
  138. }
  139. _ => {}
  140. }
  141. }
  142. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  143. thread::spawn(move || {
  144. let _ = fs::create_dir("/tmp/mm");
  145. let ctx = zmq::Context::new();
  146. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  147. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  148. let client = Client::new();
  149. let mut buf = String::with_capacity(4096);
  150. let mut server_resp = String::with_capacity(4096);
  151. let mut count = 0;
  152. loop {
  153. if let Ok(bytes) = socket.recv_bytes(0) {
  154. if let Ok(msg) = String::from_utf8(bytes) {
  155. count = match count {
  156. 0 => {
  157. buf.push_str(&msg);
  158. 1
  159. }
  160. n @ 1...40 => {
  161. buf.push_str("\n");
  162. buf.push_str(&msg);
  163. n + 1
  164. }
  165. _ => {
  166. buf.push_str("\n");
  167. buf.push_str(&msg);
  168. match client.post(url.clone())
  169. .body(&buf)
  170. .send() {
  171. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  172. Ok(mut resp) => {
  173. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  174. warnings.send(
  175. Warning::Error(
  176. format!("Influx server: {}", server_resp)));
  177. server_resp.clear();
  178. }
  179. Err(why) => {
  180. warnings.send(
  181. Warning::Error(
  182. format!("Influx write error: {}", why)));
  183. }
  184. }
  185. buf.clear();
  186. 0
  187. }
  188. }
  189. }
  190. }
  191. }
  192. })
  193. }
  /// A field value that owns its data, so it can be moved across threads
  /// inside an `OwnedMeasurement`.
  #[derive(Debug, Clone, PartialEq)]
  pub enum OwnedValue {
      /// Text value; serialized inside double quotes.
      String(String),
      /// Floating-point value.
      Float(f64),
      /// Integer value; serialized with a trailing `i`.
      Integer(i64),
      /// Boolean value; serialized as `t` or `f`.
      Boolean(bool),
  }
  201. pub struct OwnedMeasurement {
  202. pub key: &'static str,
  203. pub timestamp: Option<i64>,
  204. pub fields: HashMap<&'static str, OwnedValue>,
  205. pub tags: HashMap<&'static str, &'static str>
  206. }
  207. impl OwnedMeasurement {
  208. pub fn new(key: &'static str) -> Self {
  209. OwnedMeasurement {
  210. key,
  211. timestamp: None,
  212. fields: HashMap::new(),
  213. tags: HashMap::new()
  214. }
  215. }
  216. pub fn add_tag(mut self, key: &'static str, value: &'static str) -> Self {
  217. self.tags.insert(key, value);
  218. self
  219. }
  220. pub fn add_field(mut self, key: &'static str, value: OwnedValue) -> Self {
  221. self.fields.insert(key, value);
  222. self
  223. }
  224. pub fn set_timestamp(mut self, timestamp: i64) -> Self {
  225. self.timestamp = Some(timestamp);
  226. self
  227. }
  228. }
  /// Converts a `Duration` to whole nanoseconds as an `i64`.
  ///
  /// Durations too long to represent (more than `i64::MAX` nanoseconds,
  /// roughly 292 years) saturate at `i64::MAX`. The previous arithmetic could
  /// overflow the intermediate `u64` multiplication or wrap to a negative
  /// value in the final cast.
  pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
      let ceiling = ::std::i64::MAX as u64;
      let total = d.as_secs()
          .saturating_mul(1_000_000_000_u64)
          .saturating_add(d.subsec_nanos() as u64);
      if total > ceiling {
          ::std::i64::MAX
      } else {
          total as i64
      }
  }
  232. //pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) }
  233. /// exactly like `writer`, but also returns a `Sender<Measurement>` and accepts
  234. /// incoming `Measurement`s that way *in addition* to the old socket/`String`
  235. /// method
  236. ///
  237. pub fn writer_str_or_meas(warnings: Sender<Warning>) -> (thread::JoinHandle<()>, Sender<OwnedMeasurement>) {
  238. let (tx, rx) = channel();
  239. let thread = thread::spawn(move || {
  240. let _ = fs::create_dir("/tmp/mm");
  241. let ctx = zmq::Context::new();
  242. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  243. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  244. let client = Client::new();
  245. let mut meas_buf = String::with_capacity(4096);
  246. let mut buf = String::with_capacity(4096);
  247. let mut server_resp = String::with_capacity(4096);
  248. let mut count = 0;
  249. let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
  250. match prev {
  251. 0 => {
  252. buf.push_str(s);
  253. 1
  254. }
  255. n @ 1...40 => {
  256. buf.push_str("\n");
  257. buf.push_str(s);
  258. n + 1
  259. }
  260. _ => {
  261. buf.push_str("\n");
  262. buf.push_str(s);
  263. match client.post(url.clone())
  264. .body(buf.as_str())
  265. .send() {
  266. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  267. Ok(mut resp) => {
  268. let mut server_resp = String::with_capacity(1024);
  269. server_resp.push_str(&format!("sent at {}:\n", Utc::now()));
  270. server_resp.push_str(&buf);
  271. server_resp.push_str("\nreceived:\n");
  272. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  273. OpenOptions::new()
  274. .create(true)
  275. .append(true)
  276. .open("/home/jstrong/src/market-maker/influx-errors.txt")
  277. .map_err(|e| {
  278. warnings.send(Warning::Error(format!("failed to save influx error: {}", e)));
  279. }).map(|mut file| {
  280. write!(file, "{}", server_resp);
  281. });
  282. server_resp.truncate(120);
  283. warnings.send(
  284. Warning::Error(
  285. format!("Influx server: {}", server_resp)));
  286. }
  287. Err(why) => {
  288. warnings.send(
  289. Warning::Error(
  290. format!("Influx write error: {}", why)));
  291. }
  292. }
  293. buf.clear();
  294. 0
  295. }
  296. }
  297. };
  298. loop {
  299. rx.try_recv()
  300. .map(|meas| {
  301. serialize_owned(&meas, &mut meas_buf);
  302. count = next(count, &meas_buf, &mut buf);
  303. meas_buf.clear();
  304. });
  305. socket.recv_bytes(0).ok()
  306. .and_then(|bytes| {
  307. String::from_utf8(bytes).ok()
  308. }).map(|s| {
  309. count = next(count, &s, &mut buf);
  310. });
  311. }
  312. });
  313. (thread, tx)
  314. }
  /// Tests for the influx helpers.
  ///
  /// NOTE(review): every test except `it_serializes_a_measurement_in_place`
  /// either POSTs to the write endpoint configured in `DB_HOST` or spawns the
  /// writer thread, so they appear to need a local environment with that
  /// endpoint available -- confirm before running in CI.
  #[cfg(test)]
  mod tests {
      use super::*;

      /// Smoke test: spawn the writer and push one serialized measurement to
      /// it over the socket. Nothing is asserted about the outcome.
      #[test]
      fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
          let ctx = zmq::Context::new();
          let socket = push(&ctx).unwrap();
          // The receiver is kept alive so warnings sent by the writer are
          // not rejected; it is never read.
          let (tx, _rx) = channel();
          let w = writer(tx.clone());
          let mut buf = String::with_capacity(4096);
          let mut meas = Measurement::new("rust_test");
          meas.add_tag("a", "t");
          meas.add_field("c", Value::Float(1.23456));
          let now = now();
          meas.set_timestamp(now);
          serialize(&meas, &mut buf);
          let _ = socket.send_str(&buf, 0);
          // Dropping the handle detaches the thread; it is not joined.
          drop(w);
      }

      #[test]
      fn it_serializes_a_measurement_in_place() {
          let mut buf = String::with_capacity(4096);
          let mut meas = Measurement::new("rust_test");
          meas.add_tag("a", "b");
          meas.add_field("c", Value::Float(1.0));
          let now = now();
          meas.set_timestamp(now);
          serialize(&meas, &mut buf);
          let ans = format!("rust_test,a=b c=1 {}", now);
          assert_eq!(buf, ans);
      }

      /// A string field containing nested, unescaped quotes must be accepted
      /// by the server when two such lines are sent in one request.
      #[test]
      fn it_serializes_a_hard_to_serialize_message() {
          let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
          let mut buf = String::new();
          let mut server_resp = String::new();
          let mut m = Measurement::new("rust_test");
          m.add_field("s", Value::String(&raw));
          let now = now();
          m.set_timestamp(now);
          serialize(&m, &mut buf);
          println!("{}", buf);
          buf.push_str("\n");
          let buf_copy = buf.clone();
          buf.push_str(&buf_copy);
          println!("{}", buf);

          let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")])
              .expect("influx writer url should parse");
          let client = Client::new();
          match client.post(url.clone())
                      .body(&buf)
                      .send() {
              Ok(Response { status, .. }) if status == StatusCode::NoContent => {}

              Ok(mut resp) => {
                  let _ = resp.read_to_string(&mut server_resp);
                  panic!("{}", server_resp);
              }

              Err(why) => {
                  // Format the error so the panic message shows it.
                  panic!("{}", why)
              }
          }
      }

      /// Same scenario as above, built through `OwnedMeasurement` and
      /// `serialize_owned`.
      #[test]
      fn it_serializes_a_hard_to_serialize_message_from_owned() {
          let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
          let mut buf = String::new();
          let mut server_resp = String::new();
          let m = OwnedMeasurement::new("rust_test")
              .add_field("s", OwnedValue::String(raw.to_string()))
              .set_timestamp(now());
          serialize_owned(&m, &mut buf);
          println!("{}", buf);
          buf.push_str("\n");
          let buf_copy = buf.clone();
          buf.push_str(&buf_copy);
          println!("{}", buf);

          let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")])
              .expect("influx writer url should parse");
          let client = Client::new();
          match client.post(url.clone())
                      .body(&buf)
                      .send() {
              Ok(Response { status, .. }) if status == StatusCode::NoContent => {}

              Ok(mut resp) => {
                  let _ = resp.read_to_string(&mut server_resp);
                  panic!("{}", server_resp);
              }

              Err(why) => {
                  // Format the error so the panic message shows it.
                  panic!("{}", why)
              }
          }
      }
  }