You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

604 lines
18KB

  1. use std::thread::{self, JoinHandle};
  2. use std::sync::{Arc, Mutex, RwLock};
  3. use std::sync::mpsc::{self, Sender, Receiver, channel};
  4. use std::collections::VecDeque;
  5. use std::fmt::{self, Display, Write};
  6. use std::time::{Instant, Duration};
  7. use chrono::{self, DateTime, Utc, TimeZone};
  8. use pub_sub::PubSub;
  9. use zmq;
  10. use influent::measurement::{Measurement, Value};
  11. use sloggers::types::Severity;
  12. use shuteye;
  13. use windows::{DurationWindow, Incremental, Window};
  14. use money::{Ticker, Side, ByExchange, Exchange};
  15. use super::file_logger;
  16. use influx::{self, OwnedMeasurement, OwnedValue};
  17. pub type Nanos = u64;
  18. pub const SECOND: u64 = 1e9 as u64;
  19. pub const MINUTE: u64 = SECOND * 60;
  20. pub const HOUR: u64 = MINUTE * 60;
  21. pub const MILLISECOND: u64 = SECOND / 1000;
  22. pub const MICROSECOND: u64 = MILLISECOND / 1000;
  23. pub fn nanos(d: Duration) -> Nanos {
  24. d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64)
  25. }
  26. pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
  27. (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
  28. }
  29. pub fn now() -> i64 { dt_nanos(Utc::now()) }
  30. pub fn tfmt(ns: Nanos) -> String {
  31. let mut f = String::new();
  32. match ns {
  33. t if t <= MICROSECOND => {
  34. write!(f, "{}ns", t);
  35. }
  36. t if t > MICROSECOND && t < MILLISECOND => {
  37. write!(f, "{}u", t / MICROSECOND);
  38. }
  39. t if t > MILLISECOND && t < SECOND => {
  40. write!(f, "{}ms", t / MILLISECOND);
  41. }
  42. t => {
  43. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  44. }
  45. }
  46. f
  47. }
  48. pub fn tfmt_dur(d: Duration) -> String {
  49. tfmt(nanos(d))
  50. }
  51. pub fn tfmt_dt(dt: DateTime<Utc>) -> String {
  52. Utc::now().signed_duration_since(dt)
  53. .to_std()
  54. .map(|dur| {
  55. tfmt_dur(dur)
  56. }).unwrap_or("?".into())
  57. }
  58. pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) {
  59. match ns {
  60. t if t <= MICROSECOND => {
  61. write!(f, "{}ns", t);
  62. }
  63. t if t > MICROSECOND && t < MILLISECOND => {
  64. write!(f, "{}u", t / MICROSECOND);
  65. }
  66. t if t > MILLISECOND && t < SECOND => {
  67. write!(f, "{}ms", t / MILLISECOND);
  68. }
  69. t => {
  70. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  71. }
  72. }
  73. }
  74. #[derive(Debug)]
  75. pub enum Latency {
  76. Ws(Exchange, Ticker, Duration),
  77. Http(Exchange, Duration),
  78. Trade(Exchange, Ticker, Duration),
  79. Terminate
  80. }
  81. #[derive(Debug)]
  82. pub enum ExperiencedLatency {
  83. GdaxWebsocket(Duration),
  84. //GdaxWebsocketNoLock(Duration),
  85. GdaxHttpPublic(Duration),
  86. GdaxHttpPrivate(Duration),
  87. PlnxHttpPublic(Duration),
  88. PlnxHttpPrivate(Duration),
  89. PlnxOrderBook(Duration),
  90. ExmoHttpPublic(Duration),
  91. KrknHttpPublic(Duration),
  92. KrknHttpPrivate(Duration),
  93. KrknTrade(Duration, &'static str, Option<Ticker>, Option<Side>),
  94. EventLoop(Duration),
  95. PlnxWs(Ticker),
  96. Terminate
  97. }
  98. // impl Message for ExperiencedLatency {
  99. // fn kill_switch() -> Self {
  100. // ExperiencedLatency::Terminate
  101. // }
  102. // }
  103. /// represents over what period of time
  104. /// the latency measurements were taken
  105. pub trait MeasurementWindow {
  106. fn duration(&self) -> Duration;
  107. }
  108. #[derive(Debug, Clone, Copy)]
  109. pub struct WThirty;
  110. impl Default for WThirty {
  111. fn default() -> Self { WThirty {} }
  112. }
  113. impl MeasurementWindow for WThirty {
  114. fn duration(&self) -> Duration { Duration::from_secs(30) }
  115. }
  116. #[derive(Debug, Clone, Copy)]
  117. pub struct WTen;
  118. impl Default for WTen {
  119. fn default() -> Self { WTen {} }
  120. }
  121. impl MeasurementWindow for WTen {
  122. fn duration(&self) -> Duration { Duration::from_secs(10) }
  123. }
  124. #[derive(Debug, Clone)]
  125. pub struct Update {
  126. pub gdax_ws: Nanos,
  127. pub gdax_trade: Nanos,
  128. pub gdax_last: DateTime<Utc>
  129. }
  130. impl Default for Update {
  131. fn default() -> Self {
  132. Update {
  133. gdax_ws: 0,
  134. gdax_trade: 0,
  135. gdax_last: Utc::now(),
  136. }
  137. }
  138. }
  139. // impl Update {
  140. // pub fn new(window: Duration) -> Self {
  141. // Update {
  142. // window,
  143. // ws: OrderMap::new(),
  144. // http: 0,
  145. // trade: OrderMap::new(),
  146. // }
  147. // }
  148. // }
  149. // #[derive(Clone)]
  150. // pub struct Updates {
  151. // pub gdax_5: Update,
  152. // pub gdax_30: Update,
  153. // pub last: ByExchange<DateTime<Utc>>
  154. // }
  155. #[derive(Debug, Clone)]
  156. pub struct LatencyUpdate<W>
  157. where W: MeasurementWindow
  158. {
  159. pub gdax_ws: Nanos,
  160. pub krkn_pub: Nanos,
  161. pub krkn_priv: Nanos,
  162. pub plnx_pub: Nanos,
  163. pub plnx_priv: Nanos,
  164. pub plnx_order: Nanos,
  165. pub krkn_trade_30_mean: Nanos,
  166. pub krkn_trade_30_max: Nanos,
  167. pub krkn_trade_300_mean: Nanos,
  168. pub krkn_trade_300_max: Nanos,
  169. pub plnx_last: DateTime<Utc>,
  170. pub krkn_last: DateTime<Utc>,
  171. pub plnx_ws_count: u64,
  172. //pub event_loop: Nanos,
  173. pub size: W,
  174. }
  175. impl<W> Default for LatencyUpdate<W>
  176. where W: MeasurementWindow + Default
  177. {
  178. fn default() -> Self {
  179. LatencyUpdate {
  180. gdax_ws: Nanos::default(),
  181. krkn_pub: Nanos::default(),
  182. krkn_priv: Nanos::default(),
  183. plnx_pub: Nanos::default(),
  184. plnx_priv: Nanos::default(),
  185. plnx_order: Nanos::default(),
  186. krkn_trade_30_mean: Nanos::default(),
  187. krkn_trade_30_max: Nanos::default(),
  188. krkn_trade_300_mean: Nanos::default(),
  189. krkn_trade_300_max: Nanos::default(),
  190. plnx_ws_count: 0,
  191. plnx_last: Utc::now(),
  192. krkn_last: Utc::now(),
  193. size: W::default()
  194. }
  195. }
  196. }
  197. impl<W> Display for LatencyUpdate<W>
  198. where W: MeasurementWindow
  199. {
  200. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  201. write!(f, " gdax ws: ");
  202. tfmt_write(self.gdax_ws, f);
  203. write!(f, "\n krkn pub: ");
  204. tfmt_write(self.krkn_pub, f);
  205. write!(f, "\n krkn priv: ");
  206. tfmt_write(self.krkn_priv, f);
  207. write!(f, "\n krkn trade 30 mean: ");
  208. tfmt_write(self.krkn_trade_30_mean, f);
  209. write!(f, "\n krkn trade 30 max: ");
  210. tfmt_write(self.krkn_trade_30_max, f);
  211. write!(f, "\n krkn trade 300 mean: ");
  212. tfmt_write(self.krkn_trade_300_mean, f);
  213. write!(f, "\n krkn trade 300 max: ");
  214. tfmt_write(self.krkn_trade_300_max, f);
  215. write!(f, "\n plnx pub: ");
  216. tfmt_write(self.plnx_pub, f);
  217. write!(f, "\n plnx priv: ");
  218. tfmt_write(self.plnx_priv, f);
  219. write!(f, "\n plnx orderbook loop: ");
  220. tfmt_write(self.plnx_order, f);
  221. //write!(f, "\n gdax ws nolock: ");
  222. //tfmt_write(self.gdax_ws_nolock, f);
  223. //write!(f, "\n event loop: ");
  224. //tfmt(self.event_loop, f);
  225. write!(f,"")
  226. }
  227. }
  228. impl<W: MeasurementWindow> LatencyUpdate<W> {
  229. pub fn measurement_window(&self) -> Duration {
  230. self.size.duration()
  231. }
  232. }
  233. pub struct Manager {
  234. pub tx: Sender<Latency>,
  235. pub channel: PubSub<Update>,
  236. thread: Option<JoinHandle<()>>,
  237. }
  238. pub struct LatencyManager<W>
  239. where W: MeasurementWindow + Clone + Send + Sync
  240. {
  241. pub tx: Sender<ExperiencedLatency>,
  242. pub channel: PubSub<LatencyUpdate<W>>,
  243. thread: Option<JoinHandle<()>>,
  244. }
  245. /// returns a DateTime equal to now - `dur`
  246. ///
  247. pub fn dt_from_dur(dur: Duration) -> DateTime<Utc> {
  248. let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64);
  249. Utc::now() - old_dur
  250. }
  251. struct Last {
  252. broadcast: Instant,
  253. plnx: Instant,
  254. krkn: Instant,
  255. gdax: Instant,
  256. }
  257. impl Default for Last {
  258. fn default() -> Self {
  259. Last {
  260. broadcast: Instant::now(),
  261. plnx: Instant::now(),
  262. krkn: Instant::now(),
  263. gdax: Instant::now(),
  264. }
  265. }
  266. }
  267. impl Manager {
  268. pub fn new(window: Duration,
  269. log_path: &'static str,
  270. measurements: Sender<OwnedMeasurement>) -> Self {
  271. let (tx, rx) = channel();
  272. let tx_copy = tx.clone();
  273. let channel = PubSub::new();
  274. let channel_copy = channel.clone();
  275. let logger = file_logger(log_path, Severity::Info);
  276. info!(logger, "initializing");
  277. let mut gdax_ws = DurationWindow::new(window);
  278. let mut gdax_trade = DurationWindow::new(window);
  279. let mut last = Last::default();
  280. info!(logger, "entering loop");
  281. let mut terminate = false;
  282. let thread = Some(thread::spawn(move || {
  283. loop {
  284. let loop_time = Instant::now();
  285. rx.try_recv().map(|msg| {
  286. debug!(logger, "rcvd {:?}", msg);
  287. match msg {
  288. Latency::Ws(exch, ticker, dur) => {
  289. // shortcut
  290. gdax_ws.update(loop_time, dur);
  291. last.gdax = loop_time;
  292. }
  293. Latency::Trade(exch, ticker, dur) => {
  294. //shorcut
  295. gdax_trade.update(loop_time, dur);
  296. last.gdax = loop_time;
  297. let nanos = DurationWindow::nanos(dur);
  298. measurements.send(
  299. OwnedMeasurement::new("gdax_trade_api")
  300. .add_string_tag("ticker", ticker.to_string())
  301. .add_field("nanos", OwnedValue::Integer(nanos as i64))
  302. .set_timestamp(influx::now()));
  303. }
  304. Latency::Terminate => {
  305. crit!(logger, "rcvd Terminate order");
  306. terminate = true;
  307. }
  308. _ => {}
  309. }
  310. });
  311. if loop_time - last.broadcast > Duration::from_millis(100) {
  312. debug!(logger, "initalizing broadcast");
  313. let update = Update {
  314. gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
  315. gdax_trade: gdax_trade.refresh(&loop_time).mean_nanos(),
  316. gdax_last: dt_from_dur(loop_time - last.gdax)
  317. };
  318. channel.send(update);
  319. last.broadcast = loop_time;
  320. debug!(logger, "sent broadcast");
  321. } else {
  322. #[cfg(feature = "no-thrash")]
  323. shuteye::sleep(Duration::new(0, 1000));
  324. }
  325. if terminate { break }
  326. }
  327. crit!(logger, "goodbye");
  328. }));
  329. Manager {
  330. tx,
  331. channel: channel_copy,
  332. thread,
  333. }
  334. }
  335. }
  336. impl Drop for Manager {
  337. fn drop(&mut self) {
  338. self.tx.send(Latency::Terminate);
  339. if let Some(thread) = self.thread.take() {
  340. let _ = thread.join();
  341. }
  342. }
  343. }
  344. //impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> {
  345. impl LatencyManager<WTen> {
  346. pub fn new(w: WTen) -> Self {
  347. let (tx, rx) = channel();
  348. let tx_copy = tx.clone();
  349. let channel = PubSub::new();
  350. let channel_copy = channel.clone();
  351. let w = w.clone();
  352. let thread = Some(thread::spawn(move || {
  353. let logger = file_logger("var/log/latency-manager.log", Severity::Info);
  354. info!(logger, "initializing zmq");
  355. let ctx = zmq::Context::new();
  356. let socket = influx::push(&ctx).unwrap();
  357. let mut buf = String::with_capacity(4096);
  358. info!(logger, "initializing DurationWindows");
  359. let mut gdax_ws = DurationWindow::new(w.duration());
  360. let mut gdax_priv = DurationWindow::new(w.duration());
  361. let mut krkn_pub = DurationWindow::new(w.duration());
  362. let mut krkn_priv = DurationWindow::new(w.duration());
  363. let mut plnx_pub = DurationWindow::new(w.duration());
  364. let mut plnx_priv = DurationWindow::new(w.duration());
  365. let mut plnx_order = DurationWindow::new(w.duration());
  366. let mut plnx_ws_count: Window<u32> = Window::new(w.duration());
  367. // yes I am intentionally breaking from the hard-typed duration
  368. // window ... that was a stupid idea
  369. //
  370. let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30));
  371. let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300));
  372. //let mut gdax_ws_nolock = DurationWindow::new(w.duration());
  373. //let mut event_loop = DurationWindow::new(w.duration());
  374. let mut last = Last::default();
  375. thread::sleep_ms(1);
  376. info!(logger, "entering loop");
  377. loop {
  378. let loop_time = Instant::now();
  379. if let Ok(msg) = rx.recv() {
  380. debug!(logger, "new msg: {:?}", msg);
  381. match msg {
  382. ExperiencedLatency::Terminate => {
  383. crit!(logger, "terminating");
  384. break;
  385. }
  386. ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d),
  387. //ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(loop_time, d),
  388. ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d),
  389. ExperiencedLatency::KrknHttpPublic(d) => {
  390. last.krkn = loop_time;
  391. krkn_pub.update(loop_time, d)
  392. }
  393. ExperiencedLatency::KrknHttpPrivate(d) => {
  394. last.krkn = loop_time;
  395. krkn_priv.update(loop_time, d)
  396. }
  397. ExperiencedLatency::PlnxHttpPublic(d) => {
  398. last.plnx = loop_time;
  399. plnx_pub.update(loop_time, d)
  400. }
  401. ExperiencedLatency::PlnxHttpPrivate(d) => {
  402. last.plnx = loop_time;
  403. plnx_priv.update(loop_time, d)
  404. }
  405. ExperiencedLatency::PlnxOrderBook(d) => {
  406. last.plnx = loop_time;
  407. plnx_order.update(loop_time, d)
  408. }
  409. ExperiencedLatency::PlnxWs(_) => {
  410. last.plnx = loop_time;
  411. plnx_ws_count.update(loop_time, 1_u32);
  412. }
  413. ExperiencedLatency::KrknTrade(d, cmd, ticker, side) => {
  414. debug!(logger, "new KrknTrade";
  415. "cmd" => cmd);
  416. last.krkn = loop_time;
  417. let n = DurationWindow::nanos(d);
  418. krkn_trade_30.update(loop_time, d);
  419. krkn_trade_300.update(loop_time, d);
  420. let ticker_s = ticker.map(|t| t.to_string()).unwrap_or("".into());
  421. let side_s = side.map(|s| s.to_string()).unwrap_or("".into());
  422. let mut m = Measurement::new("krkn_trade_api");
  423. m.add_field("nanos", Value::Integer(n as i64));
  424. m.add_tag("cmd", cmd);
  425. if ticker.is_some() {
  426. m.add_tag("ticker", &ticker_s);
  427. }
  428. if side.is_some() {
  429. m.add_tag("side", &side_s);
  430. }
  431. m.set_timestamp(now());
  432. influx::serialize(&m, &mut buf);
  433. socket.send_str(&buf, 0);
  434. buf.clear();
  435. }
  436. //ExperiencedLatency::EventLoop(d) => event_loop.update(Instant::now(), d),
  437. other => {
  438. warn!(logger, "unexpected msg: {:?}", other);
  439. }
  440. }
  441. }
  442. if loop_time - last.broadcast > Duration::from_millis(100) {
  443. debug!(logger, "initalizing broadcast");
  444. // note - because we mutated the Window instances
  445. // above, we need a fresh Instant to avoid less than other
  446. // panic
  447. //
  448. krkn_trade_30.refresh(&loop_time);
  449. krkn_trade_300.refresh(&loop_time);
  450. let update = LatencyUpdate {
  451. gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
  452. //gdax_ws_nolock: gdax_ws_nolock.refresh(&loop_time).mean_nanos(),
  453. krkn_pub: krkn_pub.refresh(&loop_time).mean_nanos(),
  454. krkn_priv: krkn_priv.refresh(&loop_time).mean_nanos(),
  455. plnx_pub: plnx_pub.refresh(&loop_time).mean_nanos(),
  456. plnx_priv: plnx_priv.refresh(&loop_time).mean_nanos(),
  457. plnx_order: plnx_order.refresh(&loop_time).mean_nanos(),
  458. krkn_trade_30_mean: krkn_trade_30.mean_nanos(),
  459. krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0),
  460. krkn_trade_300_mean: krkn_trade_300.mean_nanos(),
  461. krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0),
  462. plnx_last: dt_from_dur(loop_time - last.plnx),
  463. krkn_last: dt_from_dur(loop_time - last.krkn),
  464. plnx_ws_count: plnx_ws_count.refresh(&loop_time).count() as u64,
  465. //event_loop: event_loop.refresh(&now).mean_nanos(),
  466. size: w.clone(),
  467. };
  468. channel.send(update);
  469. last.broadcast = loop_time;
  470. debug!(logger, "sent broadcast");
  471. }
  472. }
  473. crit!(logger, "goodbye");
  474. }));
  475. LatencyManager {
  476. tx: tx_copy,
  477. channel: channel_copy,
  478. thread
  479. }
  480. }
  481. }