@@ -1,56 +1,33 @@ | |||
[package] | |||
name = "logging" | |||
version = "0.5.2" | |||
name = "influx-writer" | |||
version = "0.6.0" | |||
authors = ["Jonathan Strong <jonathan.strong@gmail.com>"] | |||
edition = "2018" | |||
[[example]] | |||
name = "hist-interval" | |||
path = "examples/hist-interval.rs" | |||
[lib] | |||
name = "influx_writer" | |||
path = "src/lib.rs" | |||
[dependencies] | |||
chrono = { version = "0.4", features = ["serde"] } | |||
hyper = "0.10" | |||
termion = "1.4.0" | |||
slog = "2.0.6" | |||
slog = "2" | |||
slog-term = "2" | |||
uuid = { version = "0.8", features = ["serde", "v4", "slog"] } | |||
hdrhistogram = "6" | |||
slog-async = "2" | |||
smallvec = "0.6" | |||
num = "0.1" | |||
dirs = "1" | |||
crossbeam-channel = "0.3" | |||
pretty_toa = "1.0.0" | |||
sloggers = "0.3" | |||
#sloggers = { path = "../sloggers" } | |||
#decimal = { path = "../decimal", version = "2" } | |||
#decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } | |||
decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } | |||
decimal-macros = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } | |||
#windows = { path = "../windows", version = "0.2" } | |||
money = { path = "../money", version = "0.3" } | |||
pubsub = { path = "../pubsub", optional = true } | |||
[features] | |||
default = ["inlines"] | |||
no-thrash = [] | |||
default = ["string-tags"] | |||
trace = ["slog/release_max_level_trace", "slog/max_level_trace"] | |||
debug = ["slog/release_max_level_debug", "slog/max_level_debug"] | |||
test = [] | |||
localhost = [] | |||
harrison = [] | |||
washington = [] | |||
scholes = [] | |||
no-influx-buffer = [] | |||
disable-short-uuid = [] | |||
warnings = [] | |||
inlines = [] | |||
latency = ["pubsub"] | |||
string-tags = [] | |||
unstable = [] | |||
[profile.bench] | |||
lto = true |
@@ -0,0 +1,30 @@ | |||
# influx-writer | |||
An opinionated influxdb client in rust: | |||
- http write requests are managed by a worker thread, so "writing" to influx is wait free on the hot path | |||
- worker thread recovers from failed writes/requests with robust retry and failure handling | |||
- library includes `measure!`, a mini-dsl for creating and sending measurements to influxdb | |||
## `measure!` | |||
`measure!` is a macro that allows highly ergonomic creation and sending of `OwnedMeasurement` instances, at the cost of a bit of a learning curve. | |||
A single letter is used to denote the type of a field, and `t` is used to denote a tag: | |||
```rust | |||
measure!(influx, "measurement_name", t(color, "red"), i(n, 1), tm(now())); | |||
``` | |||
In the above example, `influx` is the identifier for a `InfluxWriter` instance. `InfluxWriter` is a handle to the worker thread that is sending data to the influxdb server. | |||
Now for a more advanced example: if only a single argument is passed to `t`, `f`, `i`, `b`, and other field type short-hands, it creates a field or tag by that name, and uses value in a variable binding of the same name as the field or tag's value: | |||
```rust | |||
let color = "red"; | |||
let n = 1; | |||
let t = now(); // in this example, now() would return integer timestamp | |||
measure!(influx, "even_faster", t(color), i(n), tm(t)); | |||
``` | |||
In the example above, `measure!` creates a `OwnedMeasurement` instance, fills it with a tag named "color" with the value "red", an integer field "n" with the value 1, and a timestamp with the value that's in the variable `t`. Then it uses `influx` to send the measurement to the worker thread. |
@@ -1,127 +0,0 @@ | |||
#![allow(unused)] | |||
extern crate logging; | |||
use std::time::{Instant, Duration}; | |||
use std::thread; | |||
use logging::hist::{Entry, HistLog, nanos}; | |||
const N_SECS: u64 = 30; | |||
//const N_THREADS: usize = 48; | |||
macro_rules! n_threads { | |||
($n:expr) => { | |||
const N_THREADS: usize = $n; | |||
const NAME: &'static str = concat!("sleeptest_", stringify!($n)); | |||
} | |||
} | |||
n_threads!(96); | |||
fn main() { | |||
::std::process::exit(test_n()); | |||
} | |||
fn test_n() -> i32 { | |||
let start = Instant::now(); | |||
let hist = HistLog::new(NAME, "master", Duration::from_millis(100)); | |||
for _ in 0..N_THREADS { | |||
let mut a = hist.clone_with_tag("sleep_1ns"); | |||
thread::spawn(move || { | |||
let mut prev = Instant::now(); | |||
let mut loop_time = Instant::now(); | |||
loop { | |||
prev = loop_time; | |||
loop_time = Instant::now(); | |||
a.record(nanos(loop_time - prev)); | |||
a.check_send(loop_time); | |||
if loop_time - start > Duration::from_secs(N_SECS) { break } | |||
thread::sleep(Duration::new(0, 1)); | |||
//thread::yield_now(); | |||
} | |||
}); | |||
} | |||
thread::sleep(Duration::from_secs(N_SECS)); | |||
0 | |||
} | |||
fn test_3() -> i32 { | |||
let start = Instant::now(); | |||
let hist = HistLog::new("sleeptest", "master", Duration::from_millis(100)); | |||
let mut a = hist.clone_with_tag("yield"); | |||
let mut b = hist.clone_with_tag("sleep_0"); | |||
let mut c = hist.clone_with_tag("sleep_100ns"); | |||
thread::spawn(move || { | |||
let mut prev = Instant::now(); | |||
let mut loop_time = Instant::now(); | |||
let mut i = 0; | |||
loop { | |||
prev = loop_time; | |||
loop_time = Instant::now(); | |||
a.record(nanos(loop_time - prev)); | |||
a.check_send(loop_time); | |||
if loop_time - start > Duration::from_secs(N_SECS) { break } | |||
i += 1; | |||
//if i % 100 == 0 { thread::sleep(Duration::new(0, 0)); } | |||
//thread::sleep(Duration::new(0, 0)); | |||
thread::yield_now(); | |||
} | |||
}); | |||
thread::spawn(move || { | |||
let mut prev = Instant::now(); | |||
let mut loop_time = Instant::now(); | |||
let mut i = 0; | |||
loop { | |||
prev = loop_time; | |||
loop_time = Instant::now(); | |||
b.record(nanos(loop_time - prev)); | |||
b.check_send(loop_time); | |||
if loop_time - start > Duration::from_secs(N_SECS) { break } | |||
i += 1; | |||
//if i % 1_000 == 0 { thread::sleep(Duration::new(0, 0)); } | |||
//if i % 100 == 0 { thread::sleep(Duration::new(0, 0)); } | |||
thread::sleep(Duration::new(0, 0)); | |||
} | |||
}); | |||
let mut prev = Instant::now(); | |||
let mut loop_time = Instant::now(); | |||
let mut i = 0; | |||
loop { | |||
prev = loop_time; | |||
loop_time = Instant::now(); | |||
c.record(nanos(loop_time - prev)); | |||
c.check_send(loop_time); | |||
if loop_time - start > Duration::from_secs(N_SECS) { break } | |||
i += 1; | |||
//if i % 100_000 == 0 { thread::sleep(Duration::from_millis(10)); } | |||
//if i % 100 == 0 { thread::sleep(Duration::new(0, 0)); } | |||
thread::sleep(Duration::new(0, 100)); | |||
} | |||
0 | |||
} | |||
@@ -1,62 +0,0 @@ | |||
#![allow(unused_imports)] | |||
#[macro_use] | |||
extern crate slog; | |||
#[macro_use] | |||
extern crate logging; | |||
use std::io::{self, prelude::*}; | |||
use std::thread; | |||
use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; | |||
use std::time::*; | |||
use chrono::Utc; | |||
use slog::Drain; | |||
use pretty_toa::ThousandsSep; | |||
use logging::influx::InfluxWriter; | |||
const INTERVAL: Duration = Duration::from_micros(1); //from_millis(1); | |||
const HB_EVERY: usize = 1_000_000; | |||
fn main() { | |||
let to_file = logging::truncating_file_logger("var/log/precipice.log", sloggers::types::Severity::Debug); | |||
let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); | |||
let drain = slog_term::CompactFormat::new(decorator).use_utc_timestamp().build().fuse(); | |||
let drain = slog_async::Async::new(drain).chan_size(8192).thread_name("recv".into()).build().fuse(); | |||
let drain = slog::Duplicate::new(drain, to_file).fuse(); | |||
let root = slog::Logger::root(drain, o!()); | |||
let logger = root.new(o!("thread" => "main")); | |||
info!(logger, "initializing..."); | |||
let influx = InfluxWriter::with_logger("localhost", "precipice", 1024, root.new(o!("thread" => "InfluxWriter"))); | |||
let stop = Arc::new(AtomicBool::new(false)); | |||
let thread = { | |||
let stop = Arc::clone(&stop); | |||
let logger = root.new(o!("thread" => "blaster")); | |||
let influx = influx.clone(); | |||
thread::spawn(move || { | |||
let mut i = 0; | |||
let mut sum = 0; | |||
while !stop.load(Ordering::Relaxed) { | |||
measure!(influx, xs, i(i), tm(logging::inanos(Utc::now()))); | |||
sum += i; | |||
i += 1; | |||
if i % HB_EVERY == 0 { | |||
info!(logger, "sent {} measurements", i.thousands_sep()); | |||
} | |||
thread::sleep(INTERVAL); | |||
} | |||
info!(logger, "exiting"; "n_sent" => i, "sum" => sum); | |||
}) | |||
}; | |||
let mut keys = String::new(); | |||
loop { | |||
if let Ok(_) = io::stdin().read_line(&mut keys) { | |||
break | |||
} | |||
thread::sleep(Duration::from_millis(1)); | |||
} | |||
stop.store(true, Ordering::Relaxed); | |||
let _ = thread.join(); | |||
} | |||
@@ -1,195 +0,0 @@ | |||
use std::sync::mpsc::{Sender, Receiver, channel}; | |||
use std::sync::Arc; | |||
use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; | |||
use std::path::PathBuf; | |||
use std::thread::{self, JoinHandle}; | |||
use std::io; | |||
use std::{mem, fs}; | |||
use dirs::home_dir; | |||
use hdrhistogram::{Histogram}; | |||
use hdrhistogram::serialization::V2DeflateSerializer; | |||
use hdrhistogram::serialization::interval_log::{IntervalLogWriterBuilder, Tag}; | |||
pub type C = u64; | |||
pub fn nanos(d: Duration) -> u64 { | |||
d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64) | |||
} | |||
pub struct HistLog { | |||
series: &'static str, | |||
tag: &'static str, | |||
freq: Duration, | |||
last_sent: Instant, | |||
tx: Sender<Option<Entry>>, | |||
hist: Histogram<C>, | |||
thread: Option<Arc<thread::JoinHandle<()>>>, | |||
} | |||
pub struct Entry { | |||
pub tag: &'static str, | |||
pub start: SystemTime, | |||
pub end: SystemTime, | |||
pub hist: Histogram<C>, | |||
} | |||
impl Clone for HistLog { | |||
fn clone(&self) -> Self { | |||
let thread = self.thread.as_ref().map(|x| Arc::clone(x)); | |||
Self { | |||
series: self.series.clone(), | |||
tag: self.tag.clone(), | |||
freq: self.freq.clone(), | |||
last_sent: Instant::now(), | |||
tx: self.tx.clone(), | |||
hist: self.hist.clone(), | |||
thread, | |||
} | |||
} | |||
} | |||
impl HistLog { | |||
pub fn new(series: &'static str, tag: &'static str, freq: Duration) -> Self { | |||
let (tx, rx) = channel(); | |||
let mut dir = home_dir().expect("home_dir"); | |||
dir.push("src/market-maker/var/hist"); | |||
fs::create_dir_all(&dir).unwrap(); | |||
let thread = Some(Arc::new(Self::scribe(series, rx, dir))); | |||
let last_sent = Instant::now(); | |||
let hist = Histogram::new(3).unwrap(); | |||
Self { series, tag, freq, last_sent, tx, hist, thread } | |||
} | |||
/// Create a new `HistLog` that will save results in a specified | |||
/// directory (`path`). | |||
pub fn with_path( | |||
path: &str, | |||
series: &'static str, | |||
tag: &'static str, | |||
freq: Duration, | |||
) -> Self { | |||
let (tx, rx) = channel(); | |||
let dir = PathBuf::from(path); | |||
// let mut dir = env::home_dir().unwrap(); | |||
// dir.push("src/market-maker/var/hist"); | |||
fs::create_dir_all(&dir).ok(); | |||
let thread = Some(Arc::new(Self::scribe(series, rx, dir))); | |||
let last_sent = Instant::now(); | |||
let hist = Histogram::new(3).unwrap(); | |||
Self { series, tag, freq, last_sent, tx, hist, thread } | |||
} | |||
pub fn new_with_tag(&self, tag: &'static str) -> Self { | |||
Self::new(self.series, tag, self.freq) | |||
} | |||
pub fn clone_with_tag(&self, tag: &'static str) -> Self { | |||
let thread = self.thread.as_ref().map(|x| Arc::clone(x)).unwrap(); | |||
assert!(self.thread.is_some(), "self.thread is {:?}", self.thread); | |||
let tx = self.tx.clone(); | |||
Self { | |||
series: self.series, | |||
tag, | |||
freq: self.freq, | |||
last_sent: Instant::now(), | |||
tx, | |||
hist: self.hist.clone(), | |||
thread: Some(thread), | |||
} | |||
} | |||
pub fn clone_with_tag_and_freq(&self, tag: &'static str, freq: Duration) -> HistLog { | |||
let mut clone = self.clone_with_tag(tag); | |||
clone.freq = freq; | |||
clone | |||
} | |||
pub fn record(&mut self, value: u64) { | |||
let _ = self.hist.record(value); | |||
} | |||
/// If for some reason there was a pause in between using the struct, | |||
/// this resets the internal state of both the values recorded to the | |||
/// `Histogram` and the value of when it last sent a `Histogram` onto | |||
/// the writing thread. | |||
/// | |||
pub fn reset(&mut self) { | |||
self.hist.clear(); | |||
self.last_sent = Instant::now(); | |||
} | |||
fn send(&mut self, loop_time: Instant) { | |||
let end = SystemTime::now(); | |||
let start = end - (loop_time - self.last_sent); | |||
assert!(end > start, "end <= start!"); | |||
let mut next = Histogram::new_from(&self.hist); | |||
mem::swap(&mut self.hist, &mut next); | |||
self.tx.send(Some(Entry { tag: self.tag, start, end, hist: next })).expect("sending entry failed"); | |||
self.last_sent = loop_time; | |||
} | |||
pub fn check_send(&mut self, loop_time: Instant) { | |||
//let since = loop_time - self.last_sent; | |||
if loop_time > self.last_sent && loop_time - self.last_sent >= self.freq { | |||
// send sets self.last_sent to loop_time fyi | |||
self.send(loop_time); | |||
} | |||
} | |||
fn scribe( | |||
series : &'static str, | |||
rx : Receiver<Option<Entry>>, | |||
dir : PathBuf, | |||
) -> JoinHandle<()> { | |||
let mut ser = V2DeflateSerializer::new(); | |||
let start_time = SystemTime::now(); | |||
let seconds = start_time.duration_since(UNIX_EPOCH).unwrap().as_secs(); | |||
let path = dir.join(&format!("{}-interval-log-{}.v2z", series, seconds)); | |||
let file = fs::File::create(&path).unwrap(); | |||
thread::Builder::new().name(format!("mm:hist:{}", series)).spawn(move || { | |||
let mut buf = io::LineWriter::new(file); | |||
let mut wtr = | |||
IntervalLogWriterBuilder::new() | |||
.with_base_time(UNIX_EPOCH) | |||
.with_start_time(start_time) | |||
.begin_log_with(&mut buf, &mut ser) | |||
.unwrap(); | |||
loop { | |||
match rx.recv() { //.recv_timeout(Duration::from_millis(1)) { | |||
//match rx.recv_timeout(Duration::new(1, 0)) { | |||
Ok(Some(Entry { tag, start, end, hist })) => { | |||
wtr.write_histogram(&hist, start.duration_since(UNIX_EPOCH).unwrap(), | |||
end.duration_since(start).unwrap(), Tag::new(tag)) | |||
.ok(); | |||
//.map_err(|e| { println!("{:?}", e); e }).ok(); | |||
} | |||
// `None` used as terminate signal from `Drop` | |||
Ok(None) => break, | |||
_ => { | |||
thread::sleep(Duration::new(0, 0)); | |||
} | |||
} | |||
} | |||
}).unwrap() | |||
} | |||
} | |||
impl Drop for HistLog { | |||
fn drop(&mut self) { | |||
if !self.hist.is_empty() { self.send(Instant::now()) } | |||
if let Some(arc) = self.thread.take() { | |||
//println!("in Drop, strong count is {}", Arc::strong_count(&arc)); | |||
if let Ok(thread) = Arc::try_unwrap(arc) { | |||
let _ = self.tx.send(None); | |||
let _ = thread.join(); | |||
} | |||
} | |||
} | |||
} |
@@ -1,774 +0,0 @@ | |||
#![allow(unused)] | |||
use std::thread::{self, JoinHandle}; | |||
use std::sync::mpsc::{Sender, channel}; | |||
use std::fmt; | |||
use std::time::{Instant, Duration}; | |||
use chrono::{self, DateTime, Utc}; | |||
use pub_sub::PubSub; | |||
use sloggers::types::Severity; | |||
//use windows::{DurationWindow, Incremental, Window}; | |||
use money::{Ticker, Side, Exchange}; | |||
use super::file_logger; | |||
use influx::{self, OwnedMeasurement, OwnedValue}; | |||
use self::windows::Incremental; | |||
pub type Nanos = u64; | |||
pub const SECOND: u64 = 1e9 as u64; | |||
pub const MINUTE: u64 = SECOND * 60; | |||
pub const HOUR: u64 = MINUTE * 60; | |||
pub const MILLISECOND: u64 = SECOND / 1000; | |||
pub const MICROSECOND: u64 = MILLISECOND / 1000; | |||
pub fn nanos(d: Duration) -> Nanos { | |||
d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) | |||
} | |||
pub fn dt_nanos(t: DateTime<Utc>) -> i64 { | |||
(t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64) | |||
} | |||
pub fn now() -> i64 { dt_nanos(Utc::now()) } | |||
pub fn tfmt(ns: Nanos) -> String { | |||
match ns { | |||
t if t <= MICROSECOND => { | |||
format!("{}ns", t) | |||
} | |||
t if t > MICROSECOND && t < MILLISECOND => { | |||
format!("{}u", t / MICROSECOND) | |||
} | |||
t if t > MILLISECOND && t < SECOND => { | |||
format!("{}ms", t / MILLISECOND) | |||
} | |||
t => { | |||
format!("{}.{}sec", t / SECOND, t / MILLISECOND) | |||
} | |||
} | |||
} | |||
pub fn tfmt_dur(d: Duration) -> String { | |||
tfmt(nanos(d)) | |||
} | |||
pub fn tfmt_dt(dt: DateTime<Utc>) -> String { | |||
Utc::now().signed_duration_since(dt) | |||
.to_std() | |||
.map(|dur| { | |||
tfmt_dur(dur) | |||
}).unwrap_or("?".into()) | |||
} | |||
pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) -> fmt::Result { | |||
match ns { | |||
t if t <= MICROSECOND => { | |||
write!(f, "{}ns", t) | |||
} | |||
t if t > MICROSECOND && t < MILLISECOND => { | |||
write!(f, "{}u", t / MICROSECOND) | |||
} | |||
t if t > MILLISECOND && t < SECOND => { | |||
write!(f, "{}ms", t / MILLISECOND) | |||
} | |||
t => { | |||
write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND) | |||
} | |||
} | |||
} | |||
#[doc(hide)] | |||
mod windows { | |||
use super::*; | |||
use std::ops::{Div, Mul, Sub, SubAssign, AddAssign}; | |||
use std::collections::VecDeque; | |||
use num::Float; | |||
const INITIAL_CAPACITY: usize = 1000; | |||
#[derive(Clone, Debug)] | |||
pub struct Point<T> | |||
//where T: Default | |||
{ | |||
time: Instant, | |||
value: T | |||
} | |||
#[derive(Debug, Clone)] | |||
pub struct Window<T> | |||
where T: Default | |||
{ | |||
pub size: Duration, // window size | |||
mean: T, | |||
ps: T, | |||
psa: T, | |||
var: T, | |||
sum: T, | |||
count: u32, | |||
items: VecDeque<Point<T>>, | |||
} | |||
#[derive(Default)] | |||
pub struct DurationWindow { | |||
pub size: Duration, | |||
mean: Duration, | |||
sum: Duration, | |||
count: u32, | |||
items: VecDeque<Point<Duration>> | |||
} | |||
impl<T> Point<T> | |||
where T: Default + Copy | |||
{ | |||
fn new(time: Instant, value: T) -> Self { | |||
Point { time, value } | |||
} | |||
fn value(&self) -> T { | |||
self.value | |||
} | |||
} | |||
impl<T> Window<T> | |||
where T: Default + Zero | |||
{ | |||
pub fn new(size: Duration) -> Self { | |||
Window { | |||
size, | |||
mean: T::default(), | |||
psa: T::default(), | |||
ps: T::default(), | |||
sum: T::default(), | |||
count: 0, | |||
var: T::default(), | |||
items: VecDeque::with_capacity(INITIAL_CAPACITY), | |||
} | |||
} | |||
pub fn with_size_and_capacity(size: Duration, capacity: usize) -> Self { | |||
Window { | |||
size, | |||
mean: T::default(), | |||
psa: T::default(), | |||
ps: T::default(), | |||
sum: T::default(), | |||
count: 0, | |||
var: T::default(), | |||
items: VecDeque::with_capacity(capacity), | |||
} | |||
} | |||
} | |||
impl<T> From<Duration> for Window<T> | |||
where T: Default + Zero | |||
{ | |||
fn from(size: Duration) -> Self { | |||
Window::new(size) | |||
} | |||
} | |||
impl From<Duration> for DurationWindow { | |||
fn from(size: Duration) -> Self { | |||
DurationWindow::new(size) | |||
} | |||
} | |||
pub trait Incremental<T> { | |||
/// Purge expired items. | |||
/// | |||
#[inline] | |||
fn refresh(&mut self, t: Instant) -> &Self; | |||
/// Add a new item. | |||
/// | |||
#[inline] | |||
fn add(&mut self, time: Instant, value: T); | |||
/// Add a new item and purge expired items. | |||
/// | |||
#[inline] | |||
fn update(&mut self, time: Instant, value: T) { | |||
self.refresh(time); | |||
self.add(time, value); | |||
} | |||
} | |||
pub trait Zero { | |||
fn zero() -> Self; | |||
} | |||
pub trait One { | |||
fn one() -> Self; | |||
} | |||
macro_rules! zero { | |||
($t:ty, $body:expr) => { | |||
impl Zero for $t { | |||
fn zero() -> $t { $body } | |||
} | |||
} | |||
} | |||
macro_rules! one { | |||
($t:ty, $body:expr) => { | |||
impl One for $t { | |||
fn one() -> $t { $body } | |||
} | |||
} | |||
} | |||
zero!(f64, 0.0); | |||
zero!(f32, 0.0); | |||
zero!(u128, 0); | |||
zero!(i128, 0); | |||
zero!(u64, 0); | |||
zero!(i64, 0); | |||
zero!(i32, 0); | |||
zero!(u32, 0); | |||
zero!(u16, 0); | |||
one!(f64, 1.0); | |||
one!(f32, 1.0); | |||
one!(u128, 1); | |||
one!(i128, 1); | |||
one!(u64, 1); | |||
one!(i64, 1); | |||
one!(i32, 1); | |||
one!(u32, 1); | |||
one!(u16, 1); | |||
impl<T> Incremental<T> for Window<T> | |||
where T: Default + AddAssign<T> + SubAssign<T> + From<u32> + Div<Output = T> + | |||
Mul<Output = T> + Sub<Output = T> + Copy | |||
{ | |||
#[inline] | |||
fn refresh(&mut self, t: Instant) -> &Self { | |||
if !self.items.is_empty() { | |||
let (n_remove, sum, ps, count) = | |||
self.items.iter() | |||
.take_while(|x| t - x.time > self.size) | |||
.fold((0, self.sum, self.ps, self.count), |(n_remove, sum, ps, count), x| { | |||
(n_remove + 1, sum - x.value, ps - x.value * x.value, count - 1) | |||
}); | |||
self.sum = sum; | |||
self.ps = ps; | |||
self.count = count; | |||
for _ in 0..n_remove { | |||
self.items.pop_front(); | |||
} | |||
} | |||
if self.count > 0 { | |||
self.mean = self.sum / self.count.into(); | |||
self.psa = self.ps / self.count.into(); | |||
let c: T = self.count.into(); | |||
self.var = (self.psa * c - c * self.mean * self.mean) / c; | |||
} | |||
self | |||
} | |||
/// Creates `Point { time, value }` and pushes to `self.items`. | |||
/// | |||
#[inline] | |||
fn add(&mut self, time: Instant, value: T) { | |||
let p = Point::new(time, value); | |||
self.sum += p.value; | |||
self.ps += p.value * p.value; | |||
self.count += 1; | |||
self.items.push_back(p); | |||
} | |||
#[inline] | |||
fn update(&mut self, time: Instant, value: T) { | |||
self.add(time, value); | |||
self.refresh(time); | |||
} | |||
} | |||
impl Incremental<Duration> for DurationWindow { | |||
#[inline] | |||
fn refresh(&mut self, t: Instant) -> &Self { | |||
if !self.items.is_empty() { | |||
let (n_remove, sum, count) = | |||
self.items.iter() | |||
.take_while(|x| t - x.time > self.size) | |||
.fold((0, self.sum, self.count), |(n_remove, sum, count), x| { | |||
(n_remove + 1, sum - x.value, count - 1) | |||
}); | |||
self.sum = sum; | |||
self.count = count; | |||
for _ in 0..n_remove { | |||
self.items.pop_front(); | |||
} | |||
} | |||
if self.count > 0 { | |||
self.mean = self.sum / self.count.into(); | |||
} | |||
self | |||
} | |||
#[inline] | |||
fn add(&mut self, time: Instant, value: Duration) { | |||
let p = Point::new(time, value); | |||
self.sum += p.value; | |||
self.count += 1; | |||
self.items.push_back(p); | |||
} | |||
} | |||
impl<T> Window<T> | |||
where T: Default + Copy | |||
{ | |||
pub fn mean(&self) -> T { self.mean } | |||
pub fn var(&self) -> T { self.var } | |||
pub fn psa(&self) -> T { self.psa } | |||
pub fn ps(&self) -> T { self.ps } | |||
pub fn count(&self) -> u32 { self.count } | |||
pub fn len(&self) -> usize { self.items.len() } | |||
pub fn is_empty(&self) -> bool { self.items.is_empty() } | |||
/// Returns the `Duration` between `t` and the first `Point` in `self.items`. | |||
/// | |||
/// If there are no items, returns `Duration { secs: 0, nanos: 0 }`. | |||
/// | |||
/// # Panics | |||
/// | |||
/// This function will panic if `t` is earlier than the first `Point`'s `Instant`. | |||
/// | |||
#[inline] | |||
pub fn elapsed(&self, t: Instant) -> Duration { | |||
self.items.front() | |||
.map(|p| { | |||
t - p.time | |||
}).unwrap_or_else(|| Duration::new(0, 0)) | |||
} | |||
} | |||
impl<T> Window<T> | |||
where T: Float + Default | |||
{ | |||
#[inline] | |||
pub fn std(&self) -> T { self.var.sqrt() } | |||
} | |||
impl DurationWindow { | |||
pub fn new(size: Duration) -> Self { DurationWindow { size, ..Default::default() } } | |||
pub fn mean(&self) -> Duration { self.mean } | |||
pub fn count(&self) -> u32 { self.count } | |||
pub fn len(&self) -> usize { self.items.len() } | |||
pub fn is_empty(&self) -> bool { self.items.is_empty() } | |||
#[inline] | |||
pub fn nanos(d: Duration) -> u64 { d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) } | |||
#[inline] | |||
pub fn micros(d: Duration) -> Option<u32> { | |||
if d <= Duration::new(4_294, 967_295_000) { | |||
Some((d.as_secs() * 1_000_000) as u32 + d.subsec_nanos() / 1_000u32) | |||
} else { | |||
None | |||
} | |||
} | |||
#[inline] | |||
pub fn mean_nanos(&self) -> u64 { DurationWindow::nanos(self.mean()) } | |||
#[inline] | |||
pub fn max(&self) -> Option<Duration> { | |||
self.items.iter() | |||
.map(|p| p.value) | |||
.max() | |||
} | |||
#[inline] | |||
pub fn max_nanos(&self) -> Option<u64> { | |||
self.max() | |||
.map(|x| DurationWindow::nanos(x)) | |||
} | |||
#[inline] | |||
pub fn first(&self) -> Option<Duration> { | |||
self.items | |||
.front() | |||
.map(|pt| pt.value()) | |||
} | |||
/// Returns the `Duration` between `t` and the first `Point` in `self.items`. | |||
/// | |||
/// If there are no items, returns `Duration { secs: 0, nanos: 0 }`. | |||
/// | |||
/// # Panics | |||
/// | |||
/// This function will panic if `t` is earlier than the first `Point`'s `Instant`. | |||
/// | |||
#[inline] | |||
pub fn elapsed(&self, t: Instant) -> Duration { | |||
self.items.front() | |||
.map(|p| { | |||
t - p.time | |||
}).unwrap_or_else(|| Duration::new(0, 0)) | |||
} | |||
} | |||
} | |||
#[derive(Debug)] | |||
pub enum Latency { | |||
Ws(Exchange, Ticker, Duration), | |||
Http(Exchange, Duration), | |||
Trade(Exchange, Ticker, Duration), | |||
Terminate | |||
} | |||
#[derive(Debug)] | |||
pub enum ExperiencedLatency { | |||
GdaxWebsocket(Duration), | |||
GdaxHttpPublic(Duration), | |||
GdaxHttpPrivate(Duration), | |||
PlnxHttpPublic(Duration), | |||
PlnxHttpPrivate(Duration), | |||
PlnxOrderBook(Duration), | |||
KrknHttpPublic(Duration), | |||
KrknHttpPrivate(Duration), | |||
KrknTrade(Duration, &'static str, Option<Ticker>, Option<Side>), | |||
PlnxWs(Ticker), | |||
Terminate | |||
} | |||
#[derive(Debug, Clone)] | |||
pub struct Update { | |||
pub gdax_ws: Nanos, | |||
pub gdax_trade: Nanos, | |||
pub gdax_last: DateTime<Utc> | |||
} | |||
impl Default for Update { | |||
fn default() -> Self { | |||
Update { | |||
gdax_ws: 0, | |||
gdax_trade: 0, | |||
gdax_last: Utc::now(), | |||
} | |||
} | |||
} | |||
#[derive(Debug, Clone)] | |||
pub struct LatencyUpdate { | |||
pub gdax_ws: Nanos, | |||
pub krkn_pub: Nanos, | |||
pub krkn_priv: Nanos, | |||
pub plnx_pub: Nanos, | |||
pub plnx_priv: Nanos, | |||
pub plnx_order: Nanos, | |||
pub krkn_trade_30_mean: Nanos, | |||
pub krkn_trade_30_max: Nanos, | |||
pub krkn_trade_300_mean: Nanos, | |||
pub krkn_trade_300_max: Nanos, | |||
pub plnx_last: DateTime<Utc>, | |||
pub krkn_last: DateTime<Utc>, | |||
pub plnx_ws_count: u64, | |||
} | |||
impl Default for LatencyUpdate { | |||
fn default() -> Self { | |||
LatencyUpdate { | |||
gdax_ws : 0, | |||
krkn_pub : 0, | |||
krkn_priv : 0, | |||
plnx_pub : 0, | |||
plnx_priv : 0, | |||
plnx_order : 0, | |||
krkn_trade_30_mean : 0, | |||
krkn_trade_30_max : 0, | |||
krkn_trade_300_mean : 0, | |||
krkn_trade_300_max : 0, | |||
plnx_ws_count : 0, | |||
plnx_last : Utc::now(), | |||
krkn_last : Utc::now(), | |||
} | |||
} | |||
} | |||
pub struct Manager { | |||
pub tx: Sender<Latency>, | |||
pub channel: PubSub<Update>, | |||
thread: Option<JoinHandle<()>>, | |||
} | |||
pub struct LatencyManager { | |||
pub tx: Sender<ExperiencedLatency>, | |||
pub channel: PubSub<LatencyUpdate>, | |||
thread: Option<JoinHandle<()>>, | |||
} | |||
/// returns a DateTime equal to now - `dur` | |||
/// | |||
pub fn dt_from_dur(dur: Duration) -> DateTime<Utc> { | |||
let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64); | |||
Utc::now() - old_dur | |||
} | |||
struct Last { | |||
broadcast: Instant, | |||
plnx: Instant, | |||
krkn: Instant, | |||
gdax: Instant, | |||
} | |||
impl Default for Last { | |||
fn default() -> Self { | |||
Last { | |||
broadcast: Instant::now(), | |||
plnx: Instant::now(), | |||
krkn: Instant::now(), | |||
gdax: Instant::now(), | |||
} | |||
} | |||
} | |||
impl Manager { | |||
pub fn new(window: Duration, | |||
log_path: &'static str, | |||
measurements: Sender<OwnedMeasurement>) -> Self { | |||
let (tx, rx) = channel(); | |||
let channel = PubSub::new(); | |||
let channel_copy = channel.clone(); | |||
let logger = file_logger(log_path, Severity::Info); | |||
info!(logger, "initializing"); | |||
let mut gdax_ws = windows::DurationWindow::new(window); | |||
let mut gdax_trade = windows::DurationWindow::new(window); | |||
let mut last = Last::default(); | |||
info!(logger, "entering loop"); | |||
let thread = Some(thread::spawn(move || { | |||
loop { | |||
let loop_time = Instant::now(); | |||
if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1)) { | |||
debug!(logger, "rcvd {:?}", msg); | |||
match msg { | |||
Latency::Ws(_, _, dur) => { | |||
gdax_ws.update(loop_time, dur); | |||
last.gdax = loop_time; | |||
} | |||
Latency::Trade(_, ticker, dur) => { | |||
gdax_trade.update(loop_time, dur); | |||
last.gdax = loop_time; | |||
let nanos = windows::DurationWindow::nanos(dur); | |||
measurements.send( | |||
OwnedMeasurement::new("gdax_trade_api") | |||
.add_tag("ticker", ticker.as_str()) | |||
.add_field("nanos", OwnedValue::Integer(nanos as i64)) | |||
.set_timestamp(influx::now())).unwrap(); | |||
} | |||
Latency::Terminate => break, | |||
_ => {} | |||
} | |||
} | |||
if loop_time - last.broadcast > Duration::from_millis(100) { | |||
debug!(logger, "initalizing broadcast"); | |||
let update = Update { | |||
gdax_ws: gdax_ws.refresh(loop_time).mean_nanos(), | |||
gdax_trade: gdax_trade.refresh(loop_time).mean_nanos(), | |||
gdax_last: dt_from_dur(loop_time - last.gdax) | |||
}; | |||
channel.send(update).unwrap(); | |||
last.broadcast = loop_time; | |||
debug!(logger, "sent broadcast"); | |||
} | |||
} | |||
debug!(logger, "latency manager terminating"); | |||
})); | |||
Manager { | |||
tx, | |||
channel: channel_copy, | |||
thread, | |||
} | |||
} | |||
} | |||
impl Drop for LatencyManager { | |||
fn drop(&mut self) { | |||
for _ in 0..100 { self.tx.send(ExperiencedLatency::Terminate).unwrap(); } | |||
if let Some(thread) = self.thread.take() { | |||
let _ = thread.join(); | |||
} | |||
} | |||
} | |||
impl Drop for Manager { | |||
fn drop(&mut self) { | |||
for _ in 0..100 { self.tx.send(Latency::Terminate).unwrap(); } | |||
if let Some(thread) = self.thread.take() { | |||
let _ = thread.join(); | |||
} | |||
} | |||
} | |||
impl LatencyManager { | |||
pub fn new(d: Duration) -> Self { | |||
let (tx, rx) = channel(); | |||
let tx_copy = tx.clone(); | |||
let channel = PubSub::new(); | |||
let channel_copy = channel.clone(); | |||
//let w = w.clone(); | |||
let thread = Some(thread::spawn(move || { | |||
let logger = file_logger("var/log/latency-manager.log", Severity::Info); | |||
info!(logger, "initializing DurationWindows"); | |||
let mut gdax_ws = windows::DurationWindow::new(d); | |||
let mut gdax_priv = windows::DurationWindow::new(d); | |||
let mut krkn_pub = windows::DurationWindow::new(d); | |||
let mut krkn_priv = windows::DurationWindow::new(d); | |||
let mut plnx_pub = windows::DurationWindow::new(d); | |||
let mut plnx_priv = windows::DurationWindow::new(d); | |||
let mut plnx_order = windows::DurationWindow::new(d); | |||
let mut plnx_ws_count: windows::Window<u32> = windows::Window::new(d); | |||
// yes I am intentionally breaking from the hard-typed duration | |||
// window ... that was a stupid idea | |||
// | |||
let mut krkn_trade_30 = windows::DurationWindow::new(Duration::from_secs(30)); | |||
let mut krkn_trade_300 = windows::DurationWindow::new(Duration::from_secs(300)); | |||
let mut last = Last::default(); | |||
thread::sleep(Duration::from_millis(1)); | |||
info!(logger, "entering loop"); | |||
loop { | |||
let loop_time = Instant::now(); | |||
if let Ok(msg) = rx.recv() { | |||
debug!(logger, "new msg: {:?}", msg); | |||
match msg { | |||
ExperiencedLatency::Terminate => { | |||
crit!(logger, "terminating"); | |||
break; | |||
} | |||
ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d), | |||
ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d), | |||
ExperiencedLatency::KrknHttpPublic(d) => { | |||
last.krkn = loop_time; | |||
krkn_pub.update(loop_time, d) | |||
} | |||
ExperiencedLatency::KrknHttpPrivate(d) => { | |||
last.krkn = loop_time; | |||
krkn_priv.update(loop_time, d) | |||
} | |||
ExperiencedLatency::PlnxHttpPublic(d) => { | |||
last.plnx = loop_time; | |||
plnx_pub.update(loop_time, d) | |||
} | |||
ExperiencedLatency::PlnxHttpPrivate(d) => { | |||
last.plnx = loop_time; | |||
plnx_priv.update(loop_time, d) | |||
} | |||
ExperiencedLatency::PlnxOrderBook(d) => { | |||
last.plnx = loop_time; | |||
plnx_order.update(loop_time, d) | |||
} | |||
ExperiencedLatency::PlnxWs(_) => { | |||
last.plnx = loop_time; | |||
plnx_ws_count.update(loop_time, 1_u32); | |||
} | |||
ExperiencedLatency::KrknTrade(d, cmd, _, _) => { | |||
debug!(logger, "new KrknTrade"; | |||
"cmd" => cmd); | |||
last.krkn = loop_time; | |||
krkn_trade_30.update(loop_time, d); | |||
krkn_trade_300.update(loop_time, d); | |||
} | |||
other => { | |||
warn!(logger, "unexpected msg: {:?}", other); | |||
} | |||
} | |||
} | |||
if loop_time - last.broadcast > Duration::from_millis(100) { | |||
debug!(logger, "initalizing broadcast"); | |||
// note - because we mutated the Window instances | |||
// above, we need a fresh Instant to avoid less than other | |||
// panic | |||
// | |||
krkn_trade_30.refresh(loop_time); | |||
krkn_trade_300.refresh(loop_time); | |||
let update = LatencyUpdate { | |||
gdax_ws: gdax_ws.refresh(loop_time).mean_nanos(), | |||
krkn_pub: krkn_pub.refresh(loop_time).mean_nanos(), | |||
krkn_priv: krkn_priv.refresh(loop_time).mean_nanos(), | |||
plnx_pub: plnx_pub.refresh(loop_time).mean_nanos(), | |||
plnx_priv: plnx_priv.refresh(loop_time).mean_nanos(), | |||
plnx_order: plnx_order.refresh(loop_time).mean_nanos(), | |||
krkn_trade_30_mean: krkn_trade_30.mean_nanos(), | |||
krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0), | |||
krkn_trade_300_mean: krkn_trade_300.mean_nanos(), | |||
krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0), | |||
plnx_last: dt_from_dur(loop_time - last.plnx), | |||
krkn_last: dt_from_dur(loop_time - last.krkn), | |||
plnx_ws_count: plnx_ws_count.refresh(loop_time).count() as u64, | |||
}; | |||
channel.send(update).unwrap(); | |||
last.broadcast = loop_time; | |||
debug!(logger, "sent broadcast"); | |||
} | |||
} | |||
crit!(logger, "goodbye"); | |||
})); | |||
LatencyManager { | |||
tx: tx_copy, | |||
channel: channel_copy, | |||
thread | |||
} | |||
} | |||
} |
@@ -1,584 +0,0 @@ | |||
//! An object to handle everyone's errors | |||
//! | |||
use std::thread::{self, JoinHandle}; | |||
use std::sync::{Arc, Mutex, RwLock}; | |||
use std::sync::mpsc::{Sender, channel}; | |||
use std::collections::{BTreeMap, VecDeque}; | |||
use std::fmt::{self, Display, Error as FmtError, Formatter}; | |||
use std::io::{self, Write}; | |||
use std::fs; | |||
use chrono::{DateTime, Utc}; | |||
use termion::color::{self, Fg, Bg}; | |||
use influent::measurement::{Measurement, Value as InfluentValue}; | |||
use slog::{self, OwnedKVList, Drain, Key, KV, Level, Logger}; | |||
use sloggers::types::Severity; | |||
use super::{nanos, file_logger}; | |||
use influx; | |||
const N_WARNINGS: usize = 500; | |||
#[macro_export] | |||
macro_rules! confirmed { | |||
($warnings:ident, $($args:tt)*) => ( | |||
{ | |||
let _ = warnings.send(Warning::Confirmed( ( format!($($args)*) ) ) ).unwrap(); | |||
} | |||
) | |||
} | |||
/// logs a `Warning::Awesome` message to the `WarningsManager` | |||
#[macro_export] | |||
macro_rules! awesome { | |||
($warnings:ident, $($args:tt)*) => ( | |||
{ | |||
let _ = $warnings.send(Warning::Awesome( ( format!($($args)*) ) ) ).unwrap(); | |||
} | |||
) | |||
} | |||
#[macro_export] | |||
macro_rules! critical { | |||
($warnings:ident, $($args:tt)*) => ( | |||
{ | |||
let _ = $warnings.send(Warning::Critical( ( format!($($args)*) ) ) ).unwrap(); | |||
} | |||
) | |||
} | |||
#[macro_export] | |||
macro_rules! notice { | |||
($warnings:ident, $($args:tt)*) => ( | |||
{ | |||
let _ = $warnings.send(Warning::Notice( ( format!($($args)*) ) ) ).unwrap(); | |||
} | |||
) | |||
} | |||
#[macro_export] | |||
macro_rules! error_w { | |||
($warnings:ident, $($args:tt)*) => ( | |||
{ | |||
$warnings.send(Warning::Error( ( format!($($args)*) ) ) ).unwrap(); | |||
} | |||
) | |||
} | |||
/// represents a non-fatal error somewhere in | |||
/// the system to report either to the program interface | |||
/// or in logs. | |||
/// | |||
#[derive(Debug, Clone, PartialEq)] | |||
pub enum Warning { | |||
Notice(String), | |||
Error(String), | |||
DegradedService(String), | |||
Critical(String), | |||
Confirmed(String), | |||
Awesome(String), | |||
Log { | |||
level: Level, | |||
module: &'static str, | |||
function: &'static str, | |||
line: u32, | |||
msg: String, | |||
kv: MeasurementRecord, | |||
}, | |||
Terminate | |||
} | |||
impl Warning { | |||
pub fn msg(&self) -> String { | |||
match *self { | |||
Warning::Notice(ref s) | Warning::Error(ref s) | | |||
Warning::DegradedService(ref s) | Warning::Critical(ref s) | | |||
Warning::Awesome(ref s) | Warning::Confirmed(ref s) | | |||
Warning::Log { msg: ref s, .. } => | |||
s.clone(), | |||
Warning::Terminate => "".to_owned() | |||
} | |||
} | |||
pub fn msg_str(&self) -> &str { | |||
match *self { | |||
Warning::Notice(ref s) | Warning::Error(ref s) | | |||
Warning::DegradedService(ref s) | Warning::Critical(ref s) | | |||
Warning::Awesome(ref s) | Warning::Confirmed(ref s) | | |||
Warning::Log { msg: ref s, .. } => | |||
s.as_ref(), | |||
Warning::Terminate => "Terminate" | |||
} | |||
} | |||
pub fn category_str(&self) -> &str { | |||
match self { | |||
&Warning::Notice(_) => "NOTC", | |||
&Warning::Error(_) => "ERRO", | |||
&Warning::Critical(_) => "CRIT", | |||
&Warning::DegradedService(_) => "DGRD", | |||
&Warning::Confirmed(_) => "CNFD", | |||
&Warning::Awesome(_) => "AWSM", | |||
&Warning::Log { ref level, .. } => level.as_short_str(), | |||
&Warning::Terminate => "TERM", | |||
} | |||
} | |||
pub fn category(&self, f: &mut Formatter) -> fmt::Result { | |||
match *self { | |||
Warning::Notice(_) => { | |||
write!(f, "[ Notice ]") | |||
} | |||
Warning::Error(_) => { | |||
write!(f, "{yellow}[{title}]{reset}", | |||
yellow = Fg(color::LightYellow), | |||
title = " Error--", | |||
reset = Fg(color::Reset)) | |||
} | |||
Warning::Critical(_) => { | |||
write!(f, "{bg}{fg}{title}{resetbg}{resetfg}", | |||
bg = Bg(color::Red), | |||
fg = Fg(color::White), | |||
title = " CRITICAL ", | |||
resetbg = Bg(color::Reset), | |||
resetfg = Fg(color::Reset)) | |||
} | |||
Warning::Awesome(_) => { | |||
write!(f, "{color}[{title}]{reset}", | |||
color = Fg(color::Green), | |||
title = "Awesome!", | |||
reset = Fg(color::Reset)) | |||
} | |||
Warning::DegradedService(_) => { | |||
write!(f, "{color}[{title}] {reset}", | |||
color = Fg(color::Blue), | |||
title = "Degraded Service ", | |||
reset = Fg(color::Reset)) | |||
} | |||
Warning::Confirmed(_) => { | |||
write!(f, "{bg}{fg}{title}{resetbg}{resetfg}", | |||
bg = Bg(color::Blue), | |||
fg = Fg(color::White), | |||
title = "Confirmed ", | |||
resetbg = Bg(color::Reset), | |||
resetfg = Fg(color::Reset)) | |||
} | |||
_ => Ok(()) | |||
} | |||
} | |||
} | |||
impl Display for Warning { | |||
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { | |||
self.category(f)?; | |||
write!(f, " {}", self.msg()) | |||
} | |||
} | |||
#[derive(Debug, Clone)] | |||
pub struct Record { | |||
pub time: DateTime<Utc>, | |||
pub msg: Warning | |||
} | |||
impl Record { | |||
pub fn new(msg: Warning) -> Self { | |||
let time = Utc::now(); | |||
Record { time, msg } | |||
} | |||
pub fn to_measurement(&self, name: &'static str) -> Measurement { | |||
let cat = self.msg.category_str(); | |||
let body = self.msg.msg_str(); | |||
let mut m = Measurement::new(name); | |||
m.add_tag("category", cat); | |||
m.add_field("msg", InfluentValue::String(body)); | |||
m.set_timestamp(nanos(self.time) as i64); | |||
m | |||
} | |||
} | |||
impl Display for Record { | |||
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { | |||
write!(f, "{} | {}", self.time.format("%H:%M:%S"), self.msg) | |||
} | |||
} | |||
pub type SlogResult = Result<(), slog::Error>; | |||
#[derive(Debug, Clone, PartialEq)] | |||
pub enum Value { | |||
String(String), | |||
Float(f64), | |||
Integer(i64), | |||
Boolean(bool) | |||
} | |||
impl Value { | |||
pub fn to_influent<'a>(&'a self) -> InfluentValue<'a> { | |||
match self { | |||
&Value::String(ref s) => InfluentValue::String(s), | |||
&Value::Float(n) => InfluentValue::Float(n), | |||
&Value::Integer(i) => InfluentValue::Integer(i), | |||
&Value::Boolean(b) => InfluentValue::Boolean(b), | |||
} | |||
} | |||
} | |||
#[derive(Debug, Clone, PartialEq)] | |||
pub struct MeasurementRecord { | |||
fields: Vec<(Key, Value)>, | |||
tags: Vec<(Key, String)>, | |||
} | |||
impl MeasurementRecord { | |||
pub fn new() -> Self { | |||
MeasurementRecord { | |||
fields: Vec::new(), | |||
tags: Vec::new(), | |||
} | |||
} | |||
pub fn add_field(&mut self, key: Key, val: Value) -> SlogResult { | |||
self.fields.push((key, val)); | |||
Ok(()) | |||
} | |||
pub fn add_tag(&mut self, key: Key, val: String) -> SlogResult { | |||
match key { | |||
"exchange" | "thread" | "ticker" | "category" => { | |||
self.tags.push((key, val)); | |||
} | |||
other => { | |||
self.add_field(other, Value::String(val)).unwrap(); | |||
} | |||
} | |||
Ok(()) | |||
} | |||
pub fn serialize_values(&mut self, record: &slog::Record, values: &OwnedKVList) { | |||
let mut builder = TagBuilder { mrec: self }; | |||
let _ = values.serialize(record, &mut builder); | |||
} | |||
pub fn to_measurement<'a>(&'a self, name: &'a str) -> Measurement<'a> { | |||
let fields: BTreeMap<&'a str, InfluentValue<'a>> = | |||
self.fields.iter() | |||
.map(|&(k, ref v)| { | |||
(k, v.to_influent()) | |||
}).collect(); | |||
let tags: BTreeMap<&'a str, &'a str> = | |||
self.tags.iter() | |||
.map(|&(k, ref v)| { | |||
(k, v.as_ref()) | |||
}).collect(); | |||
Measurement { | |||
key: name, | |||
timestamp: Some(nanos(Utc::now()) as i64), | |||
fields, | |||
tags, | |||
} | |||
} | |||
} | |||
impl slog::Serializer for MeasurementRecord { | |||
fn emit_usize(&mut self, key: Key, val: usize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } | |||
fn emit_isize(&mut self, key: Key, val: isize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } | |||
fn emit_bool(&mut self, key: Key, val: bool) -> SlogResult { self.add_field(key, Value::Boolean(val)) } | |||
fn emit_u8(&mut self, key: Key, val: u8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } | |||
fn emit_i8(&mut self, key: Key, val: i8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } | |||
fn emit_u16(&mut self, key: Key, val: u16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } | |||
fn emit_i16(&mut self, key: Key, val: i16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } | |||
fn emit_u32(&mut self, key: Key, val: u32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } | |||
fn emit_i32(&mut self, key: Key, val: i32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } | |||
fn emit_f32(&mut self, key: Key, val: f32) -> SlogResult { self.add_field(key, Value::Float(val as f64)) } | |||
fn emit_u64(&mut self, key: Key, val: u64) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) } | |||
fn emit_i64(&mut self, key: Key, val: i64) -> SlogResult { self.add_field(key, Value::Integer(val)) } | |||
fn emit_f64(&mut self, key: Key, val: f64) -> SlogResult { self.add_field(key, Value::Float(val)) } | |||
fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { self.add_field(key, Value::String(val.to_string())) } | |||
fn emit_unit(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::Boolean(true)) } | |||
fn emit_none(&mut self, _: Key) -> SlogResult { Ok(()) } //self.add_field(key, Value::String("none".into())) } | |||
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { self.add_field(key, Value::String(val.to_string())) } | |||
} | |||
pub struct TagBuilder<'a> { | |||
mrec: &'a mut MeasurementRecord | |||
} | |||
impl<'a> slog::Serializer for TagBuilder<'a> { | |||
fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { | |||
match key { | |||
"exchange" | "thread" | "ticker" | "category" => { | |||
self.mrec.add_tag(key, val.to_string()) | |||
} | |||
other => { | |||
self.mrec.add_field(other, Value::String(val.to_string())) | |||
} | |||
} | |||
} | |||
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { | |||
match key { | |||
"exchange" | "thread" | "ticker" | "category" => { | |||
self.mrec.add_tag(key, val.to_string()) | |||
} | |||
other => { | |||
self.mrec.add_field(other, Value::String(val.to_string())) | |||
} | |||
} | |||
} | |||
} | |||
pub struct WarningsDrain<D: Drain> { | |||
level: Level, | |||
tx: Arc<Mutex<Sender<Warning>>>, | |||
drain: D, | |||
to_file: Logger, | |||
} | |||
impl<D> WarningsDrain<D> | |||
where D: Drain | |||
{ | |||
pub fn new(tx: Sender<Warning>, level: Level, drain: D) -> Self { | |||
let tx = Arc::new(Mutex::new(tx)); | |||
let to_file = file_logger("var/log/mm.log", Severity::Warning); | |||
WarningsDrain { tx, drain, level, to_file } | |||
} | |||
} | |||
impl From<Sender<Warning>> for WarningsDrain<slog::Fuse<slog::Discard>> { | |||
fn from(tx: Sender<Warning>) -> Self { | |||
WarningsDrain::new(tx, Level::Debug, slog::Discard.fuse()) | |||
} | |||
} | |||
impl<D: Drain> Drain for WarningsDrain<D> { | |||
type Ok = (); | |||
type Err = D::Err; | |||
fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> { | |||
if record.level() <= self.level { | |||
let mut ser = MeasurementRecord::new(); | |||
ser.serialize_values(record, values); | |||
let _ = record.kv().serialize(record, &mut ser); | |||
let msg = record.msg().to_string(); | |||
if let Ok(lock) = self.tx.lock() { | |||
let _ = lock.send(Warning::Log { | |||
level: record.level(), | |||
module: record.module(), | |||
function: record.function(), | |||
line: record.line(), | |||
msg, | |||
kv: ser | |||
}); | |||
} | |||
} | |||
if record.level() <= Level::Warning { | |||
let _ = self.to_file.log(record); | |||
} | |||
let _ = self.drain.log(record, values)?; | |||
Ok(()) | |||
} | |||
} | |||
#[derive(Debug)] | |||
pub struct WarningsManager { | |||
pub tx: Sender<Warning>, | |||
pub warnings: Arc<RwLock<VecDeque<Record>>>, | |||
thread: Option<JoinHandle<()>> | |||
} | |||
impl Drop for WarningsManager { | |||
fn drop(&mut self) { | |||
let _ = self.tx.send(Warning::Terminate); | |||
if let Some(thread) = self.thread.take() { | |||
thread.join().unwrap(); | |||
} | |||
} | |||
} | |||
const TIMESTAMP_FORMAT: &'static str = "%b %d %H:%M:%S%.3f"; | |||
/// Serializes *only* KV pair with `key == "thread"` | |||
/// | |||
struct ThreadSer<'a>(&'a mut Vec<u8>); | |||
impl<'a> slog::ser::Serializer for ThreadSer<'a> { | |||
fn emit_arguments(&mut self, _: &str, _: &fmt::Arguments) -> slog::Result { | |||
Ok(()) | |||
} | |||
fn emit_str(&mut self, key: &str, val: &str) -> slog::Result { | |||
if key == "thread" { | |||
write!(self.0, " {:<20}", val)?; | |||
} | |||
Ok(()) | |||
} | |||
} | |||
/// Serializes KV pairs as ", k: v" | |||
/// | |||
struct KvSer<'a>(&'a mut Vec<u8>); | |||
macro_rules! s( | |||
($s:expr, $k:expr, $v:expr) => { | |||
try!(write!($s.0, ", {}: {}", $k, $v)); | |||
}; | |||
); | |||
impl<'a> slog::ser::Serializer for KvSer<'a> { | |||
fn emit_none(&mut self, key: &str) -> slog::Result { | |||
s!(self, key, "None"); | |||
Ok(()) | |||
} | |||
fn emit_unit(&mut self, key: &str) -> slog::Result { | |||
s!(self, key, "()"); | |||
Ok(()) | |||
} | |||
fn emit_bool(&mut self, key: &str, val: bool) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_char(&mut self, key: &str, val: char) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_usize(&mut self, key: &str, val: usize) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_isize(&mut self, key: &str, val: isize) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_u8(&mut self, key: &str, val: u8) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_i8(&mut self, key: &str, val: i8) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_u16(&mut self, key: &str, val: u16) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_i16(&mut self, key: &str, val: i16) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_u32(&mut self, key: &str, val: u32) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_i32(&mut self, key: &str, val: i32) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_f32(&mut self, key: &str, val: f32) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_u64(&mut self, key: &str, val: u64) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_i64(&mut self, key: &str, val: i64) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_f64(&mut self, key: &str, val: f64) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_str(&mut self, key: &str, val: &str) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
fn emit_arguments( | |||
&mut self, | |||
key: &str, | |||
val: &fmt::Arguments, | |||
) -> slog::Result { | |||
s!(self, key, val); | |||
Ok(()) | |||
} | |||
} | |||
#[allow(unused_variables, unused_imports)] | |||
#[cfg(test)] | |||
mod tests { | |||
use super::*; | |||
use test::{black_box, Bencher}; | |||
#[test] | |||
#[ignore] | |||
fn it_creates_a_logger() { | |||
let wm = WarningsManager::new("rust-test"); | |||
let im = influx::writer(wm.tx.clone()); | |||
let drain = | |||
WarningsDrain { | |||
tx: Arc::new(Mutex::new(wm.tx.clone())), | |||
drain: slog::Discard, | |||
to_file: Logger::root(slog::Discard, o!()), | |||
level: Level::Trace, | |||
}; | |||
let logger = slog::Logger::root(drain, o!()); | |||
} | |||
#[bench] | |||
fn it_sends_integers_with_a_sender_behind_a_mutex(b: &mut Bencher) { | |||
let (tx, rx) = channel(); | |||
enum Msg { | |||
Val(usize), | |||
Terminate | |||
} | |||
let worker = thread::spawn(move || { | |||
let mut xs = Vec::new(); | |||
loop { | |||
match rx.recv().unwrap() { | |||
Msg::Val(x) => { xs.push(x); } | |||
Msg::Terminate => break, | |||
} | |||
} | |||
xs.len() | |||
}); | |||
let tx = Arc::new(Mutex::new(tx)); | |||
b.iter(|| { | |||
let lock = tx.lock().unwrap(); | |||
let _ = lock.send(Msg::Val(1)); | |||
}); | |||
let _ = tx.lock().unwrap().send(Msg::Terminate); | |||
let len = worker.join().unwrap(); | |||
//println!("{}", len); | |||
} | |||
} |