@@ -6,14 +6,14 @@ authors = ["Jonathan Strong <jonathan.strong@gmail.com>"] | |||||
[[example]] | [[example]] | ||||
name = "zmq-logger" | name = "zmq-logger" | ||||
path = "examples/zmq-logger.rs" | path = "examples/zmq-logger.rs" | ||||
required-features = ["warnings"] | |||||
required-features = ["warnings", "zmq"] | |||||
[[example]] | [[example]] | ||||
name = "hist-interval" | name = "hist-interval" | ||||
path = "examples/hist-interval.rs" | path = "examples/hist-interval.rs" | ||||
[dependencies] | [dependencies] | ||||
zmq = "0.8" | |||||
zmq = { version = "0.8", optional = true } | |||||
influent = "0.4" | influent = "0.4" | ||||
chrono = { version = "0.4", features = ["serde"] } | chrono = { version = "0.4", features = ["serde"] } | ||||
hyper = "0.10" | hyper = "0.10" | ||||
@@ -15,6 +15,7 @@ use hyper::client::response::Response; | |||||
use hyper::Url; | use hyper::Url; | ||||
use hyper::client::Client; | use hyper::client::Client; | ||||
use influent::measurement::{Measurement, Value}; | use influent::measurement::{Measurement, Value}; | ||||
#[cfg(feature = "zmq")] | |||||
use zmq; | use zmq; | ||||
#[allow(unused_imports)] | #[allow(unused_imports)] | ||||
use chrono::{DateTime, Utc}; | use chrono::{DateTime, Utc}; | ||||
@@ -398,8 +399,10 @@ impl Drop for InfluxWriter { | |||||
} | } | ||||
} | } | ||||
#[cfg(feature = "zmq")] | |||||
const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; | const WRITER_ADDR: &'static str = "ipc:///tmp/mm/influx"; | ||||
#[cfg(feature = "zmq")] | |||||
pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { | pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { | ||||
let socket = ctx.socket(zmq::PULL)?; | let socket = ctx.socket(zmq::PULL)?; | ||||
socket.bind(WRITER_ADDR)?; | socket.bind(WRITER_ADDR)?; | ||||
@@ -407,6 +410,7 @@ pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { | |||||
Ok(socket) | Ok(socket) | ||||
} | } | ||||
#[cfg(feature = "zmq")] | |||||
pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { | pub fn push(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { | ||||
let socket = ctx.socket(zmq::PUSH)?; | let socket = ctx.socket(zmq::PUSH)?; | ||||
socket.connect(WRITER_ADDR)?; | socket.connect(WRITER_ADDR)?; | ||||
@@ -583,6 +587,7 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { | |||||
#[cfg(feature = "warnings")] | #[cfg(feature = "warnings")] | ||||
#[deprecated(since="0.4", note="Replace with InfluxWriter")] | #[deprecated(since="0.4", note="Replace with InfluxWriter")] | ||||
#[cfg(feature = "zmq")] | |||||
pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> { | pub fn writer(warnings: Sender<Warning>) -> thread::JoinHandle<()> { | ||||
assert!(false); | assert!(false); | ||||
thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || { | thread::Builder::new().name("mm:inflx-wtr".into()).spawn(move || { | ||||
@@ -998,6 +1003,7 @@ mod tests { | |||||
}); | }); | ||||
} | } | ||||
#[cfg(feature = "zmq")] | |||||
#[cfg(feature = "warnings")] | #[cfg(feature = "warnings")] | ||||
#[test] | #[test] | ||||
#[ignore] | #[ignore] | ||||
@@ -647,7 +647,6 @@ impl LatencyManager { | |||||
let thread = Some(thread::spawn(move || { | let thread = Some(thread::spawn(move || { | ||||
let logger = file_logger("var/log/latency-manager.log", Severity::Info); | let logger = file_logger("var/log/latency-manager.log", Severity::Info); | ||||
info!(logger, "initializing zmq"); | |||||
info!(logger, "initializing DurationWindows"); | info!(logger, "initializing DurationWindows"); | ||||
let mut gdax_ws = windows::DurationWindow::new(d); | let mut gdax_ws = windows::DurationWindow::new(d); | ||||
@@ -9,7 +9,6 @@ | |||||
#[macro_use] extern crate money; | #[macro_use] extern crate money; | ||||
extern crate test; | extern crate test; | ||||
extern crate zmq; | |||||
extern crate influent; | extern crate influent; | ||||
extern crate chrono; | extern crate chrono; | ||||
extern crate hyper; | extern crate hyper; | ||||
@@ -24,6 +23,8 @@ extern crate uuid; | |||||
extern crate hdrhistogram; | extern crate hdrhistogram; | ||||
extern crate smallvec; | extern crate smallvec; | ||||
extern crate num; | extern crate num; | ||||
#[cfg(feature = "zmq")] | |||||
extern crate zmq; | |||||
extern crate pubsub as pub_sub; | extern crate pubsub as pub_sub; | ||||
@@ -9,6 +9,7 @@ use std::fmt::{self, Display, Error as FmtError, Formatter}; | |||||
use std::io::{self, Write}; | use std::io::{self, Write}; | ||||
use std::fs; | use std::fs; | ||||
#[cfg(feature = "zmq")] | |||||
use zmq; | use zmq; | ||||
use chrono::{DateTime, Utc}; | use chrono::{DateTime, Utc}; | ||||
use termion::color::{self, Fg, Bg}; | use termion::color::{self, Fg, Bg}; | ||||
@@ -417,6 +418,7 @@ impl WarningsManager { | |||||
/// `measurement_name` is the name of the influxdb measurement | /// `measurement_name` is the name of the influxdb measurement | ||||
/// we will save log entries to. | /// we will save log entries to. | ||||
/// | /// | ||||
#[cfg(feature = "zmq")] | |||||
pub fn new(measurement_name: &'static str) -> Self { | pub fn new(measurement_name: &'static str) -> Self { | ||||
let warnings = Arc::new(RwLock::new(VecDeque::new())); | let warnings = Arc::new(RwLock::new(VecDeque::new())); | ||||
let warnings_copy = warnings.clone(); | let warnings_copy = warnings.clone(); | ||||
@@ -486,6 +488,7 @@ impl Drop for WarningsManager { | |||||
} | } | ||||
} | } | ||||
#[cfg(feature = "zmq")] | |||||
#[allow(dead_code)] | #[allow(dead_code)] | ||||
pub struct ZmqDrain<D> | pub struct ZmqDrain<D> | ||||
where D: Drain, | where D: Drain, | ||||
@@ -496,6 +499,7 @@ pub struct ZmqDrain<D> | |||||
buf: Arc<Mutex<Vec<u8>>> | buf: Arc<Mutex<Vec<u8>>> | ||||
} | } | ||||
#[cfg(feature = "zmq")] | |||||
impl<D> ZmqDrain<D> | impl<D> ZmqDrain<D> | ||||
where D: Drain, | where D: Drain, | ||||
{ | { | ||||
@@ -517,6 +521,7 @@ impl<D> ZmqDrain<D> | |||||
const TIMESTAMP_FORMAT: &'static str = "%b %d %H:%M:%S%.3f"; | const TIMESTAMP_FORMAT: &'static str = "%b %d %H:%M:%S%.3f"; | ||||
#[cfg(feature = "zmq")] | |||||
impl<D> Drain for ZmqDrain<D> | impl<D> Drain for ZmqDrain<D> | ||||
where D: Drain | where D: Drain | ||||
{ | { | ||||
@@ -557,6 +562,7 @@ impl<D> Drain for ZmqDrain<D> | |||||
/// Can be used as a `Write` with `slog_term` and | /// Can be used as a `Write` with `slog_term` and | ||||
/// other libraries. | /// other libraries. | ||||
/// | /// | ||||
#[cfg(feature = "zmq")] | |||||
#[allow(dead_code)] | #[allow(dead_code)] | ||||
pub struct ZmqIo { | pub struct ZmqIo { | ||||
ctx: zmq::Context, | ctx: zmq::Context, | ||||
@@ -564,6 +570,7 @@ pub struct ZmqIo { | |||||
buf: Vec<u8> | buf: Vec<u8> | ||||
} | } | ||||
#[cfg(feature = "zmq")] | |||||
impl ZmqIo { | impl ZmqIo { | ||||
pub fn new(addr: &str) -> Self { | pub fn new(addr: &str) -> Self { | ||||
let _ = fs::create_dir("/tmp/mm"); | let _ = fs::create_dir("/tmp/mm"); | ||||
@@ -576,6 +583,7 @@ impl ZmqIo { | |||||
} | } | ||||
} | } | ||||
#[cfg(feature = "zmq")] | |||||
impl Write for ZmqIo { | impl Write for ZmqIo { | ||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { | ||||
self.buf.write(buf) | self.buf.write(buf) | ||||