Browse Source

bunch of changes to allow mm/tali simultaneously

master
Jonathan Strong 7 years ago
parent
commit
b3d364caeb
4 changed files with 152 additions and 12 deletions
  1. +2
    -2
      src/influx.rs
  2. +144
    -5
      src/latency.rs
  3. +1
    -1
      src/lib.rs
  4. +5
    -4
      src/warnings.rs

+ 2
- 2
src/influx.rs View File

@@ -292,10 +292,10 @@ pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
/// incoming `Measurement`s that way *in addition* to the old socket/`String` /// incoming `Measurement`s that way *in addition* to the old socket/`String`
/// method /// method
/// ///
pub fn writer_str_or_meas(warnings: Sender<Warning>) -> (thread::JoinHandle<()>, Sender<OwnedMeasurement>) {
pub fn writer_str_or_meas(log_path: &str, warnings: Sender<Warning>) -> (thread::JoinHandle<()>, Sender<OwnedMeasurement>) {
let (tx, rx) = channel(); let (tx, rx) = channel();
let logger = file_logger(log_path);
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let logger = file_logger("var/log/influx.log");
info!(logger, "initializing zmq"); info!(logger, "initializing zmq");
let _ = fs::create_dir("/tmp/mm"); let _ = fs::create_dir("/tmp/mm");
let ctx = zmq::Context::new(); let ctx = zmq::Context::new();


+ 144
- 5
src/latency.rs View File

@@ -2,7 +2,7 @@ use std::thread::{self, JoinHandle};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::{self, Sender, Receiver, channel}; use std::sync::mpsc::{self, Sender, Receiver, channel};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fmt::{self, Display, Error as FmtError, Formatter, Write};
use std::fmt::{self, Display, Write};
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};


use chrono::{self, DateTime, Utc, TimeZone}; use chrono::{self, DateTime, Utc, TimeZone};
@@ -11,10 +11,10 @@ use zmq;
use influent::measurement::{Measurement, Value}; use influent::measurement::{Measurement, Value};


use windows::{DurationWindow, Incremental}; use windows::{DurationWindow, Incremental};
use money::{Ticker, Side};
use money::{Ticker, Side, ByExchange, Exchange};


use super::file_logger; use super::file_logger;
use influx;
use influx::{self, OwnedMeasurement, OwnedValue};






@@ -71,7 +71,7 @@ pub fn tfmt_dt(dt: DateTime<Utc>) -> String {
} }




pub fn tfmt_write(ns: Nanos, f: &mut Formatter) {
pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) {
match ns { match ns {
t if t <= MICROSECOND => { t if t <= MICROSECOND => {
write!(f, "{}ns", t); write!(f, "{}ns", t);
@@ -90,6 +90,14 @@ pub fn tfmt_write(ns: Nanos, f: &mut Formatter) {
} }
} }


#[derive(Debug)]
pub enum Latency {
Ws(Exchange, Ticker, Duration),
Http(Exchange, Duration),
Trade(Exchange, Ticker, Duration),
Terminate
}

#[derive(Debug)] #[derive(Debug)]
pub enum ExperiencedLatency { pub enum ExperiencedLatency {


@@ -154,6 +162,42 @@ impl MeasurementWindow for WTen {
fn duration(&self) -> Duration { Duration::from_secs(10) } fn duration(&self) -> Duration { Duration::from_secs(10) }
} }



#[derive(Debug, Clone)]
pub struct Update {
pub gdax_ws: Nanos,
pub gdax_trade: Nanos,
pub gdax_last: DateTime<Utc>
}

impl Default for Update {
fn default() -> Self {
Update {
gdax_ws: 0,
gdax_trade: 0,
gdax_last: Utc::now(),
}
}
}

// impl Update {
// pub fn new(window: Duration) -> Self {
// Update {
// window,
// ws: OrderMap::new(),
// http: 0,
// trade: OrderMap::new(),
// }
// }
// }

// #[derive(Clone)]
// pub struct Updates {
// pub gdax_5: Update,
// pub gdax_30: Update,
// pub last: ByExchange<DateTime<Utc>>
// }

#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct LatencyUpdate<W> pub struct LatencyUpdate<W>
where W: MeasurementWindow where W: MeasurementWindow
@@ -206,7 +250,7 @@ impl<W> Default for LatencyUpdate<W>
impl<W> Display for LatencyUpdate<W> impl<W> Display for LatencyUpdate<W>
where W: MeasurementWindow where W: MeasurementWindow
{ {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, " gdax ws: "); write!(f, " gdax ws: ");
tfmt_write(self.gdax_ws, f); tfmt_write(self.gdax_ws, f);
write!(f, "\n krkn pub: "); write!(f, "\n krkn pub: ");
@@ -248,6 +292,12 @@ impl<W: MeasurementWindow> LatencyUpdate<W> {


} }


pub struct Manager {
pub tx: Sender<Latency>,
pub channel: PubSub<Update>,
thread: Option<JoinHandle<()>>,
}

pub struct LatencyManager<W> pub struct LatencyManager<W>
where W: MeasurementWindow + Clone + Send + Sync where W: MeasurementWindow + Clone + Send + Sync
{ {
@@ -267,6 +317,7 @@ struct Last {
broadcast: Instant, broadcast: Instant,
plnx: Instant, plnx: Instant,
krkn: Instant, krkn: Instant,
gdax: Instant,
} }


impl Default for Last { impl Default for Last {
@@ -275,10 +326,98 @@ impl Default for Last {
broadcast: Instant::now(), broadcast: Instant::now(),
plnx: Instant::now(), plnx: Instant::now(),
krkn: Instant::now(), krkn: Instant::now(),
gdax: Instant::now(),
}
}
}

impl Manager {
pub fn new(window: Duration,
log_path: &'static str,
measurements: Sender<OwnedMeasurement>) -> Self {

let (tx, rx) = channel();
let tx_copy = tx.clone();
let channel = PubSub::new();
let channel_copy = channel.clone();
let logger = file_logger(log_path);
info!(logger, "initializing");
let mut gdax_ws = DurationWindow::new(window);
let mut gdax_trade = DurationWindow::new(window);

let mut last = Last::default();

info!(logger, "entering loop");
let mut terminate = false;

let thread = Some(thread::spawn(move || {
loop {

let loop_time = Instant::now();

rx.try_recv().map(|msg| {
debug!(logger, "rcvd {:?}", msg);

match msg {
Latency::Ws(exch, ticker, dur) => {
// shortcut
gdax_ws.update(loop_time, dur);
last.gdax = loop_time;
}

Latency::Trade(exch, ticker, dur) => {
//shorcut
gdax_trade.update(loop_time, dur);
last.gdax = loop_time;
let nanos = DurationWindow::nanos(dur);
measurements.send(
OwnedMeasurement::new("gdax_trade_api")
.add_string_tag("ticker", ticker.to_string())
.add_field("nanos", OwnedValue::Integer(nanos as i64))
.set_timestamp(influx::now()));
}

Latency::Terminate => {
crit!(logger, "rcvd Terminate order");
terminate = true;
}

_ => {}
}
});

if loop_time - last.broadcast > Duration::from_millis(100) {
debug!(logger, "initalizing broadcast");

let update = Update {
gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
gdax_trade: gdax_trade.refresh(&loop_time).mean_nanos(),
gdax_last: dt_from_dur(loop_time - last.gdax)
};
channel.send(update);
last.broadcast = loop_time;
debug!(logger, "sent broadcast");
} else {
thread::sleep(Duration::from_millis(1) / 10);
}

if terminate { break }
}
crit!(logger, "goodbye");
}));

Manager {
tx,
channel: channel_copy,
thread,
} }
} }
} }




//impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> { //impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> {
impl LatencyManager<WTen> { impl LatencyManager<WTen> {
pub fn new(w: WTen) -> Self { pub fn new(w: WTen) -> Self {


+ 1
- 1
src/lib.rs View File

@@ -36,7 +36,7 @@ pub fn nanos(t: DateTime<Utc>) -> u64 {
(t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64) (t.timestamp() as u64) * 1_000_000_000_u64 + (t.timestamp_subsec_nanos() as u64)
} }


pub fn file_logger(path: &'static str) -> slog::Logger {
pub fn file_logger(path: &str) -> slog::Logger {
let mut builder = FileLoggerBuilder::new(path); let mut builder = FileLoggerBuilder::new(path);
builder.level(Severity::Debug); builder.level(Severity::Debug);
builder.timezone(TimeZone::Utc); builder.timezone(TimeZone::Utc);


+ 5
- 4
src/warnings.rs View File

@@ -271,7 +271,7 @@ impl MeasurementRecord {
values.serialize(record, &mut builder); values.serialize(record, &mut builder);
} }


pub fn meas<'a>(&'a self) -> Measurement<'a> {
pub fn to_measurement<'a>(&'a self, name: &'a str) -> Measurement<'a> {
let fields: BTreeMap<&'a str, InfluentValue<'a>> = let fields: BTreeMap<&'a str, InfluentValue<'a>> =
self.fields.iter() self.fields.iter()
.map(|&(k, ref v)| { .map(|&(k, ref v)| {
@@ -285,7 +285,7 @@ impl MeasurementRecord {
}).collect(); }).collect();


Measurement { Measurement {
key: "log",
key: name,
timestamp: Some(nanos(Utc::now()) as i64), timestamp: Some(nanos(Utc::now()) as i64),
fields, fields,
tags, tags,
@@ -381,7 +381,8 @@ impl WarningsManager {
let ctx = zmq::Context::new(); let ctx = zmq::Context::new();
let socket = influx::push(&ctx).unwrap(); let socket = influx::push(&ctx).unwrap();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let logger = file_logger("var/log/warnings-manager.log");
let path = format!("var/log/warnings-manager-{}.log", measurement_name);
let logger = file_logger(&path);
info!(logger, "entering loop"); info!(logger, "entering loop");
loop { loop {
if let Ok(msg) = rx.recv() { if let Ok(msg) = rx.recv() {
@@ -394,7 +395,7 @@ impl WarningsManager {
Warning::Debug { msg, kv } => { Warning::Debug { msg, kv } => {
debug!(logger, "new Warning::Debug arrived"; debug!(logger, "new Warning::Debug arrived";
"msg" => &msg); "msg" => &msg);
let mut meas = kv.meas();
let mut meas = kv.to_measurement(measurement_name);
meas.add_field("msg", InfluentValue::String(msg.as_ref())); meas.add_field("msg", InfluentValue::String(msg.as_ref()));
meas.add_tag("category", "debug"); meas.add_tag("category", "debug");
influx::serialize(&meas, &mut buf); influx::serialize(&meas, &mut buf);


Loading…
Cancel
Save