You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

592 lines
18KB

  1. use std::thread::{self, JoinHandle};
  2. use std::sync::{Arc, Mutex, RwLock};
  3. use std::sync::mpsc::{self, Sender, Receiver, channel};
  4. use std::collections::VecDeque;
  5. use std::fmt::{self, Display, Write};
  6. use std::time::{Instant, Duration};
  7. use chrono::{self, DateTime, Utc, TimeZone};
  8. use pub_sub::PubSub;
  9. use zmq;
  10. use influent::measurement::{Measurement, Value};
  11. use sloggers::types::Severity;
  12. use windows::{DurationWindow, Incremental};
  13. use money::{Ticker, Side, ByExchange, Exchange};
  14. use super::file_logger;
  15. use influx::{self, OwnedMeasurement, OwnedValue};
  16. pub type Nanos = u64;
  17. pub const SECOND: u64 = 1e9 as u64;
  18. pub const MINUTE: u64 = SECOND * 60;
  19. pub const HOUR: u64 = MINUTE * 60;
  20. pub const MILLISECOND: u64 = SECOND / 1000;
  21. pub const MICROSECOND: u64 = MILLISECOND / 1000;
  22. pub fn nanos(d: Duration) -> Nanos {
  23. d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64)
  24. }
  25. pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
  26. (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
  27. }
  28. pub fn now() -> i64 { dt_nanos(Utc::now()) }
  29. pub fn tfmt(ns: Nanos) -> String {
  30. let mut f = String::new();
  31. match ns {
  32. t if t <= MICROSECOND => {
  33. write!(f, "{}ns", t);
  34. }
  35. t if t > MICROSECOND && t < MILLISECOND => {
  36. write!(f, "{}u", t / MICROSECOND);
  37. }
  38. t if t > MILLISECOND && t < SECOND => {
  39. write!(f, "{}ms", t / MILLISECOND);
  40. }
  41. t => {
  42. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  43. }
  44. }
  45. f
  46. }
  47. pub fn tfmt_dur(d: Duration) -> String {
  48. tfmt(nanos(d))
  49. }
  50. pub fn tfmt_dt(dt: DateTime<Utc>) -> String {
  51. Utc::now().signed_duration_since(dt)
  52. .to_std()
  53. .map(|dur| {
  54. tfmt_dur(dur)
  55. }).unwrap_or("?".into())
  56. }
  57. pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) {
  58. match ns {
  59. t if t <= MICROSECOND => {
  60. write!(f, "{}ns", t);
  61. }
  62. t if t > MICROSECOND && t < MILLISECOND => {
  63. write!(f, "{}u", t / MICROSECOND);
  64. }
  65. t if t > MILLISECOND && t < SECOND => {
  66. write!(f, "{}ms", t / MILLISECOND);
  67. }
  68. t => {
  69. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  70. }
  71. }
  72. }
  73. #[derive(Debug)]
  74. pub enum Latency {
  75. Ws(Exchange, Ticker, Duration),
  76. Http(Exchange, Duration),
  77. Trade(Exchange, Ticker, Duration),
  78. Terminate
  79. }
  80. #[derive(Debug)]
  81. pub enum ExperiencedLatency {
  82. GdaxWebsocket(Duration),
  83. //GdaxWebsocketNoLock(Duration),
  84. GdaxHttpPublic(Duration),
  85. GdaxHttpPrivate(Duration),
  86. PlnxHttpPublic(Duration),
  87. PlnxHttpPrivate(Duration),
  88. PlnxOrderBook(Duration),
  89. ExmoHttpPublic(Duration),
  90. KrknHttpPublic(Duration),
  91. KrknHttpPrivate(Duration),
  92. KrknTrade(Duration, &'static str, Option<Ticker>, Option<Side>),
  93. EventLoop(Duration),
  94. Terminate
  95. }
  96. // impl Message for ExperiencedLatency {
  97. // fn kill_switch() -> Self {
  98. // ExperiencedLatency::Terminate
  99. // }
  100. // }
  101. /// represents over what period of time
  102. /// the latency measurements were taken
  103. pub trait MeasurementWindow {
  104. fn duration(&self) -> Duration;
  105. }
  106. #[derive(Debug, Clone, Copy)]
  107. pub struct WThirty;
  108. impl Default for WThirty {
  109. fn default() -> Self { WThirty {} }
  110. }
  111. impl MeasurementWindow for WThirty {
  112. fn duration(&self) -> Duration { Duration::from_secs(30) }
  113. }
  114. #[derive(Debug, Clone, Copy)]
  115. pub struct WTen;
  116. impl Default for WTen {
  117. fn default() -> Self { WTen {} }
  118. }
  119. impl MeasurementWindow for WTen {
  120. fn duration(&self) -> Duration { Duration::from_secs(10) }
  121. }
  122. #[derive(Debug, Clone)]
  123. pub struct Update {
  124. pub gdax_ws: Nanos,
  125. pub gdax_trade: Nanos,
  126. pub gdax_last: DateTime<Utc>
  127. }
  128. impl Default for Update {
  129. fn default() -> Self {
  130. Update {
  131. gdax_ws: 0,
  132. gdax_trade: 0,
  133. gdax_last: Utc::now(),
  134. }
  135. }
  136. }
  137. // impl Update {
  138. // pub fn new(window: Duration) -> Self {
  139. // Update {
  140. // window,
  141. // ws: OrderMap::new(),
  142. // http: 0,
  143. // trade: OrderMap::new(),
  144. // }
  145. // }
  146. // }
  147. // #[derive(Clone)]
  148. // pub struct Updates {
  149. // pub gdax_5: Update,
  150. // pub gdax_30: Update,
  151. // pub last: ByExchange<DateTime<Utc>>
  152. // }
  153. #[derive(Debug, Clone)]
  154. pub struct LatencyUpdate<W>
  155. where W: MeasurementWindow
  156. {
  157. pub gdax_ws: Nanos,
  158. pub krkn_pub: Nanos,
  159. pub krkn_priv: Nanos,
  160. pub plnx_pub: Nanos,
  161. pub plnx_priv: Nanos,
  162. pub plnx_order: Nanos,
  163. pub krkn_trade_30_mean: Nanos,
  164. pub krkn_trade_30_max: Nanos,
  165. pub krkn_trade_300_mean: Nanos,
  166. pub krkn_trade_300_max: Nanos,
  167. pub plnx_last: DateTime<Utc>,
  168. pub krkn_last: DateTime<Utc>,
  169. //pub event_loop: Nanos,
  170. pub size: W,
  171. }
  172. impl<W> Default for LatencyUpdate<W>
  173. where W: MeasurementWindow + Default
  174. {
  175. fn default() -> Self {
  176. LatencyUpdate {
  177. gdax_ws: Nanos::default(),
  178. krkn_pub: Nanos::default(),
  179. krkn_priv: Nanos::default(),
  180. plnx_pub: Nanos::default(),
  181. plnx_priv: Nanos::default(),
  182. plnx_order: Nanos::default(),
  183. krkn_trade_30_mean: Nanos::default(),
  184. krkn_trade_30_max: Nanos::default(),
  185. krkn_trade_300_mean: Nanos::default(),
  186. krkn_trade_300_max: Nanos::default(),
  187. plnx_last: Utc::now(),
  188. krkn_last: Utc::now(),
  189. size: W::default()
  190. }
  191. }
  192. }
  193. impl<W> Display for LatencyUpdate<W>
  194. where W: MeasurementWindow
  195. {
  196. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  197. write!(f, " gdax ws: ");
  198. tfmt_write(self.gdax_ws, f);
  199. write!(f, "\n krkn pub: ");
  200. tfmt_write(self.krkn_pub, f);
  201. write!(f, "\n krkn priv: ");
  202. tfmt_write(self.krkn_priv, f);
  203. write!(f, "\n krkn trade 30 mean: ");
  204. tfmt_write(self.krkn_trade_30_mean, f);
  205. write!(f, "\n krkn trade 30 max: ");
  206. tfmt_write(self.krkn_trade_30_max, f);
  207. write!(f, "\n krkn trade 300 mean: ");
  208. tfmt_write(self.krkn_trade_300_mean, f);
  209. write!(f, "\n krkn trade 300 max: ");
  210. tfmt_write(self.krkn_trade_300_max, f);
  211. write!(f, "\n plnx pub: ");
  212. tfmt_write(self.plnx_pub, f);
  213. write!(f, "\n plnx priv: ");
  214. tfmt_write(self.plnx_priv, f);
  215. write!(f, "\n plnx orderbook loop: ");
  216. tfmt_write(self.plnx_order, f);
  217. //write!(f, "\n gdax ws nolock: ");
  218. //tfmt_write(self.gdax_ws_nolock, f);
  219. //write!(f, "\n event loop: ");
  220. //tfmt(self.event_loop, f);
  221. write!(f,"")
  222. }
  223. }
  224. impl<W: MeasurementWindow> LatencyUpdate<W> {
  225. pub fn measurement_window(&self) -> Duration {
  226. self.size.duration()
  227. }
  228. }
  229. pub struct Manager {
  230. pub tx: Sender<Latency>,
  231. pub channel: PubSub<Update>,
  232. thread: Option<JoinHandle<()>>,
  233. }
  234. pub struct LatencyManager<W>
  235. where W: MeasurementWindow + Clone + Send + Sync
  236. {
  237. pub tx: Sender<ExperiencedLatency>,
  238. pub channel: PubSub<LatencyUpdate<W>>,
  239. thread: Option<JoinHandle<()>>,
  240. }
  241. /// returns a DateTime equal to now - `dur`
  242. ///
  243. pub fn dt_from_dur(dur: Duration) -> DateTime<Utc> {
  244. let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64);
  245. Utc::now() - old_dur
  246. }
  247. struct Last {
  248. broadcast: Instant,
  249. plnx: Instant,
  250. krkn: Instant,
  251. gdax: Instant,
  252. }
  253. impl Default for Last {
  254. fn default() -> Self {
  255. Last {
  256. broadcast: Instant::now(),
  257. plnx: Instant::now(),
  258. krkn: Instant::now(),
  259. gdax: Instant::now(),
  260. }
  261. }
  262. }
  263. impl Manager {
  264. pub fn new(window: Duration,
  265. log_path: &'static str,
  266. measurements: Sender<OwnedMeasurement>) -> Self {
  267. let (tx, rx) = channel();
  268. let tx_copy = tx.clone();
  269. let channel = PubSub::new();
  270. let channel_copy = channel.clone();
  271. let logger = file_logger(log_path, Severity::Info);
  272. info!(logger, "initializing");
  273. let mut gdax_ws = DurationWindow::new(window);
  274. let mut gdax_trade = DurationWindow::new(window);
  275. let mut last = Last::default();
  276. info!(logger, "entering loop");
  277. let mut terminate = false;
  278. let thread = Some(thread::spawn(move || {
  279. loop {
  280. let loop_time = Instant::now();
  281. rx.try_recv().map(|msg| {
  282. debug!(logger, "rcvd {:?}", msg);
  283. match msg {
  284. Latency::Ws(exch, ticker, dur) => {
  285. // shortcut
  286. gdax_ws.update(loop_time, dur);
  287. last.gdax = loop_time;
  288. }
  289. Latency::Trade(exch, ticker, dur) => {
  290. //shorcut
  291. gdax_trade.update(loop_time, dur);
  292. last.gdax = loop_time;
  293. let nanos = DurationWindow::nanos(dur);
  294. measurements.send(
  295. OwnedMeasurement::new("gdax_trade_api")
  296. .add_string_tag("ticker", ticker.to_string())
  297. .add_field("nanos", OwnedValue::Integer(nanos as i64))
  298. .set_timestamp(influx::now()));
  299. }
  300. Latency::Terminate => {
  301. crit!(logger, "rcvd Terminate order");
  302. terminate = true;
  303. }
  304. _ => {}
  305. }
  306. });
  307. if loop_time - last.broadcast > Duration::from_millis(100) {
  308. debug!(logger, "initalizing broadcast");
  309. let update = Update {
  310. gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
  311. gdax_trade: gdax_trade.refresh(&loop_time).mean_nanos(),
  312. gdax_last: dt_from_dur(loop_time - last.gdax)
  313. };
  314. channel.send(update);
  315. last.broadcast = loop_time;
  316. debug!(logger, "sent broadcast");
  317. } else {
  318. #[cfg(feature = "no-thrash")]
  319. thread::sleep(Duration::from_millis(1) / 10);
  320. }
  321. if terminate { break }
  322. }
  323. crit!(logger, "goodbye");
  324. }));
  325. Manager {
  326. tx,
  327. channel: channel_copy,
  328. thread,
  329. }
  330. }
  331. }
  332. impl Drop for Manager {
  333. fn drop(&mut self) {
  334. self.tx.send(Latency::Terminate);
  335. if let Some(thread) = self.thread.take() {
  336. let _ = thread.join();
  337. }
  338. }
  339. }
  340. //impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> {
  341. impl LatencyManager<WTen> {
  342. pub fn new(w: WTen) -> Self {
  343. let (tx, rx) = channel();
  344. let tx_copy = tx.clone();
  345. let channel = PubSub::new();
  346. let channel_copy = channel.clone();
  347. let w = w.clone();
  348. let thread = Some(thread::spawn(move || {
  349. let logger = file_logger("var/log/latency-manager.log", Severity::Info);
  350. info!(logger, "initializing zmq");
  351. let ctx = zmq::Context::new();
  352. let socket = influx::push(&ctx).unwrap();
  353. let mut buf = String::with_capacity(4096);
  354. info!(logger, "initializing DurationWindows");
  355. let mut gdax_ws = DurationWindow::new(w.duration());
  356. let mut gdax_priv = DurationWindow::new(w.duration());
  357. let mut krkn_pub = DurationWindow::new(w.duration());
  358. let mut krkn_priv = DurationWindow::new(w.duration());
  359. let mut plnx_pub = DurationWindow::new(w.duration());
  360. let mut plnx_priv = DurationWindow::new(w.duration());
  361. let mut plnx_order = DurationWindow::new(w.duration());
  362. // yes I am intentionally breaking from the hard-typed duration
  363. // window ... that was a stupid idea
  364. //
  365. let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30));
  366. let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300));
  367. //let mut gdax_ws_nolock = DurationWindow::new(w.duration());
  368. //let mut event_loop = DurationWindow::new(w.duration());
  369. let mut last = Last::default();
  370. thread::sleep_ms(1);
  371. info!(logger, "entering loop");
  372. loop {
  373. let loop_time = Instant::now();
  374. if let Ok(msg) = rx.recv() {
  375. debug!(logger, "new msg: {:?}", msg);
  376. match msg {
  377. ExperiencedLatency::Terminate => {
  378. crit!(logger, "terminating");
  379. break;
  380. }
  381. ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d),
  382. //ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(loop_time, d),
  383. ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d),
  384. ExperiencedLatency::KrknHttpPublic(d) => {
  385. last.krkn = loop_time;
  386. krkn_pub.update(loop_time, d)
  387. }
  388. ExperiencedLatency::KrknHttpPrivate(d) => {
  389. last.krkn = loop_time;
  390. krkn_priv.update(loop_time, d)
  391. }
  392. ExperiencedLatency::PlnxHttpPublic(d) => {
  393. last.plnx = loop_time;
  394. plnx_pub.update(loop_time, d)
  395. }
  396. ExperiencedLatency::PlnxHttpPrivate(d) => {
  397. last.plnx = loop_time;
  398. plnx_priv.update(loop_time, d)
  399. }
  400. ExperiencedLatency::PlnxOrderBook(d) => {
  401. last.plnx = loop_time;
  402. plnx_order.update(loop_time, d)
  403. }
  404. ExperiencedLatency::KrknTrade(d, cmd, ticker, side) => {
  405. debug!(logger, "new KrknTrade";
  406. "cmd" => cmd);
  407. last.krkn = loop_time;
  408. let n = DurationWindow::nanos(d);
  409. krkn_trade_30.update(loop_time, d);
  410. krkn_trade_300.update(loop_time, d);
  411. let ticker_s = ticker.map(|t| t.to_string()).unwrap_or("".into());
  412. let side_s = side.map(|s| s.to_string()).unwrap_or("".into());
  413. let mut m = Measurement::new("krkn_trade_api");
  414. m.add_field("nanos", Value::Integer(n as i64));
  415. m.add_tag("cmd", cmd);
  416. if ticker.is_some() {
  417. m.add_tag("ticker", &ticker_s);
  418. }
  419. if side.is_some() {
  420. m.add_tag("side", &side_s);
  421. }
  422. m.set_timestamp(now());
  423. influx::serialize(&m, &mut buf);
  424. socket.send_str(&buf, 0);
  425. buf.clear();
  426. }
  427. //ExperiencedLatency::EventLoop(d) => event_loop.update(Instant::now(), d),
  428. other => {
  429. warn!(logger, "unexpected msg: {:?}", other);
  430. }
  431. }
  432. }
  433. if loop_time - last.broadcast > Duration::from_millis(100) {
  434. debug!(logger, "initalizing broadcast");
  435. // note - because we mutated the Window instances
  436. // above, we need a fresh Instant to avoid less than other
  437. // panic
  438. //
  439. krkn_trade_30.refresh(&loop_time);
  440. krkn_trade_300.refresh(&loop_time);
  441. let update = LatencyUpdate {
  442. gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
  443. //gdax_ws_nolock: gdax_ws_nolock.refresh(&loop_time).mean_nanos(),
  444. krkn_pub: krkn_pub.refresh(&loop_time).mean_nanos(),
  445. krkn_priv: krkn_priv.refresh(&loop_time).mean_nanos(),
  446. plnx_pub: plnx_pub.refresh(&loop_time).mean_nanos(),
  447. plnx_priv: plnx_priv.refresh(&loop_time).mean_nanos(),
  448. plnx_order: plnx_order.refresh(&loop_time).mean_nanos(),
  449. krkn_trade_30_mean: krkn_trade_30.mean_nanos(),
  450. krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0),
  451. krkn_trade_300_mean: krkn_trade_300.mean_nanos(),
  452. krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0),
  453. plnx_last: dt_from_dur(loop_time - last.plnx),
  454. krkn_last: dt_from_dur(loop_time - last.krkn),
  455. //event_loop: event_loop.refresh(&now).mean_nanos(),
  456. size: w.clone(),
  457. };
  458. channel.send(update);
  459. last.broadcast = loop_time;
  460. debug!(logger, "sent broadcast");
  461. }
  462. }
  463. crit!(logger, "goodbye");
  464. }));
  465. LatencyManager {
  466. tx: tx_copy,
  467. channel: channel_copy,
  468. thread
  469. }
  470. }
  471. }