You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

322 lines
9.8KB

  1. use std::thread::{self, JoinHandle};
  2. use std::sync::{Arc, Mutex, RwLock};
  3. use std::sync::mpsc::{self, Sender, Receiver, channel};
  4. use std::collections::VecDeque;
  5. use std::fmt::{self, Display, Error as FmtError, Formatter, Write};
  6. use std::time::{Instant, Duration};
  7. use chrono::{DateTime, Utc, TimeZone};
  8. use pub_sub::PubSub;
  9. use zmq;
  10. use influent::measurement::{Measurement, Value};
  11. use windows::{DurationWindow, Incremental};
  12. use influx;
  13. pub type Nanos = u64;
  14. pub const SECOND: u64 = 1e9 as u64;
  15. pub const MINUTE: u64 = SECOND * 60;
  16. pub const HOUR: u64 = MINUTE * 60;
  17. pub const MILLISECOND: u64 = SECOND / 1000;
  18. pub const MICROSECOND: u64 = MILLISECOND / 1000;
  19. pub fn nanos(d: Duration) -> Nanos {
  20. d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64)
  21. }
  22. pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
  23. (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
  24. }
  25. pub fn now() -> i64 { dt_nanos(Utc::now()) }
  26. pub fn tfmt(ns: Nanos) -> String {
  27. let mut f = String::new();
  28. match ns {
  29. t if t <= MICROSECOND => {
  30. write!(f, "{}ns", t);
  31. }
  32. t if t > MICROSECOND && t < MILLISECOND => {
  33. write!(f, "{}u", t / MICROSECOND);
  34. }
  35. t if t > MILLISECOND && t < SECOND => {
  36. write!(f, "{}ms", t / MILLISECOND);
  37. }
  38. t => {
  39. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  40. }
  41. }
  42. f
  43. }
  44. pub fn tfmt_dur(d: Duration) -> String {
  45. tfmt(nanos(d))
  46. }
  47. pub fn tfmt_write(ns: Nanos, f: &mut Formatter) {
  48. match ns {
  49. t if t <= MICROSECOND => {
  50. write!(f, "{}ns", t);
  51. }
  52. t if t > MICROSECOND && t < MILLISECOND => {
  53. write!(f, "{}u", t / MICROSECOND);
  54. }
  55. t if t > MILLISECOND && t < SECOND => {
  56. write!(f, "{}ms", t / MILLISECOND);
  57. }
  58. t => {
  59. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND);
  60. }
  61. }
  62. }
  63. #[derive(Debug)]
  64. pub enum ExperiencedLatency {
  65. GdaxWebsocket(Duration),
  66. //GdaxWebsocketNoLock(Duration),
  67. GdaxHttpPublic(Duration),
  68. GdaxHttpPrivate(Duration),
  69. PlnxHttpPublic(Duration),
  70. PlnxHttpPrivate(Duration),
  71. PlnxOrderBook(Duration),
  72. ExmoHttpPublic(Duration),
  73. KrknHttpPublic(Duration),
  74. KrknHttpPrivate(Duration),
  75. KrknTrade(Duration),
  76. EventLoop(Duration),
  77. Terminate
  78. }
  79. // impl Message for ExperiencedLatency {
  80. // fn kill_switch() -> Self {
  81. // ExperiencedLatency::Terminate
  82. // }
  83. // }
  84. /// represents over what period of time
  85. /// the latency measurements were taken
  86. pub trait MeasurementWindow {
  87. fn duration(&self) -> Duration;
  88. }
  89. #[derive(Debug, Clone, Copy)]
  90. pub struct WThirty;
  91. impl Default for WThirty {
  92. fn default() -> Self { WThirty {} }
  93. }
  94. impl MeasurementWindow for WThirty {
  95. fn duration(&self) -> Duration { Duration::from_secs(30) }
  96. }
  97. #[derive(Debug, Clone, Copy)]
  98. pub struct WTen;
  99. impl Default for WTen {
  100. fn default() -> Self { WTen {} }
  101. }
  102. impl MeasurementWindow for WTen {
  103. fn duration(&self) -> Duration { Duration::from_secs(10) }
  104. }
  105. #[derive(Debug, Clone, Default)]
  106. pub struct LatencyUpdate<W>
  107. where W: MeasurementWindow
  108. {
  109. pub gdax_ws: Nanos,
  110. //pub gdax_ws_nolock: Nanos,
  111. pub krkn_pub: Nanos,
  112. pub krkn_priv: Nanos,
  113. pub plnx_pub: Nanos,
  114. pub plnx_priv: Nanos,
  115. pub plnx_order: Nanos,
  116. pub krkn_trade_30_mean: Nanos,
  117. pub krkn_trade_30_max: Nanos,
  118. pub krkn_trade_300_mean: Nanos,
  119. pub krkn_trade_300_max: Nanos,
  120. //pub event_loop: Nanos,
  121. pub size: W,
  122. }
  123. impl<W> Display for LatencyUpdate<W>
  124. where W: MeasurementWindow
  125. {
  126. fn fmt(&self, f: &mut Formatter) -> fmt::Result {
  127. write!(f, " Latency\n gdax ws: ");
  128. tfmt_write(self.gdax_ws, f);
  129. write!(f, "\n krkn pub: ");
  130. tfmt_write(self.krkn_pub, f);
  131. write!(f, "\n krkn priv: ");
  132. tfmt_write(self.krkn_priv, f);
  133. write!(f, "\n krkn trade 30 mean: ");
  134. tfmt_write(self.krkn_trade_30_mean, f);
  135. write!(f, "\n krkn trade 30 max: ");
  136. tfmt_write(self.krkn_trade_30_max, f);
  137. write!(f, "\n krkn trade 300 mean: ");
  138. tfmt_write(self.krkn_trade_300_mean, f);
  139. write!(f, "\n krkn trade 300 max: ");
  140. tfmt_write(self.krkn_trade_300_max, f);
  141. write!(f, "\n plnx pub: ");
  142. tfmt_write(self.plnx_pub, f);
  143. write!(f, "\n plnx priv: ");
  144. tfmt_write(self.plnx_priv, f);
  145. write!(f, "\n plnx orderbook loop: ");
  146. tfmt_write(self.plnx_order, f);
  147. //write!(f, "\n gdax ws nolock: ");
  148. //tfmt_write(self.gdax_ws_nolock, f);
  149. //write!(f, "\n event loop: ");
  150. //tfmt(self.event_loop, f);
  151. write!(f,"")
  152. }
  153. }
  154. impl<W: MeasurementWindow> LatencyUpdate<W> {
  155. pub fn measurement_window(&self) -> Duration {
  156. self.size.duration()
  157. }
  158. }
  159. pub struct LatencyManager<W>
  160. where W: MeasurementWindow + Clone + Send + Sync
  161. {
  162. pub tx: Sender<ExperiencedLatency>,
  163. pub channel: PubSub<LatencyUpdate<W>>,
  164. thread: Option<JoinHandle<()>>,
  165. }
  166. //impl<W: MeasurementWindow + Clone + Send + Sync> LatencyManager<W> {
  167. impl LatencyManager<WTen> {
  168. pub fn new(w: WTen) -> Self {
  169. let (tx, rx) = channel();
  170. let tx_copy = tx.clone();
  171. let channel = PubSub::new();
  172. let channel_copy = channel.clone();
  173. let w = w.clone();
  174. let thread = Some(thread::spawn(move || {
  175. let ctx = zmq::Context::new();
  176. let socket = influx::push(&ctx).unwrap();
  177. let mut buf = String::with_capacity(4096);
  178. let w = w.clone();
  179. let mut gdax_ws = DurationWindow::new(w.duration());
  180. let mut gdax_priv = DurationWindow::new(w.duration());
  181. let mut krkn_pub = DurationWindow::new(w.duration());
  182. let mut krkn_priv = DurationWindow::new(w.duration());
  183. let mut plnx_pub = DurationWindow::new(w.duration());
  184. let mut plnx_priv = DurationWindow::new(w.duration());
  185. let mut plnx_order = DurationWindow::new(w.duration());
  186. /// yes I am intentionally breaking from the hard-typed duration
  187. /// window ... that was a stupid idea
  188. ///
  189. let mut krkn_trade_30 = DurationWindow::new(Duration::from_secs(30));
  190. let mut krkn_trade_300 = DurationWindow::new(Duration::from_secs(300));
  191. //let mut gdax_ws_nolock = DurationWindow::new(w.duration());
  192. //let mut event_loop = DurationWindow::new(w.duration());
  193. let mut last_broadcast = Instant::now();
  194. loop {
  195. if let Ok(msg) = rx.recv() {
  196. match msg {
  197. ExperiencedLatency::Terminate => {
  198. println!("latency manager terminating");
  199. break;
  200. }
  201. ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(Instant::now(), d),
  202. //ExperiencedLatency::GdaxWebsocketNoLock(d) => gdax_ws_nolock.update(Instant::now(), d),
  203. ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(Instant::now(), d),
  204. ExperiencedLatency::KrknHttpPublic(d) => krkn_pub.update(Instant::now(), d),
  205. ExperiencedLatency::KrknHttpPrivate(d) => krkn_priv.update(Instant::now(), d),
  206. ExperiencedLatency::PlnxHttpPublic(d) => plnx_pub.update(Instant::now(), d),
  207. ExperiencedLatency::PlnxHttpPrivate(d) => plnx_priv.update(Instant::now(), d),
  208. ExperiencedLatency::PlnxOrderBook(d) => plnx_order.update(Instant::now(), d),
  209. ExperiencedLatency::KrknTrade(d) => {
  210. let n = DurationWindow::nanos(d);
  211. krkn_trade_30.update(Instant::now(), d);
  212. krkn_trade_300.update(Instant::now(), d);
  213. let mut m = Measurement::new("krkn_trade_api");
  214. m.add_field("nanos", Value::Integer(n as i64));
  215. m.set_timestamp(now());
  216. influx::serialize(&m, &mut buf);
  217. socket.send_str(&buf, 0);
  218. buf.clear();
  219. }
  220. //ExperiencedLatency::EventLoop(d) => event_loop.update(Instant::now(), d),
  221. other => {}
  222. }
  223. }
  224. if Instant::now() - last_broadcast > Duration::from_millis(100) {
  225. let now = Instant::now();
  226. krkn_trade_30.refresh(&now);
  227. krkn_trade_300.refresh(&now);
  228. let update = LatencyUpdate {
  229. gdax_ws: gdax_ws.refresh(&now).mean_nanos(),
  230. //gdax_ws_nolock: gdax_ws_nolock.refresh(&now).mean_nanos(),
  231. krkn_pub: krkn_pub.refresh(&now).mean_nanos(),
  232. krkn_priv: krkn_priv.refresh(&now).mean_nanos(),
  233. plnx_pub: plnx_pub.refresh(&now).mean_nanos(),
  234. plnx_priv: plnx_priv.refresh(&now).mean_nanos(),
  235. plnx_order: plnx_order.refresh(&now).mean_nanos(),
  236. krkn_trade_30_mean: krkn_trade_30.mean_nanos(),
  237. krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0),
  238. krkn_trade_300_mean: krkn_trade_300.mean_nanos(),
  239. krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0),
  240. //event_loop: event_loop.refresh(&now).mean_nanos(),
  241. size: w.clone(),
  242. };
  243. channel.send(update);
  244. last_broadcast = now;
  245. }
  246. }
  247. }));
  248. LatencyManager {
  249. tx: tx_copy,
  250. channel: channel_copy,
  251. thread
  252. }
  253. }
  254. }