diff --git a/.gitignore b/.gitignore index f594fe0..33ba9ab 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ Cargo.lock .*.swp /var/*.log /var/ +log/ diff --git a/Cargo.toml b/Cargo.toml index 4974667..aad4b84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,25 +4,16 @@ version = "0.5.2" authors = ["Jonathan Strong "] edition = "2018" -[[example]] -name = "zmq-logger" -path = "examples/zmq-logger.rs" -required-features = ["warnings", "zmq"] - [[example]] name = "hist-interval" path = "examples/hist-interval.rs" [dependencies] -zmq = { version = "0.8", optional = true } -influent = "0.4" chrono = { version = "0.4", features = ["serde"] } hyper = "0.10" termion = "1.4.0" slog = "2.0.6" slog-term = "2" -ordermap = "0.3" -fnv = "1" uuid = { version = "0.6", features = ["serde", "v4", "nightly", "const_fn"] } hdrhistogram = "6" slog-async = "2" @@ -35,8 +26,10 @@ pretty_toa = "1.0.0" sloggers = "0.3" #sloggers = { path = "../sloggers" } -decimal = { path = "../decimal", version = "2" } +#decimal = { path = "../decimal", version = "2" } #decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } +decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } +decimal-macros = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } #windows = { path = "../windows", version = "0.2" } money = { path = "../money", version = "0.3" } diff --git a/examples/zmq-logger.rs b/examples/zmq-logger.rs deleted file mode 100644 index 8e47498..0000000 --- a/examples/zmq-logger.rs +++ /dev/null @@ -1,36 +0,0 @@ -#![allow(unused_imports)] -#[macro_use] extern crate slog; -extern crate logging; -extern crate slog_term; - -use slog::*; -use logging::warnings::ZmqDrain; - -use std::io::Write; -use std::thread; -use std::time::Duration; - -fn main() { - //let term_decorator = slog_term::TermDecorator::new().build(); - //let term_drain = slog_term::CompactFormat::new(term_decorator).build().fuse(); - let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); - let plain_fuse = slog_term::FullFormat::new(plain).build().fuse(); - let w = logging::warnings::WarningsManager::new("test"); - let w_drain = logging::warnings::WarningsDrain::new(w.tx.clone(), Level::Debug, plain_fuse); - //let zmq_drain = ZmqDrain::new(plain_fuse); - //let zmq_decorator = slog_term::PlainSyncDecorator::new(zmq_drain); - //let zmq_fuse = slog_term::FullFormat::new(zmq_decorator).build().fuse(); - let logger = Logger::root(w_drain, o!()); - //let logger = - // Logger::root(Duplicate::new(plain_fuse, zmq_fuse).fuse(), o!()); - - let mut i = 0; - - loop { - info!(logger, "hello world"; - "i" => i); - i += 1; - thread::sleep(Duration::from_secs(1)); - } - -} diff --git a/src/influx.rs b/src/influx.rs index 045cd32..b170927 100644 --- a/src/influx.rs +++ b/src/influx.rs @@ -3,24 +3,16 @@ use std::io::Read; use std::sync::Arc; -//use std::sync::mpsc::{Sender, Receiver, channel, SendError}; use crossbeam_channel::{Sender, Receiver, bounded, SendError}; use std::{thread, mem}; use std::time::*; -use std::hash::BuildHasherDefault; use std::collections::VecDeque; use hyper::status::StatusCode; use hyper::client::response::Response; use hyper::Url; use hyper::client::Client; -use influent::measurement::{Measurement, Value}; -#[cfg(feature = "zmq")] -use zmq; -#[allow(unused_imports)] use chrono::{DateTime, Utc}; -use ordermap::OrderMap; -use fnv::FnvHasher; use decimal::d128; use uuid::Uuid; use smallvec::SmallVec; @@ -31,16 +23,10 @@ use super::{nanos, file_logger, LOG_LEVEL}; #[cfg(feature = "warnings")] use warnings::Warning; -pub use super::{dur_nanos, dt_nanos}; - -pub type Map = OrderMap>; +pub use super::{dur_nanos, dt_nanos, measure}; pub const INFLUX_WRITER_MAX_BUFFER: usize = 4096; -pub fn new_map(capacity: usize) -> Map { - Map::with_capacity_and_hasher(capacity, Default::default()) -} - /// Created this so I know what types can be passed through the /// `measure!` macro, which used to convert with `as i64` and /// `as f64` until I accidentally passed a function name, and it @@ -154,7 +140,7 @@ macro_rules! measure { (@kv v, $meas:ident, $k:expr) => { measure!(@ea tag, $meas, "version", $k) }; (@kv $t:tt, $meas:ident, $k:tt) => { measure!(@ea $t, $meas, stringify!($k), measure!(@as_expr $k)) }; (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; - (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; + (@ea t, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, &$v); }; (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) }; (@ea i, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer(AsI64::as_i64($v))) }; (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float(AsF64::as_f64($v))) }; @@ -816,14 +802,6 @@ fn it_checks_as_string_does_not_double_escape() { assert_eq!(escaped, format!("\"{}\"", raw).as_ref()); } -fn as_integer(i: &i64) -> String { - format!("{}i", i) -} - -fn as_float(f: &f64) -> String { - f.to_string() -} - fn as_boolean(b: &bool) -> &str { if *b { "t" } else { "f" } } @@ -832,62 +810,6 @@ pub fn now() -> i64 { nanos(Utc::now()) as i64 } -/// Serialize the measurement into influx line protocol -/// and append to the buffer. -/// -/// # Examples -/// -/// ``` -/// extern crate influent; -/// extern crate logging; -/// -/// use influent::measurement::{Measurement, Value}; -/// use std::string::String; -/// use logging::influx::serialize; -/// -/// fn main() { -/// let mut buf = String::new(); -/// let mut m = Measurement::new("test"); -/// m.add_field("x", Value::Integer(1)); -/// serialize(&m, &mut buf); -/// } -/// -/// ``` -/// -pub fn serialize(measurement: &Measurement, line: &mut String) { - line.push_str(&escape(measurement.key)); - - for (tag, value) in measurement.tags.iter() { - line.push_str(","); - line.push_str(&escape(tag)); - line.push_str("="); - line.push_str(&escape(value)); - } - - let mut was_spaced = false; - - for (field, value) in measurement.fields.iter() { - line.push_str({if !was_spaced { was_spaced = true; " " } else { "," }}); - line.push_str(&escape(field)); - line.push_str("="); - - match value { - &Value::String(ref s) => line.push_str(&as_string(s)), - &Value::Integer(ref i) => line.push_str(&as_integer(i)), - &Value::Float(ref f) => line.push_str(&as_float(f)), - &Value::Boolean(ref b) => line.push_str(as_boolean(b)) - }; - } - - match measurement.timestamp { - Some(t) => { - line.push_str(" "); - line.push_str(&t.to_string()); - } - _ => {} - } -} - /// Serializes an `&OwnedMeasurement` as influx line protocol into `line`. /// /// The serialized measurement is appended to the end of the string without @@ -1391,86 +1313,12 @@ mod tests { loop { if rx.recv().is_err() { break } } }); b.iter(|| { - measure!(tx, test, - tag[color; "red"], - tag[mood => "playful"], - tag [ ticker => "xmr_btc" ], - float[ price => 1.2345 ], - float[ amount => 56.323], - int[n; 1], - time[now()] - ); + measure!(tx, test, t(color, "red"), t(mood, "playful"), + t(ticker, "xmr_btc"), f(price, 1.2345), f(amount, 56.322), + i(n, 1), tm(now())); }); } - #[cfg(feature = "zmq")] - #[cfg(feature = "warnings")] - #[test] - #[ignore] - fn it_spawns_a_writer_thread_and_sends_dummy_measurement_to_influxdb() { - let ctx = zmq::Context::new(); - let socket = push(&ctx).unwrap(); - let (tx, rx) = bounded(1024); - let w = writer(tx.clone()); - let mut buf = String::with_capacity(4096); - let mut meas = Measurement::new("rust_test"); - meas.add_tag("a", "t"); - meas.add_field("c", Value::Float(1.23456)); - let now = now(); - meas.set_timestamp(now); - serialize(&meas, &mut buf); - socket.send_str(&buf, 0).unwrap(); - drop(w); - } - - #[test] - fn it_serializes_a_measurement_in_place() { - let mut buf = String::with_capacity(4096); - let mut meas = Measurement::new("rust_test"); - meas.add_tag("a", "b"); - meas.add_field("c", Value::Float(1.0)); - let now = now(); - meas.set_timestamp(now); - serialize(&meas, &mut buf); - let ans = format!("rust_test,a=b c=1 {}", now); - assert_eq!(buf, ans); - } - - #[test] - fn it_serializes_a_hard_to_serialize_message() { - let raw = r#"error encountered trying to send krkn order: Other("Failed to send http request: Other("Resource temporarily unavailable (os error 11)")")"#; - let mut buf = String::new(); - let mut server_resp = String::new(); - let mut m = Measurement::new("rust_test"); - m.add_field("s", Value::String(&raw)); - let now = now(); - m.set_timestamp(now); - serialize(&m, &mut buf); - println!("{}", buf); - buf.push_str("\n"); - let buf_copy = buf.clone(); - buf.push_str(&buf_copy); - println!("{}", buf); - - let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse"); - let client = Client::new(); - match client.post(url.clone()) - .body(&buf) - .send() { - - Ok(Response { status, .. }) if status == StatusCode::NoContent => {} - - Ok(mut resp) => { - resp.read_to_string(&mut server_resp).unwrap(); - panic!("{}", server_resp); - } - - Err(why) => { - panic!(why) - } - } - } - #[bench] fn serialize_owned_longer(b: &mut Bencher) { let mut buf = String::with_capacity(1024); diff --git a/src/warnings.rs b/src/warnings.rs index 34d7db0..750573e 100644 --- a/src/warnings.rs +++ b/src/warnings.rs @@ -9,8 +9,6 @@ use std::fmt::{self, Display, Error as FmtError, Formatter}; use std::io::{self, Write}; use std::fs; -#[cfg(feature = "zmq")] -use zmq; use chrono::{DateTime, Utc}; use termion::color::{self, Fg, Bg}; use influent::measurement::{Measurement, Value as InfluentValue}; @@ -414,71 +412,6 @@ pub struct WarningsManager { thread: Option> } -impl WarningsManager { - /// `measurement_name` is the name of the influxdb measurement - /// we will save log entries to. - /// - #[cfg(feature = "zmq")] - pub fn new(measurement_name: &'static str) -> Self { - let warnings = Arc::new(RwLock::new(VecDeque::new())); - let warnings_copy = warnings.clone(); - let (tx, rx) = channel(); - let mut buf = String::with_capacity(4096); - let ctx = zmq::Context::new(); - let socket = influx::push(&ctx).unwrap(); - let thread = thread::spawn(move || { - let path = format!("var/log/warnings-manager-{}.log", measurement_name); - let logger = file_logger(&path, Severity::Info); - info!(logger, "entering loop"); - loop { - if let Ok(msg) = rx.recv() { - match msg { - Warning::Terminate => { - debug!(logger, "terminating"); - break; - } - - Warning::Log { level, msg, kv, .. } => { - debug!(logger, "new Warning::Debug arrived"; - "msg" => &msg); - let mut meas = kv.to_measurement(measurement_name); - meas.add_field("msg", InfluentValue::String(msg.as_ref())); - meas.add_tag("category", level.as_short_str()); - influx::serialize(&meas, &mut buf); - let _ = socket.send_str(&buf, 0); - buf.clear(); - // and don't push to warnings - // bc it's debug - } - - other => { - debug!(logger, "new {} arrived", other.category_str(); - "msg" => other.category_str()); - let rec = Record::new(other); - { - let m = rec.to_measurement(measurement_name); - influx::serialize(&m, &mut buf); - let _ = socket.send_str(&buf, 0); - buf.clear(); - } - if let Ok(mut lock) = warnings.write() { - lock.push_front(rec); - lock.truncate(N_WARNINGS); - } - } - } - } - } - }); - - WarningsManager { - warnings: warnings_copy, - thread: Some(thread), - tx - } - } -} - impl Drop for WarningsManager { fn drop(&mut self) { let _ = self.tx.send(Warning::Terminate); @@ -488,127 +421,8 @@ impl Drop for WarningsManager { } } -#[cfg(feature = "zmq")] -#[allow(dead_code)] -pub struct ZmqDrain - where D: Drain, -{ - drain: D, - ctx: zmq::Context, - socket: zmq::Socket, - buf: Arc>> -} - -#[cfg(feature = "zmq")] -impl ZmqDrain - where D: Drain, -{ - pub fn new(drain: D) -> Self { - let _ = fs::create_dir("/tmp/mm"); - let ctx = zmq::Context::new(); - let socket = ctx.socket(zmq::PUB).unwrap(); - socket.bind("ipc:///tmp/mm/log").expect("zmq publisher bind failed"); - let buf = Arc::new(Mutex::new(Vec::with_capacity(4096))); - - ZmqDrain { - drain, - ctx, - socket, - buf - } - } -} - const TIMESTAMP_FORMAT: &'static str = "%b %d %H:%M:%S%.3f"; -#[cfg(feature = "zmq")] -impl Drain for ZmqDrain - where D: Drain -{ - type Ok = D::Ok; - type Err = D::Err; - - fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result { - { - let mut buf = self.buf.lock().unwrap(); - let _ = write!(buf, "{time} {level}", - time = Utc::now().format(TIMESTAMP_FORMAT), - level = record.level().as_short_str()); - { - let mut thread_ser = ThreadSer(&mut buf); - let _ = record.kv().serialize(record, &mut thread_ser); - let _ = values.serialize(record, &mut thread_ser); - } - - let _ = write!(buf, " {file:<20} {line:<5} {msg}", - file = record.file(), - line = record.line(), - msg = record.msg()); - - { - let mut kv_ser = KvSer(&mut buf); - // discarding any errors here... - let _ = record.kv().serialize(record, &mut kv_ser); - let _ = values.serialize(record, &mut kv_ser); - } - - let _ = self.socket.send(&buf, 0); - buf.clear(); - } - self.drain.log(record, values) - } -} - -/// Can be used as a `Write` with `slog_term` and -/// other libraries. -/// -#[cfg(feature = "zmq")] -#[allow(dead_code)] -pub struct ZmqIo { - ctx: zmq::Context, - socket: zmq::Socket, - buf: Vec -} - -#[cfg(feature = "zmq")] -impl ZmqIo { - pub fn new(addr: &str) -> Self { - let _ = fs::create_dir("/tmp/mm"); - let ctx = zmq::Context::new(); - let socket = ctx.socket(zmq::PUB).unwrap(); - let addr = format!("ipc:///tmp/mm/{}", addr); - socket.bind(&addr).expect("zmq publisher bind failed"); - let buf = Vec::with_capacity(4096); - ZmqIo { ctx, socket, buf } - } -} - -#[cfg(feature = "zmq")] -impl Write for ZmqIo { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.buf.write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - match self.buf.pop() { - Some(b'\n') => { - let _ = self.socket.send(&self.buf, 0); - } - - Some(other) => { - self.buf.push(other); - let _ = self.socket.send(&self.buf, 0); - } - - None => { - return Ok(()); - } - } - self.buf.clear(); - Ok(()) - } -} - /// Serializes *only* KV pair with `key == "thread"` /// struct ThreadSer<'a>(&'a mut Vec);