You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

275 lines
8.2KB

  1. //! Utilities to efficiently send data to influx
  2. //!
  3. use std::iter::FromIterator;
  4. use std::io::Read;
  5. use std::sync::mpsc::{Sender, Receiver, channel};
  6. use std::thread;
  7. use hyper::status::StatusCode;
  8. use hyper::client::response::Response;
  9. use hyper::Url;
  10. use hyper::client::Client;
  11. use influent::measurement::{Measurement, Value};
  12. use zmq;
  13. use chrono::{DateTime, Utc, TimeZone};
  14. use super::nanos;
  15. use warnings::Warning;
  16. const WRITER_ADDR: &'static str = "ipc://mm-influx";
  17. //const WRITER_ADDR: &'static str = "tcp://127.0.0.1:17853";
  18. const DB_NAME: &'static str = "mm";
  19. const DB_HOST: &'static str = "http://localhost:8086/write";
  20. const ZMQ_RCV_HWM: i32 = 0;
  21. const ZMQ_SND_HWM: i32 = 0;
  22. pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  23. let socket = ctx.socket(zmq::PULL)?;
  24. socket.bind(WRITER_ADDR)?;
  25. socket.set_rcvhwm(ZMQ_RCV_HWM)?;
  26. Ok(socket)
  27. }
  28. pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
  29. let socket = ctx.socket(zmq::PUSH)?;
  30. socket.connect(WRITER_ADDR)?;
  31. socket.set_sndhwm(ZMQ_SND_HWM)?;
  32. Ok(socket)
  33. }
  34. fn escape(s: &str) -> String {
  35. s
  36. .replace(" ", "\\ ")
  37. .replace(",", "\\,")
  38. }
  39. fn as_string(s: &str) -> String {
  40. // the second replace removes double escapes
  41. //
  42. format!("\"{}\"", s.replace("\"", "\\\"")
  43. .replace(r#"\\""#, r#"\""#))
  44. }
  45. #[test]
  46. fn it_checks_as_string_does_not_double_escape() {
  47. let raw = "this is \\\"an escaped string\\\" so it's problematic";
  48. let escaped = as_string(&raw);
  49. assert_eq!(escaped, format!("\"{}\"", raw).as_ref());
  50. }
  51. fn as_integer(i: &i64) -> String {
  52. format!("{}i", i)
  53. }
  54. fn as_float(f: &f64) -> String {
  55. f.to_string()
  56. }
  57. fn as_boolean(b: &bool) -> &str {
  58. if *b { "t" } else { "f" }
  59. }
  60. pub fn now() -> i64 {
  61. nanos(Utc::now()) as i64
  62. }
  63. /// Serialize the measurement into influx line protocol
  64. /// and append to the buffer.
  65. ///
  66. /// # Examples
  67. ///
  68. /// ```
  69. /// extern crate influent;
  70. /// extern crate logging;
  71. ///
  72. /// use influent::measurement::{Measurement, Value};
  73. /// use std::string::String;
  74. /// use logging::influx::serialize;
  75. ///
  76. /// fn main() {
  77. /// let mut buf = String::new();
  78. /// let mut m = Measurement::new("test");
  79. /// m.add_field("x", Value::Integer(1));
  80. /// serialize(&m, &mut buf);
  81. /// }
  82. ///
  83. /// ```
  84. ///
  85. pub fn serialize(measurement: &Measurement, line: &mut String) {
  86. line.push_str(&escape(measurement.key));
  87. for (tag, value) in measurement.tags.iter() {
  88. line.push_str(",");
  89. line.push_str(&escape(tag));
  90. line.push_str("=");
  91. line.push_str(&escape(value));
  92. }
  93. let mut was_spaced = false;
  94. for (field, value) in measurement.fields.iter() {
  95. line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }});
  96. line.push_str(&escape(field));
  97. line.push_str("=");
  98. match value {
  99. &Value::String(ref s) => line.push_str(&as_string(s)),
  100. &Value::Integer(ref i) => line.push_str(&as_integer(i)),
  101. &Value::Float(ref f) => line.push_str(&as_float(f)),
  102. &Value::Boolean(ref b) => line.push_str(as_boolean(b))
  103. };
  104. }
  105. match measurement.timestamp {
  106. Some(t) => {
  107. line.push_str(" ");
  108. line.push_str(&t.to_string());
  109. }
  110. _ => {}
  111. }
  112. }
  113. pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> {
  114. let ctx = zmq::Context::new();
  115. let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
  116. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  117. let client = Client::new();
  118. let mut buf = String::with_capacity(4096);
  119. let mut server_resp = String::with_capacity(4096);
  120. let mut count = 0;
  121. thread::spawn(move || {
  122. loop {
  123. if let Ok(bytes) = socket.recv_bytes(0) {
  124. if let Ok(msg) = String::from_utf8(bytes) {
  125. count = match count {
  126. 0 => {
  127. buf.push_str(&msg);
  128. 1
  129. }
  130. n @ 1...20 => {
  131. buf.push_str("\n");
  132. buf.push_str(&msg);
  133. n + 1
  134. }
  135. _ => {
  136. buf.push_str("\n");
  137. buf.push_str(&msg);
  138. match client.post(url.clone())
  139. .body(&buf)
  140. .send() {
  141. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  142. Ok(mut resp) => {
  143. //let mut body = String::with_capacity(4096);
  144. //let _ =
  145. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  146. //println!("Influx write error: Server responded {} (sent '{}' to {}):\n{}",
  147. //warnings.send(Warning::Error(buf.clone()));
  148. //print!("\n\n\n\n\n{}", buf);
  149. warnings.send(
  150. Warning::Error(
  151. format!("Influx server: {}", server_resp)));
  152. server_resp.clear();
  153. //resp.status, String::from_utf8_lossy(&bytes), url, body);
  154. }
  155. Err(why) => {
  156. warnings.send(
  157. Warning::Error(
  158. format!("Influx write error: {}", why)));
  159. }
  160. }
  161. buf.clear();
  162. 0
  163. }
  164. }
  165. }
  166. }
  167. }
  168. })
  169. }
  170. mod tests {
  171. use super::*;
  172. #[test]
  173. fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() {
  174. let ctx = zmq::Context::new();
  175. let socket = push(&ctx).unwrap();
  176. let (tx, rx) = channel();
  177. let w = writer(tx.clone());
  178. let mut buf = String::with_capacity(4096);
  179. let mut meas = Measurement::new("rust_test");
  180. meas.add_tag("a", "t");
  181. meas.add_field("c", Value::Float(1.23456));
  182. let now = now();
  183. meas.set_timestamp(now);
  184. serialize(&meas, &mut buf);
  185. socket.send_str(&buf, 0);
  186. drop(w);
  187. }
  188. #[test]
  189. fn it_serializes_a_measurement_in_place() {
  190. let mut buf = String::with_capacity(4096);
  191. let mut meas = Measurement::new("rust_test");
  192. meas.add_tag("a", "b");
  193. meas.add_field("c", Value::Float(1.0));
  194. let now = now();
  195. meas.set_timestamp(now);
  196. serialize(&meas, &mut buf);
  197. let ans = format!("rust_test,a=b c=1 {}", now);
  198. assert_eq!(buf, ans);
  199. }
  200. #[test]
  201. fn it_serializes_a_hard_to_serialize_message() {
  202. let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#;
  203. let mut buf = String::new();
  204. let mut server_resp = String::new();
  205. let mut m = Measurement::new("rust_test");
  206. m.add_field("s", Value::String(&raw));
  207. let now = now();
  208. m.set_timestamp(now);
  209. serialize(&m, &mut buf);
  210. println!("{}", buf);
  211. buf.push_str("\n");
  212. let buf_copy = buf.clone();
  213. buf.push_str(&buf_copy);
  214. println!("{}", buf);
  215. let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
  216. let client = Client::new();
  217. match client.post(url.clone())
  218. .body(&buf)
  219. .send() {
  220. Ok(Response { status, .. }) if status == StatusCode::NoContent => {}
  221. Ok(mut resp) => {
  222. resp.read_to_string(&mut server_resp); //.unwrap_or(0);
  223. panic!("{}", server_resp);
  224. }
  225. Err(why) => {
  226. panic!(why)
  227. }
  228. }
  229. }
  230. }