You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

582 lines
17KB

  1. use std::thread::{self, JoinHandle};
  2. use std::sync::{Arc, Mutex, RwLock};
  3. use std::sync::mpsc::{self, Sender, Receiver, channel};
  4. use std::collections::VecDeque;
  5. use std::fmt::{self, Display, Write};
  6. use std::time::{Instant, Duration};
  7. use chrono::{self, DateTime, Utc, TimeZone};
  8. use pub_sub::PubSub;
  9. use zmq;
  10. use influent::measurement::{Measurement, Value};
  11. use windows::{DurationWindow, Incremental};
  12. use money::{Ticker, Side, ByExchange, Exchange};
  13. use super::file_logger;
  14. use influx::{self, OwnedMeasurement, OwnedValue};
  15. pub type Nanos = u64;
  16. pub const SECOND: u64 = 1e9 as u64;
  17. pub const MINUTE: u64 = SECOND * 60;
  18. pub const HOUR: u64 = MINUTE * 60;
  19. pub const MILLISECOND: u64 = SECOND / 1000;
  20. pub const MICROSECOND: u64 = MILLISECOND / 1000;
  21. pub fn nanos(d: Duration) -> Nanos {
  22. d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64)
  23. }
  24. pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
  25. (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
  26. }
  27. pub fn now() -> i64 { dt_nanos(Utc::now()) }
  28. pub fn tfmt(ns: Nanos) -> String {
  29. let mut f = String::new();
  30. match ns {
  31. t if t <= MICROSECOND => {
  32. write!(f, "{}ns", t);
  33. }
  34. t if t > MICROSECOND && t < MILLISECOND => {
  35. write!(f, "{}u", t / MICROSECOND);
  36. }
  37. t if t > MILLISECOND && t < SECOND => {
  38. write!(f, "{}ms", t / MILLISECOND);
  39. }
  40. t => {
  41. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  42. }
  43. }
  44. f
  45. }
  46. pub fn tfmt_dur(d: Duration) -> String {
  47. tfmt(nanos(d))
  48. }
  49. pub fn tfmt_dt(dt: DateTime<Utc>) -> String {
  50. Utc::now().signed_duration_since(dt)
  51. .to_std()
  52. .map(|dur| {
  53. tfmt_dur(dur)
  54. }).unwrap_or("?".into())
  55. }
  56. pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) {
  57. match ns {
  58. t if t <= MICROSECOND => {
  59. write!(f, "{}ns", t);
  60. }
  61. t if t > MICROSECOND && t < MILLISECOND => {
  62. write!(f, "{}u", t / MICROSECOND);
  63. }
  64. t if t > MILLISECOND && t < SECOND => {
  65. write!(f, "{}ms", t / MILLISECOND);
  66. }
  67. t => {
  68. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  69. }
  70. }
  71. }
  72. #[derive(Debug)]
  73. pub enum Latency {
  74. Ws(Exchange, Ticker, Duration),
  75. Http(Exchange, Duration),
  76. Trade(Exchange, Ticker, Duration),
  77. Terminate
  78. }
  79. #[derive(Debug)]
  80. pub enum ExperiencedLatency {
  81. GdaxWebsocket(Duration),
  82. //GdaxWebsocketNoLock(Duration),
  83. GdaxHttpPublic(Duration),
  84. GdaxHttpPrivate(Duration),
  85. PlnxHttpPublic(Duration),
  86. PlnxHttpPrivate(Duration),
  87. PlnxOrderBook(Duration),
  88. ExmoHttpPublic(Duration),
  89. KrknHttpPublic(Duration),
  90. KrknHttpPrivate(Duration),
  91. KrknTrade(Duration, &'static str, Option<Ticker>, Option<Side>),
  92. EventLoop(Duration),
  93. Terminate
  94. }
  95. // impl Message for ExperiencedLatency {
  96. // fn kill_switch() -> Self {
  97. // ExperiencedLatency::Terminate
  98. // }
  99. // }
  100. /// represents over what period of time
  101. /// the latency measurements were taken
  102. pub trait MeasurementWindow {
  103. fn duration(&self) -> Duration;
  104. }
  105. #[derive(Debug, Clone, Copy)]
  106. pub struct WThirty;
  107. impl Default for WThirty {
  108. fn default() -> Self { WThirty {} }
  109. }
  110. impl MeasurementWindow for WThirty {
  111. fn duration(&self) -> Duration { Duration::from_secs(30) }
  112. }
  113. #[derive(Debug, Clone, Copy)]
  114. pub struct WTen;
  115. impl Default for WTen {
  116. fn default() -> Self { WTen {} }
  117. }
  118. impl MeasurementWindow for WTen {
  119. fn duration(&self) -> Duration { Duration::from_secs(10) }
  120. }
  121. #[derive(Debug, Clone)]
  122. pub struct Update {
  123. pub gdax_ws: Nanos,
  124. pub gdax_trade: Nanos,
  125. pub gdax_last: DateTime<Utc>
  126. }
  127. impl Default for Update {
  128. fn default() -> Self {
  129. Update {
  130. gdax_ws: 0,
  131. gdax_trade: 0,
  132. gdax_last: Utc::now(),
  133. }
  134. }
  135. }
  136. // impl Update {
  137. // pub fn new(window: Duration) -> Self {
  138. // Update {
  139. // window,
  140. // ws: OrderMap::new(),
  141. // http: 0,
  142. // trade: OrderMap::new(),
  143. // }
  144. // }
  145. // }
  146. // #[derive(Clone)]
  147. // pub struct Updates {
  148. // pub gdax_5: Update,
  149. // pub gdax_30: Update,
  150. // pub last: ByExchange<DateTime<Utc>>
  151. // }
  152. #[derive(Debug, Clone)]
  153. pub struct LatencyUpdate<W>
  154. where W: MeasurementWindow
  155. {
  156. pub gdax_ws: Nanos,
  157. pub krkn_pub: Nanos,
  158. pub krkn_priv: Nanos,
  159. pub plnx_pub: Nanos,
  160. pub plnx_priv: Nanos,
  161. pub plnx_order: Nanos,
  162. pub krkn_trade_30_mean: Nanos,
  163. pub krkn_trade_30_max: Nanos,
  164. pub krkn_trade_300_mean: Nanos,
  165. pub krkn_trade_300_max: Nanos,
  166. pub plnx_last: DateTime<Utc>,
  167. pub krkn_last: DateTime<Utc>,
  168. //pub event_loop: Nanos,
  169. pub size: W,
  170. }
  171. impl<W> Default for LatencyUpdate<W>
  172. where W: MeasurementWindow + Default
  173. {
  174. fn default() -> Self {
  175. LatencyUpdate {
  176. gdax_ws: Nanos::default(),
  177. krkn_pub: Nanos::default(),
  178. krkn_priv: Nanos::default(),
  179. plnx_pub: Nanos::default(),
  180. plnx_priv: Nanos::default(),
  181. plnx_order: Nanos::default(),
  182. krkn_trade_30_mean: Nanos::default(),
  183. krkn_trade_30_max: Nanos::default(),
  184. krkn_trade_300_mean: Nanos::default(),
  185. krkn_trade_300_max: Nanos::default(),
  186. plnx_last: Utc::now(),
  187. krkn_last: Utc::now(),
  188. size: W::default()
  189. }
  190. }
  191. }
  192. impl<W> Display for LatencyUpdate<W>
  193. where W: MeasurementWindow
  194. {
  195. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  196. write!(f, " gdax ws: ");
  197. tfmt_write(self.gdax_ws, f);
  198. write!(f, "\n krkn pub: ");
  199. tfmt_write(self.krkn_pub, f);
  200. write!(f, "\n krkn priv: ");
  201. tfmt_write(self.krkn_priv, f);
  202. write!(f, "\n krkn trade 30 mean: ");
  203. tfmt_write(self.krkn_trade_30_mean, f);
  204. write!(f, "\n krkn trade 30 max: ");
  205. tfmt_write(self.krkn_trade_30_max, f);
  206. write!(f, "\n krkn trade 300 mean: ");
  207. tfmt_write(self.krkn_trade_300_mean, f);
  208. write!(f, "\n krkn trade 300 max: ");
  209. tfmt_write(self.krkn_trade_300_max, f);
  210. write!(f, "\n plnx pub: ");
  211. tfmt_write(self.plnx_pub, f);
  212. write!(f, "\n plnx priv: ");
  213. tfmt_write(self.plnx_priv, f);
  214. write!(f, "\n plnx orderbook loop: ");
  215. tfmt_write(self.plnx_order, f);
  216. //write!(f, "\n gdax ws nolock: ");
  217. //tfmt_write(self.gdax_ws_nolock, f);
  218. //write!(f, "\n event loop: ");
  219. //tfmt(self.event_loop, f);
  220. write!(f,"")
  221. }
  222. }
  223. impl<W: MeasurementWindow> LatencyUpdate<W> {
  224. pub fn measurement_window(&self) -> Duration {
  225. self.size.duration()
  226. }
  227. }
  228. pub struct Manager {
  229. pub tx: Sender<Latency>,
  230. pub channel: PubSub<Update>,
  231. thread: Option<JoinHandle<()>>,
  232. }
  233. pub struct LatencyManager<W>
  234. where W: MeasurementWindow + Clone + Send + Sync
  235. {
  236. pub tx: Sender<ExperiencedLatency>,
  237. pub channel: PubSub<LatencyUpdate<W>>,
  238. thread: Option<JoinHandle<()>>,
  239. }
  240. /// returns a DateTime equal to now - `dur`
  241. ///
  242. pub fn dt_from_dur(dur: Duration) -> DateTime<Utc> {
  243. let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64);
  244. Utc::now() - old_dur
  245. }
  246. struct Last {
  247. broadcast: Instant,
  248. plnx: Instant,
  249. krkn: Instant,
  250. gdax: Instant,
  251. }
  252. impl Default for Last {
  253. fn default() -> Self {
  254. Last {
  255. broadcast: Instant::now(),
  256. plnx: Instant::now(),
  257. krkn: Instant::now(),
  258. gdax: Instant::now(),
  259. }
  260. }
  261. }
  262. impl Manager {
  263. pub fn new(window: Duration,
  264. log_path: &'static str,
  265. measurements: Sender<OwnedMeasurement>) -> Self {
  266. let (tx, rx) = channel();
  267. let tx_copy = tx.clone();
  268. let channel = PubSub::new();
  269. let channel_copy = channel.clone();
  270. let logger = file_logger(log_path);
  271. info!(logger, "initializing");
  272. let mut gdax_ws = DurationWindow::new(window);
  273. let mut gdax_trade = DurationWindow::new(window);
  274. let mut last = Last::default();
  275. info!(logger, "entering loop");
  276. let mut terminate = false;
  277. let thread = Some(thread::spawn(move || {
  278. loop {
  279. let loop_time = Instant::now();
  280. rx.try_recv().map(|msg| {
  281. debug!(logger, "rcvd {:?}", msg);
  282. match msg {
  283. Latency::Ws(exch, ticker, dur) => {
  284. // shortcut
  285. gdax_ws.update(loop_time, dur);
  286. last.gdax = loop_time;
  287. }
  288. Latency::Trade(exch, ticker, dur) => {
  289. //shorcut
  290. gdax_trade.update(loop_time, dur);
  291. last.gdax = loop_time;
  292. let nanos = DurationWindow::nanos(dur);
  293. measurements.send(
  294. OwnedMeasurement::new("gdax_trade_api")
  295. .add_string_tag("ticker", ticker.to_string())
  296. .add_field("nanos", OwnedValue::Integer(nanos as i64))
  297. .set_timestamp(influx::now()));
  298. }
  299. Latency::Terminate => {
  300. crit!(logger, "rcvd Terminate order");
  301. terminate = true;
  302. }
  303. _ => {}
  304. }
  305. });
  306. if loop_time - last.broadcast > Duration::from_millis(100) {
  307. debug!(logger, "initalizing broadcast");
  308. let update = Update {
  309. gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
  310. gdax_trade: gdax_trade.refresh(&loop_time).mean_nanos(),
  311. gdax_last: dt_from_dur(loop_time - last.gdax)
  312. };
  313. channel.send(update);
  314. last.broadcast = loop_time;
  315. debug!(logger, "sent broadcast");
  316. } else {
  317. #[cfg(feature = "no-thrash")]
  318. thread::sleep(Duration::from_millis(1) / 10);
  319. }
  320. if terminate { break }
  321. }
  322. crit!(logger, "goodbye");
  323. }));
  324. Manager {
  325. tx,
  326. channel: channel_copy,
  327. thread,
  328. }
  329. }
  330. }
  331. //impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> {
  332. impl LatencyManager<WTen> {
  333. pub fn new(w: WTen) -> Self {
  334. let (tx, rx) = channel();
  335. let tx_copy = tx.clone();
  336. let channel = PubSub::new();
  337. let channel_copy = channel.clone();
  338. let w = w.clone();
  339. let thread = Some(thread::spawn(move || {
  340. let logger = file_logger("var/log/latency-manager.log");
  341. info!(logger, "initializing zmq");
  342. let ctx = zmq::Context::new();
  343. let socket = influx::push(&ctx).unwrap();
  344. let mut buf = String::with_capacity(4096);
  345. info!(logger, "initializing DurationWindows");
  346. let mut gdax_ws = DurationWindow::new(w.duration());
  347. let mut gdax_priv = DurationWindow::new(w.duration());
  348. let mut krkn_pub = DurationWindow::new(w.duration());
  349. let mut krkn_priv = DurationWindow::new(w.duration());
  350. let mut plnx_pub = DurationWindow::new(w.duration());
  351. let mut plnx_priv = DurationWindow::new(w.duration());
  352. let mut plnx_order = DurationWindow::new(w.duration());
  353. // yes I am intentionally breaking from the hard-typed duration
  354. // window ... that was a stupid idea
  355. //
  356. let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30));
  357. let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300));
  358. //let mut gdax_ws_nolock = DurationWindow::new(w.duration());
  359. //let mut event_loop = DurationWindow::new(w.duration());
  360. let mut last = Last::default();
  361. thread::sleep_ms(1);
  362. info!(logger, "entering loop");
  363. loop {
  364. let loop_time = Instant::now();
  365. if let Ok(msg) = rx.recv() {
  366. debug!(logger, "new msg: {:?}", msg);
  367. match msg {
  368. ExperiencedLatency::Terminate => {
  369. crit!(logger, "terminating");
  370. break;
  371. }
  372. ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d),
  373. //ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(loop_time, d),
  374. ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d),
  375. ExperiencedLatency::KrknHttpPublic(d) => {
  376. last.krkn = loop_time;
  377. krkn_pub.update(loop_time, d)
  378. }
  379. ExperiencedLatency::KrknHttpPrivate(d) => {
  380. last.krkn = loop_time;
  381. krkn_priv.update(loop_time, d)
  382. }
  383. ExperiencedLatency::PlnxHttpPublic(d) => {
  384. last.plnx = loop_time;
  385. plnx_pub.update(loop_time, d)
  386. }
  387. ExperiencedLatency::PlnxHttpPrivate(d) => {
  388. last.plnx = loop_time;
  389. plnx_priv.update(loop_time, d)
  390. }
  391. ExperiencedLatency::PlnxOrderBook(d) => {
  392. last.plnx = loop_time;
  393. plnx_order.update(loop_time, d)
  394. }
  395. ExperiencedLatency::KrknTrade(d, cmd, ticker, side) => {
  396. debug!(logger, "new KrknTrade";
  397. "cmd" => cmd);
  398. last.krkn = loop_time;
  399. let n = DurationWindow::nanos(d);
  400. krkn_trade_30.update(loop_time, d);
  401. krkn_trade_300.update(loop_time, d);
  402. let ticker_s = ticker.map(|t| t.to_string()).unwrap_or("".into());
  403. let side_s = side.map(|s| s.to_string()).unwrap_or("".into());
  404. let mut m = Measurement::new("krkn_trade_api");
  405. m.add_field("nanos", Value::Integer(n as i64));
  406. m.add_tag("cmd", cmd);
  407. if ticker.is_some() {
  408. m.add_tag("ticker", &ticker_s);
  409. }
  410. if side.is_some() {
  411. m.add_tag("side", &side_s);
  412. }
  413. m.set_timestamp(now());
  414. influx::serialize(&m, &mut buf);
  415. socket.send_str(&buf, 0);
  416. buf.clear();
  417. }
  418. //ExperiencedLatency::EventLoop(d) => event_loop.update(Instant::now(), d),
  419. other => {
  420. warn!(logger, "unexpected msg: {:?}", other);
  421. }
  422. }
  423. }
  424. if loop_time - last.broadcast > Duration::from_millis(100) {
  425. debug!(logger, "initalizing broadcast");
  426. // note - because we mutated the Window instances
  427. // above, we need a fresh Instant to avoid less than other
  428. // panic
  429. //
  430. krkn_trade_30.refresh(&loop_time);
  431. krkn_trade_300.refresh(&loop_time);
  432. let update = LatencyUpdate {
  433. gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
  434. //gdax_ws_nolock: gdax_ws_nolock.refresh(&loop_time).mean_nanos(),
  435. krkn_pub: krkn_pub.refresh(&loop_time).mean_nanos(),
  436. krkn_priv: krkn_priv.refresh(&loop_time).mean_nanos(),
  437. plnx_pub: plnx_pub.refresh(&loop_time).mean_nanos(),
  438. plnx_priv: plnx_priv.refresh(&loop_time).mean_nanos(),
  439. plnx_order: plnx_order.refresh(&loop_time).mean_nanos(),
  440. krkn_trade_30_mean: krkn_trade_30.mean_nanos(),
  441. krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0),
  442. krkn_trade_300_mean: krkn_trade_300.mean_nanos(),
  443. krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0),
  444. plnx_last: dt_from_dur(loop_time - last.plnx),
  445. krkn_last: dt_from_dur(loop_time - last.krkn),
  446. //event_loop: event_loop.refresh(&now).mean_nanos(),
  447. size: w.clone(),
  448. };
  449. channel.send(update);
  450. last.broadcast = loop_time;
  451. debug!(logger, "sent broadcast");
  452. }
  453. }
  454. crit!(logger, "goodbye");
  455. }));
  456. LatencyManager {
  457. tx: tx_copy,
  458. channel: channel_copy,
  459. thread
  460. }
  461. }
  462. }