You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

585 lines
18KB

  1. use std::thread::{self, JoinHandle};
  2. use std::sync::{Arc, Mutex, RwLock};
  3. use std::sync::mpsc::{self, Sender, Receiver, channel};
  4. use std::collections::VecDeque;
  5. use std::fmt::{self, Display, Write};
  6. use std::time::{Instant, Duration};
  7. use chrono::{self, DateTime, Utc, TimeZone};
  8. use pub_sub::PubSub;
  9. use zmq;
  10. use influent::measurement::{Measurement, Value};
  11. use sloggers::types::Severity;
  12. use shuteye;
  13. //use chashmap::CHashMap;
  14. use windows::{DurationWindow, Incremental, Window};
  15. use money::{Ticker, Side, ByExchange, Exchange};
  16. use super::file_logger;
  17. use influx::{self, OwnedMeasurement, OwnedValue};
  18. pub type Nanos = u64;
  19. pub const SECOND: u64 = 1e9 as u64;
  20. pub const MINUTE: u64 = SECOND * 60;
  21. pub const HOUR: u64 = MINUTE * 60;
  22. pub const MILLISECOND: u64 = SECOND / 1000;
  23. pub const MICROSECOND: u64 = MILLISECOND / 1000;
  24. pub fn nanos(d: Duration) -> Nanos {
  25. d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64)
  26. }
  27. pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
  28. (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
  29. }
  30. pub fn now() -> i64 { dt_nanos(Utc::now()) }
  31. pub fn tfmt(ns: Nanos) -> String {
  32. let mut f = String::new();
  33. match ns {
  34. t if t <= MICROSECOND => {
  35. write!(f, "{}ns", t);
  36. }
  37. t if t > MICROSECOND && t < MILLISECOND => {
  38. write!(f, "{}u", t / MICROSECOND);
  39. }
  40. t if t > MILLISECOND && t < SECOND => {
  41. write!(f, "{}ms", t / MILLISECOND);
  42. }
  43. t => {
  44. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  45. }
  46. }
  47. f
  48. }
  49. pub fn tfmt_dur(d: Duration) -> String {
  50. tfmt(nanos(d))
  51. }
  52. pub fn tfmt_dt(dt: DateTime<Utc>) -> String {
  53. Utc::now().signed_duration_since(dt)
  54. .to_std()
  55. .map(|dur| {
  56. tfmt_dur(dur)
  57. }).unwrap_or("?".into())
  58. }
  59. pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) {
  60. match ns {
  61. t if t <= MICROSECOND => {
  62. write!(f, "{}ns", t);
  63. }
  64. t if t > MICROSECOND && t < MILLISECOND => {
  65. write!(f, "{}u", t / MICROSECOND);
  66. }
  67. t if t > MILLISECOND && t < SECOND => {
  68. write!(f, "{}ms", t / MILLISECOND);
  69. }
  70. t => {
  71. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  72. }
  73. }
  74. }
  75. #[derive(Debug)]
  76. pub enum Latency {
  77. Ws(Exchange, Ticker, Duration),
  78. Http(Exchange, Duration),
  79. Trade(Exchange, Ticker, Duration),
  80. Terminate
  81. }
  82. #[derive(Debug)]
  83. pub enum ExperiencedLatency {
  84. GdaxWebsocket(Duration),
  85. //GdaxWebsocketNoLock(Duration),
  86. GdaxHttpPublic(Duration),
  87. GdaxHttpPrivate(Duration),
  88. PlnxHttpPublic(Duration),
  89. PlnxHttpPrivate(Duration),
  90. PlnxOrderBook(Duration),
  91. ExmoHttpPublic(Duration),
  92. KrknHttpPublic(Duration),
  93. KrknHttpPrivate(Duration),
  94. KrknTrade(Duration, &'static str, Option<Ticker>, Option<Side>),
  95. EventLoop(Duration),
  96. PlnxWs(Ticker),
  97. Terminate
  98. }
  99. // impl Message for ExperiencedLatency {
  100. // fn kill_switch() -> Self {
  101. // ExperiencedLatency::Terminate
  102. // }
  103. // }
  104. /// represents over what period of time
  105. /// the latency measurements were taken
  106. pub trait MeasurementWindow {
  107. fn duration(&self) -> Duration;
  108. }
  109. #[derive(Debug, Clone, Copy)]
  110. pub struct WThirty;
  111. impl Default for WThirty {
  112. fn default() -> Self { WThirty {} }
  113. }
  114. impl MeasurementWindow for WThirty {
  115. fn duration(&self) -> Duration { Duration::from_secs(30) }
  116. }
  117. #[derive(Debug, Clone, Copy)]
  118. pub struct WTen;
  119. impl Default for WTen {
  120. fn default() -> Self { WTen {} }
  121. }
  122. impl MeasurementWindow for WTen {
  123. fn duration(&self) -> Duration { Duration::from_secs(10) }
  124. }
  125. #[derive(Debug, Clone)]
  126. pub struct Update {
  127. pub gdax_ws: Nanos,
  128. pub gdax_trade: Nanos,
  129. pub gdax_last: DateTime<Utc>
  130. }
  131. impl Default for Update {
  132. fn default() -> Self {
  133. Update {
  134. gdax_ws: 0,
  135. gdax_trade: 0,
  136. gdax_last: Utc::now(),
  137. }
  138. }
  139. }
  140. #[derive(Debug, Clone)]
  141. pub struct LatencyUpdate<W>
  142. where W: MeasurementWindow
  143. {
  144. pub gdax_ws: Nanos,
  145. pub krkn_pub: Nanos,
  146. pub krkn_priv: Nanos,
  147. pub plnx_pub: Nanos,
  148. pub plnx_priv: Nanos,
  149. pub plnx_order: Nanos,
  150. pub krkn_trade_30_mean: Nanos,
  151. pub krkn_trade_30_max: Nanos,
  152. pub krkn_trade_300_mean: Nanos,
  153. pub krkn_trade_300_max: Nanos,
  154. pub plnx_last: DateTime<Utc>,
  155. pub krkn_last: DateTime<Utc>,
  156. pub plnx_ws_count: u64,
  157. //pub event_loop: Nanos,
  158. pub size: W,
  159. }
  160. impl<W> Default for LatencyUpdate<W>
  161. where W: MeasurementWindow + Default
  162. {
  163. fn default() -> Self {
  164. LatencyUpdate {
  165. gdax_ws: Nanos::default(),
  166. krkn_pub: Nanos::default(),
  167. krkn_priv: Nanos::default(),
  168. plnx_pub: Nanos::default(),
  169. plnx_priv: Nanos::default(),
  170. plnx_order: Nanos::default(),
  171. krkn_trade_30_mean: Nanos::default(),
  172. krkn_trade_30_max: Nanos::default(),
  173. krkn_trade_300_mean: Nanos::default(),
  174. krkn_trade_300_max: Nanos::default(),
  175. plnx_ws_count: 0,
  176. plnx_last: Utc::now(),
  177. krkn_last: Utc::now(),
  178. size: W::default()
  179. }
  180. }
  181. }
  182. impl<W> Display for LatencyUpdate<W>
  183. where W: MeasurementWindow
  184. {
  185. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  186. write!(f, " gdax ws: ");
  187. tfmt_write(self.gdax_ws, f);
  188. write!(f, "\n krkn pub: ");
  189. tfmt_write(self.krkn_pub, f);
  190. write!(f, "\n krkn priv: ");
  191. tfmt_write(self.krkn_priv, f);
  192. write!(f, "\n krkn trade 30 mean: ");
  193. tfmt_write(self.krkn_trade_30_mean, f);
  194. write!(f, "\n krkn trade 30 max: ");
  195. tfmt_write(self.krkn_trade_30_max, f);
  196. write!(f, "\n krkn trade 300 mean: ");
  197. tfmt_write(self.krkn_trade_300_mean, f);
  198. write!(f, "\n krkn trade 300 max: ");
  199. tfmt_write(self.krkn_trade_300_max, f);
  200. write!(f, "\n plnx pub: ");
  201. tfmt_write(self.plnx_pub, f);
  202. write!(f, "\n plnx priv: ");
  203. tfmt_write(self.plnx_priv, f);
  204. write!(f, "\n plnx orderbook loop: ");
  205. tfmt_write(self.plnx_order, f);
  206. //write!(f, "\n gdax ws nolock: ");
  207. //tfmt_write(self.gdax_ws_nolock, f);
  208. //write!(f, "\n event loop: ");
  209. //tfmt(self.event_loop, f);
  210. write!(f,"")
  211. }
  212. }
  213. impl<W: MeasurementWindow> LatencyUpdate<W> {
  214. pub fn measurement_window(&self) -> Duration {
  215. self.size.duration()
  216. }
  217. }
  218. pub struct Manager {
  219. pub tx: Sender<Latency>,
  220. pub channel: PubSub<Update>,
  221. thread: Option<JoinHandle<()>>,
  222. }
  223. pub struct LatencyManager<W>
  224. where W: MeasurementWindow + Clone + Send + Sync
  225. {
  226. pub tx: Sender<ExperiencedLatency>,
  227. pub channel: PubSub<LatencyUpdate<W>>,
  228. thread: Option<JoinHandle<()>>,
  229. }
  230. /// returns a DateTime equal to now - `dur`
  231. ///
  232. pub fn dt_from_dur(dur: Duration) -> DateTime<Utc> {
  233. let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64);
  234. Utc::now() - old_dur
  235. }
  236. struct Last {
  237. broadcast: Instant,
  238. plnx: Instant,
  239. krkn: Instant,
  240. gdax: Instant,
  241. }
  242. impl Default for Last {
  243. fn default() -> Self {
  244. Last {
  245. broadcast: Instant::now(),
  246. plnx: Instant::now(),
  247. krkn: Instant::now(),
  248. gdax: Instant::now(),
  249. }
  250. }
  251. }
  252. impl Manager {
  253. pub fn new(window: Duration,
  254. log_path: &'static str,
  255. measurements: Sender<OwnedMeasurement>) -> Self {
  256. let (tx, rx) = channel();
  257. let tx_copy = tx.clone();
  258. let channel = PubSub::new();
  259. let channel_copy = channel.clone();
  260. let logger = file_logger(log_path, Severity::Info);
  261. info!(logger, "initializing");
  262. let mut gdax_ws = DurationWindow::new(window);
  263. let mut gdax_trade = DurationWindow::new(window);
  264. let mut last = Last::default();
  265. info!(logger, "entering loop");
  266. let mut terminate = false;
  267. let thread = Some(thread::spawn(move || {
  268. loop {
  269. let loop_time = Instant::now();
  270. rx.try_recv().map(|msg| {
  271. debug!(logger, "rcvd {:?}", msg);
  272. match msg {
  273. Latency::Ws(exch, ticker, dur) => {
  274. // shortcut
  275. gdax_ws.update(loop_time, dur);
  276. last.gdax = loop_time;
  277. }
  278. Latency::Trade(exch, ticker, dur) => {
  279. //shorcut
  280. gdax_trade.update(loop_time, dur);
  281. last.gdax = loop_time;
  282. let nanos = DurationWindow::nanos(dur);
  283. measurements.send(
  284. OwnedMeasurement::new("gdax_trade_api")
  285. .add_string_tag("ticker", ticker.to_string())
  286. .add_field("nanos", OwnedValue::Integer(nanos as i64))
  287. .set_timestamp(influx::now()));
  288. }
  289. Latency::Terminate => {
  290. crit!(logger, "rcvd Terminate order");
  291. terminate = true;
  292. }
  293. _ => {}
  294. }
  295. });
  296. if loop_time - last.broadcast > Duration::from_millis(100) {
  297. debug!(logger, "initalizing broadcast");
  298. let update = Update {
  299. gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
  300. gdax_trade: gdax_trade.refresh(&loop_time).mean_nanos(),
  301. gdax_last: dt_from_dur(loop_time - last.gdax)
  302. };
  303. channel.send(update);
  304. last.broadcast = loop_time;
  305. debug!(logger, "sent broadcast");
  306. } else {
  307. #[cfg(feature = "no-thrash")]
  308. shuteye::sleep(Duration::new(0, 1000));
  309. }
  310. if terminate { break }
  311. }
  312. crit!(logger, "goodbye");
  313. }));
  314. Manager {
  315. tx,
  316. channel: channel_copy,
  317. thread,
  318. }
  319. }
  320. }
  321. impl Drop for Manager {
  322. fn drop(&mut self) {
  323. self.tx.send(Latency::Terminate);
  324. if let Some(thread) = self.thread.take() {
  325. let _ = thread.join();
  326. }
  327. }
  328. }
  329. //impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> {
  330. impl LatencyManager<WTen> {
  331. pub fn new(w: WTen) -> Self {
  332. let (tx, rx) = channel();
  333. let tx_copy = tx.clone();
  334. let channel = PubSub::new();
  335. let channel_copy = channel.clone();
  336. let w = w.clone();
  337. let thread = Some(thread::spawn(move || {
  338. let logger = file_logger("var/log/latency-manager.log", Severity::Info);
  339. info!(logger, "initializing zmq");
  340. let ctx = zmq::Context::new();
  341. let socket = influx::push(&ctx).unwrap();
  342. let mut buf = String::with_capacity(4096);
  343. info!(logger, "initializing DurationWindows");
  344. let mut gdax_ws = DurationWindow::new(w.duration());
  345. let mut gdax_priv = DurationWindow::new(w.duration());
  346. let mut krkn_pub = DurationWindow::new(w.duration());
  347. let mut krkn_priv = DurationWindow::new(w.duration());
  348. let mut plnx_pub = DurationWindow::new(w.duration());
  349. let mut plnx_priv = DurationWindow::new(w.duration());
  350. let mut plnx_order = DurationWindow::new(w.duration());
  351. let mut plnx_ws_count: Window<u32> = Window::new(w.duration());
  352. // yes I am intentionally breaking from the hard-typed duration
  353. // window ... that was a stupid idea
  354. //
  355. let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30));
  356. let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300));
  357. //let mut gdax_ws_nolock = DurationWindow::new(w.duration());
  358. //let mut event_loop = DurationWindow::new(w.duration());
  359. let mut last = Last::default();
  360. thread::sleep_ms(1);
  361. info!(logger, "entering loop");
  362. loop {
  363. let loop_time = Instant::now();
  364. if let Ok(msg) = rx.recv() {
  365. debug!(logger, "new msg: {:?}", msg);
  366. match msg {
  367. ExperiencedLatency::Terminate => {
  368. crit!(logger, "terminating");
  369. break;
  370. }
  371. ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d),
  372. //ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(loop_time, d),
  373. ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d),
  374. ExperiencedLatency::KrknHttpPublic(d) => {
  375. last.krkn = loop_time;
  376. krkn_pub.update(loop_time, d)
  377. }
  378. ExperiencedLatency::KrknHttpPrivate(d) => {
  379. last.krkn = loop_time;
  380. krkn_priv.update(loop_time, d)
  381. }
  382. ExperiencedLatency::PlnxHttpPublic(d) => {
  383. last.plnx = loop_time;
  384. plnx_pub.update(loop_time, d)
  385. }
  386. ExperiencedLatency::PlnxHttpPrivate(d) => {
  387. last.plnx = loop_time;
  388. plnx_priv.update(loop_time, d)
  389. }
  390. ExperiencedLatency::PlnxOrderBook(d) => {
  391. last.plnx = loop_time;
  392. plnx_order.update(loop_time, d)
  393. }
  394. ExperiencedLatency::PlnxWs(_) => {
  395. last.plnx = loop_time;
  396. plnx_ws_count.update(loop_time, 1_u32);
  397. }
  398. ExperiencedLatency::KrknTrade(d, cmd, ticker, side) => {
  399. debug!(logger, "new KrknTrade";
  400. "cmd" => cmd);
  401. last.krkn = loop_time;
  402. let n = DurationWindow::nanos(d);
  403. krkn_trade_30.update(loop_time, d);
  404. krkn_trade_300.update(loop_time, d);
  405. let ticker_s = ticker.map(|t| t.to_string()).unwrap_or("".into());
  406. let side_s = side.map(|s| s.to_string()).unwrap_or("".into());
  407. let mut m = Measurement::new("krkn_trade_api");
  408. m.add_field("nanos", Value::Integer(n as i64));
  409. m.add_tag("cmd", cmd);
  410. if ticker.is_some() {
  411. m.add_tag("ticker", &ticker_s);
  412. }
  413. if side.is_some() {
  414. m.add_tag("side", &side_s);
  415. }
  416. m.set_timestamp(now());
  417. influx::serialize(&m, &mut buf);
  418. socket.send_str(&buf, 0);
  419. buf.clear();
  420. }
  421. //ExperiencedLatency::EventLoop(d) => event_loop.update(Instant::now(), d),
  422. other => {
  423. warn!(logger, "unexpected msg: {:?}", other);
  424. }
  425. }
  426. }
  427. if loop_time - last.broadcast > Duration::from_millis(100) {
  428. debug!(logger, "initalizing broadcast");
  429. // note - because we mutated the Window instances
  430. // above, we need a fresh Instant to avoid less than other
  431. // panic
  432. //
  433. krkn_trade_30.refresh(&loop_time);
  434. krkn_trade_300.refresh(&loop_time);
  435. let update = LatencyUpdate {
  436. gdax_ws: gdax_ws.refresh(&loop_time).mean_nanos(),
  437. //gdax_ws_nolock: gdax_ws_nolock.refresh(&loop_time).mean_nanos(),
  438. krkn_pub: krkn_pub.refresh(&loop_time).mean_nanos(),
  439. krkn_priv: krkn_priv.refresh(&loop_time).mean_nanos(),
  440. plnx_pub: plnx_pub.refresh(&loop_time).mean_nanos(),
  441. plnx_priv: plnx_priv.refresh(&loop_time).mean_nanos(),
  442. plnx_order: plnx_order.refresh(&loop_time).mean_nanos(),
  443. krkn_trade_30_mean: krkn_trade_30.mean_nanos(),
  444. krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0),
  445. krkn_trade_300_mean: krkn_trade_300.mean_nanos(),
  446. krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0),
  447. plnx_last: dt_from_dur(loop_time - last.plnx),
  448. krkn_last: dt_from_dur(loop_time - last.krkn),
  449. plnx_ws_count: plnx_ws_count.refresh(&loop_time).count() as u64,
  450. //event_loop: event_loop.refresh(&now).mean_nanos(),
  451. size: w.clone(),
  452. };
  453. channel.send(update);
  454. last.broadcast = loop_time;
  455. debug!(logger, "sent broadcast");
  456. }
  457. }
  458. crit!(logger, "goodbye");
  459. }));
  460. LatencyManager {
  461. tx: tx_copy,
  462. channel: channel_copy,
  463. thread
  464. }
  465. }
  466. }