You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

790 lines
23KB

  1. use std::thread::{self, JoinHandle};
  2. use std::sync::mpsc::{Sender, channel};
  3. use std::fmt;
  4. use std::time::{Instant, Duration};
  5. use chrono::{self, DateTime, Utc};
  6. use pub_sub::PubSub;
  7. use sloggers::types::Severity;
  8. //use windows::{DurationWindow, Incremental, Window};
  9. use money::{Ticker, Side, Exchange};
  10. use super::file_logger;
  11. use influx::{self, OwnedMeasurement, OwnedValue};
  12. use self::windows::Incremental;
  /// A span of time expressed as a whole number of nanoseconds.
  pub type Nanos = u64;

  /// Nanoseconds in one second.
  pub const SECOND: u64 = 1_000_000_000;
  /// Nanoseconds in one minute.
  pub const MINUTE: u64 = 60 * SECOND;
  /// Nanoseconds in one hour.
  pub const HOUR: u64 = 60 * MINUTE;
  /// Nanoseconds in one millisecond.
  pub const MILLISECOND: u64 = SECOND / 1_000;
  /// Nanoseconds in one microsecond.
  pub const MICROSECOND: u64 = MILLISECOND / 1_000;
  19. pub fn nanos(d: Duration) -> Nanos {
  20. d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64)
  21. }
  22. pub fn dt_nanos(t: DateTime<Utc>) -> i64 {
  23. (t.timestamp() as i64) * 1_000_000_000_i64 + (t.timestamp_subsec_nanos() as i64)
  24. }
  25. pub fn now() -> i64 { dt_nanos(Utc::now()) }
  26. pub fn tfmt(ns: Nanos) -> String {
  27. match ns {
  28. t if t <= MICROSECOND => {
  29. format!("{}ns", t)
  30. }
  31. t if t > MICROSECOND && t < MILLISECOND => {
  32. format!("{}u", t / MICROSECOND)
  33. }
  34. t if t > MILLISECOND && t < SECOND => {
  35. format!("{}ms", t / MILLISECOND)
  36. }
  37. t => {
  38. format!("{}.{}sec", t / SECOND, t / MILLISECOND)
  39. }
  40. }
  41. }
  42. pub fn tfmt_dur(d: Duration) -> String {
  43. tfmt(nanos(d))
  44. }
  45. pub fn tfmt_dt(dt: DateTime<Utc>) -> String {
  46. Utc::now().signed_duration_since(dt)
  47. .to_std()
  48. .map(|dur| {
  49. tfmt_dur(dur)
  50. }).unwrap_or("?".into())
  51. }
  52. pub fn tfmt_write(ns: Nanos, f: &mut fmt::Formatter) -> fmt::Result {
  53. match ns {
  54. t if t <= MICROSECOND => {
  55. write!(f, "{}ns", t)
  56. }
  57. t if t > MICROSECOND && t < MILLISECOND => {
  58. write!(f, "{}u", t / MICROSECOND)
  59. }
  60. t if t > MILLISECOND && t < SECOND => {
  61. write!(f, "{}ms", t / MILLISECOND)
  62. }
  63. t => {
  64. write!(f, "{}.{}sec", t / SECOND, t / MILLISECOND)
  65. }
  66. }
  67. }
  68. #[doc(hide)]
  69. mod windows {
  70. use super::*;
  71. use std::ops::{Div, Mul, Sub, SubAssign, AddAssign};
  72. use std::collections::VecDeque;
  73. use num::Float;
  74. const INITIAL_CAPACITY: usize = 1000;
  75. #[derive(Clone, Debug)]
  76. pub struct Point<T>
  77. //where T: Default
  78. {
  79. time: Instant,
  80. value: T
  81. }
  82. #[derive(Debug, Clone)]
  83. pub struct Window<T>
  84. where T: Default
  85. {
  86. pub size: Duration, // window size
  87. mean: T,
  88. ps: T,
  89. psa: T,
  90. var: T,
  91. sum: T,
  92. count: u32,
  93. items: VecDeque<Point<T>>,
  94. }
  95. #[derive(Default)]
  96. pub struct DurationWindow {
  97. pub size: Duration,
  98. mean: Duration,
  99. sum: Duration,
  100. count: u32,
  101. items: VecDeque<Point<Duration>>
  102. }
  103. impl<T> Point<T>
  104. where T: Default + Copy
  105. {
  106. fn new(time: Instant, value: T) -> Self {
  107. Point { time, value }
  108. }
  109. fn value(&self) -> T {
  110. self.value
  111. }
  112. }
  113. impl<T> Window<T>
  114. where T: Default + Zero
  115. {
  116. pub fn new(size: Duration) -> Self {
  117. Window {
  118. size,
  119. mean: T::default(),
  120. psa: T::default(),
  121. ps: T::default(),
  122. sum: T::default(),
  123. count: 0,
  124. var: T::default(),
  125. items: VecDeque::with_capacity(INITIAL_CAPACITY),
  126. }
  127. }
  128. pub fn with_size_and_capacity(size: Duration, capacity: usize) -> Self {
  129. Window {
  130. size,
  131. mean: T::default(),
  132. psa: T::default(),
  133. ps: T::default(),
  134. sum: T::default(),
  135. count: 0,
  136. var: T::default(),
  137. items: VecDeque::with_capacity(capacity),
  138. }
  139. }
  140. }
  141. impl<T> From<Duration> for Window<T>
  142. where T: Default + Zero
  143. {
  144. fn from(size: Duration) -> Self {
  145. Window::new(size)
  146. }
  147. }
  148. impl From<Duration> for DurationWindow {
  149. fn from(size: Duration) -> Self {
  150. DurationWindow::new(size)
  151. }
  152. }
  153. pub trait Incremental<T> {
  154. /// Purge expired items.
  155. ///
  156. #[inline]
  157. fn refresh(&mut self, t: Instant) -> &Self;
  158. /// Add a new item.
  159. ///
  160. #[inline]
  161. fn add(&mut self, time: Instant, value: T);
  162. /// Add a new item and purge expired items.
  163. ///
  164. #[inline]
  165. fn update(&mut self, time: Instant, value: T) {
  166. self.refresh(time);
  167. self.add(time, value);
  168. }
  169. }
  170. pub trait Zero {
  171. fn zero() -> Self;
  172. }
  173. pub trait One {
  174. fn one() -> Self;
  175. }
  176. macro_rules! zero {
  177. ($t:ty, $body:expr) => {
  178. impl Zero for $t {
  179. fn zero() -> $t { $body }
  180. }
  181. }
  182. }
  183. macro_rules! one {
  184. ($t:ty, $body:expr) => {
  185. impl One for $t {
  186. fn one() -> $t { $body }
  187. }
  188. }
  189. }
  190. zero!(f64, 0.0);
  191. zero!(f32, 0.0);
  192. zero!(u128, 0);
  193. zero!(i128, 0);
  194. zero!(u64, 0);
  195. zero!(i64, 0);
  196. zero!(i32, 0);
  197. zero!(u32, 0);
  198. zero!(u16, 0);
  199. one!(f64, 1.0);
  200. one!(f32, 1.0);
  201. one!(u128, 1);
  202. one!(i128, 1);
  203. one!(u64, 1);
  204. one!(i64, 1);
  205. one!(i32, 1);
  206. one!(u32, 1);
  207. one!(u16, 1);
  208. impl<T> Incremental<T> for Window<T>
  209. where T: Default + AddAssign<T> + SubAssign<T> + From<u32> + Div<Output = T> +
  210. Mul<Output = T> + Sub<Output = T> + Copy
  211. {
  212. #[inline]
  213. fn refresh(&mut self, t: Instant) -> &Self {
  214. if !self.items.is_empty() {
  215. let (n_remove, sum, ps, count) =
  216. self.items.iter()
  217. .take_while(|x| t - x.time > self.size)
  218. .fold((0, self.sum, self.ps, self.count), |(n_remove, sum, ps, count), x| {
  219. (n_remove + 1, sum - x.value, ps - x.value * x.value, count - 1)
  220. });
  221. self.sum = sum;
  222. self.ps = ps;
  223. self.count = count;
  224. for _ in 0..n_remove {
  225. self.items.pop_front();
  226. }
  227. }
  228. if self.count > 0 {
  229. self.mean = self.sum / self.count.into();
  230. self.psa = self.ps / self.count.into();
  231. let c: T = self.count.into();
  232. self.var = (self.psa * c - c * self.mean * self.mean) / c;
  233. }
  234. self
  235. }
  236. /// Creates `Point { time, value }` and pushes to `self.items`.
  237. ///
  238. #[inline]
  239. fn add(&mut self, time: Instant, value: T) {
  240. let p = Point::new(time, value);
  241. self.sum += p.value;
  242. self.ps += p.value * p.value;
  243. self.count += 1;
  244. self.items.push_back(p);
  245. }
  246. #[inline]
  247. fn update(&mut self, time: Instant, value: T) {
  248. self.add(time, value);
  249. self.refresh(time);
  250. }
  251. }
  252. impl Incremental<Duration> for DurationWindow {
  253. #[inline]
  254. fn refresh(&mut self, t: Instant) -> &Self {
  255. if !self.items.is_empty() {
  256. let (n_remove, sum, count) =
  257. self.items.iter()
  258. .take_while(|x| t - x.time > self.size)
  259. .fold((0, self.sum, self.count), |(n_remove, sum, count), x| {
  260. (n_remove + 1, sum - x.value, count - 1)
  261. });
  262. self.sum = sum;
  263. self.count = count;
  264. for _ in 0..n_remove {
  265. self.items.pop_front();
  266. }
  267. }
  268. if self.count > 0 {
  269. self.mean = self.sum / self.count.into();
  270. }
  271. self
  272. }
  273. #[inline]
  274. fn add(&mut self, time: Instant, value: Duration) {
  275. let p = Point::new(time, value);
  276. self.sum += p.value;
  277. self.count += 1;
  278. self.items.push_back(p);
  279. }
  280. }
  281. impl<T> Window<T>
  282. where T: Default + Copy
  283. {
  284. pub fn mean(&self) -> T { self.mean }
  285. pub fn var(&self) -> T { self.var }
  286. pub fn psa(&self) -> T { self.psa }
  287. pub fn ps(&self) -> T { self.ps }
  288. pub fn count(&self) -> u32 { self.count }
  289. pub fn len(&self) -> usize { self.items.len() }
  290. pub fn is_empty(&self) -> bool { self.items.is_empty() }
  291. /// Returns the `Duration` between `t` and the first `Point` in `self.items`.
  292. ///
  293. /// If there are no items, returns `Duration { secs: 0, nanos: 0 }`.
  294. ///
  295. /// # Panics
  296. ///
  297. /// This function will panic if `t` is earlier than the first `Point`'s `Instant`.
  298. ///
  299. #[inline]
  300. pub fn elapsed(&self, t: Instant) -> Duration {
  301. self.items.front()
  302. .map(|p| {
  303. t - p.time
  304. }).unwrap_or_else(|| Duration::new(0, 0))
  305. }
  306. }
  307. impl<T> Window<T>
  308. where T: Float + Default
  309. {
  310. #[inline]
  311. pub fn std(&self) -> T { self.var.sqrt() }
  312. }
  313. impl DurationWindow {
  314. pub fn new(size: Duration) -> Self { DurationWindow { size, ..Default::default() } }
  315. pub fn mean(&self) -> Duration { self.mean }
  316. pub fn count(&self) -> u32 { self.count }
  317. pub fn len(&self) -> usize { self.items.len() }
  318. pub fn is_empty(&self) -> bool { self.items.is_empty() }
  319. #[inline]
  320. pub fn nanos(d: Duration) -> u64 { d.as_secs() * 1_000_000_000 + (d.subsec_nanos() as u64) }
  321. /// Returns number of microseconds as `u32` if `d <= Duration::new(4_294, 967_295_000)`.
  322. ///
  323. /// Any duration above ~4,295 seconds as micros is larger than `u32::MAX`. 4,295 seconds
  324. /// is about 71.5 minutes.
  325. ///
  326. /// # Examples
  327. ///
  328. /// ```
  329. /// use windows::DurationWindow;
  330. /// use std::time::Duration;
  331. ///
  332. /// assert_eq!(DurationWindow::micros(Duration::new(1, 0)), Some(1_000_000));
  333. /// assert_eq!(DurationWindow::micros(Duration::new(4_295, 0)), None);
  334. /// ```
  335. ///
  336. #[inline]
  337. pub fn micros(d: Duration) -> Option<u32> {
  338. if d <= Duration::new(4_294, 967_295_000) {
  339. Some((d.as_secs() * 1_000_000) as u32 + d.subsec_nanos() / 1_000u32)
  340. } else {
  341. None
  342. }
  343. }
  344. #[inline]
  345. pub fn mean_nanos(&self) -> u64 { DurationWindow::nanos(self.mean()) }
  346. #[inline]
  347. pub fn max(&self) -> Option<Duration> {
  348. self.items.iter()
  349. .map(|p| p.value)
  350. .max()
  351. }
  352. #[inline]
  353. pub fn max_nanos(&self) -> Option<u64> {
  354. self.max()
  355. .map(|x| DurationWindow::nanos(x))
  356. }
  357. #[inline]
  358. pub fn first(&self) -> Option<Duration> {
  359. self.items
  360. .front()
  361. .map(|pt| pt.value())
  362. }
  363. /// Returns the `Duration` between `t` and the first `Point` in `self.items`.
  364. ///
  365. /// If there are no items, returns `Duration { secs: 0, nanos: 0 }`.
  366. ///
  367. /// # Panics
  368. ///
  369. /// This function will panic if `t` is earlier than the first `Point`'s `Instant`.
  370. ///
  371. #[inline]
  372. pub fn elapsed(&self, t: Instant) -> Duration {
  373. self.items.front()
  374. .map(|p| {
  375. t - p.time
  376. }).unwrap_or_else(|| Duration::new(0, 0))
  377. }
  378. }
  379. }
  380. #[derive(Debug)]
  381. pub enum Latency {
  382. Ws(Exchange, Ticker, Duration),
  383. Http(Exchange, Duration),
  384. Trade(Exchange, Ticker, Duration),
  385. Terminate
  386. }
  387. #[derive(Debug)]
  388. pub enum ExperiencedLatency {
  389. GdaxWebsocket(Duration),
  390. GdaxHttpPublic(Duration),
  391. GdaxHttpPrivate(Duration),
  392. PlnxHttpPublic(Duration),
  393. PlnxHttpPrivate(Duration),
  394. PlnxOrderBook(Duration),
  395. KrknHttpPublic(Duration),
  396. KrknHttpPrivate(Duration),
  397. KrknTrade(Duration, &'static str, Option<Ticker>, Option<Side>),
  398. PlnxWs(Ticker),
  399. Terminate
  400. }
  401. #[derive(Debug, Clone)]
  402. pub struct Update {
  403. pub gdax_ws: Nanos,
  404. pub gdax_trade: Nanos,
  405. pub gdax_last: DateTime<Utc>
  406. }
  407. impl Default for Update {
  408. fn default() -> Self {
  409. Update {
  410. gdax_ws: 0,
  411. gdax_trade: 0,
  412. gdax_last: Utc::now(),
  413. }
  414. }
  415. }
  416. #[derive(Debug, Clone)]
  417. pub struct LatencyUpdate {
  418. pub gdax_ws: Nanos,
  419. pub krkn_pub: Nanos,
  420. pub krkn_priv: Nanos,
  421. pub plnx_pub: Nanos,
  422. pub plnx_priv: Nanos,
  423. pub plnx_order: Nanos,
  424. pub krkn_trade_30_mean: Nanos,
  425. pub krkn_trade_30_max: Nanos,
  426. pub krkn_trade_300_mean: Nanos,
  427. pub krkn_trade_300_max: Nanos,
  428. pub plnx_last: DateTime<Utc>,
  429. pub krkn_last: DateTime<Utc>,
  430. pub plnx_ws_count: u64,
  431. }
  432. impl Default for LatencyUpdate {
  433. fn default() -> Self {
  434. LatencyUpdate {
  435. gdax_ws : 0,
  436. krkn_pub : 0,
  437. krkn_priv : 0,
  438. plnx_pub : 0,
  439. plnx_priv : 0,
  440. plnx_order : 0,
  441. krkn_trade_30_mean : 0,
  442. krkn_trade_30_max : 0,
  443. krkn_trade_300_mean : 0,
  444. krkn_trade_300_max : 0,
  445. plnx_ws_count : 0,
  446. plnx_last : Utc::now(),
  447. krkn_last : Utc::now(),
  448. }
  449. }
  450. }
  451. pub struct Manager {
  452. pub tx: Sender<Latency>,
  453. pub channel: PubSub<Update>,
  454. thread: Option<JoinHandle<()>>,
  455. }
  456. pub struct LatencyManager {
  457. pub tx: Sender<ExperiencedLatency>,
  458. pub channel: PubSub<LatencyUpdate>,
  459. thread: Option<JoinHandle<()>>,
  460. }
  461. /// returns a DateTime equal to now - `dur`
  462. ///
  463. pub fn dt_from_dur(dur: Duration) -> DateTime<Utc> {
  464. let old_dur = chrono::Duration::nanoseconds(nanos(dur) as i64);
  465. Utc::now() - old_dur
  466. }
  /// Instants at which the worker threads last performed certain actions.
  struct Last {
      /// When the most recent snapshot was broadcast.
      broadcast: Instant,
      /// When a `Plnx*` message was last handled.
      plnx: Instant,
      /// When a `Krkn*` message was last handled.
      krkn: Instant,
      /// When a message was last recorded against `gdax`.
      gdax: Instant,
  }
  473. impl Default for Last {
  474. fn default() -> Self {
  475. Last {
  476. broadcast: Instant::now(),
  477. plnx: Instant::now(),
  478. krkn: Instant::now(),
  479. gdax: Instant::now(),
  480. }
  481. }
  482. }
  483. impl Manager {
  484. pub fn new(window: Duration,
  485. log_path: &'static str,
  486. measurements: Sender<OwnedMeasurement>) -> Self {
  487. let (tx, rx) = channel();
  488. let channel = PubSub::new();
  489. let channel_copy = channel.clone();
  490. let logger = file_logger(log_path, Severity::Info);
  491. info!(logger, "initializing");
  492. let mut gdax_ws = windows::DurationWindow::new(window);
  493. let mut gdax_trade = windows::DurationWindow::new(window);
  494. let mut last = Last::default();
  495. info!(logger, "entering loop");
  496. let thread = Some(thread::spawn(move || {
  497. loop {
  498. let loop_time = Instant::now();
  499. if let Ok(msg) = rx.recv_timeout(Duration::from_millis(1)) {
  500. debug!(logger, "rcvd {:?}", msg);
  501. match msg {
  502. Latency::Ws(_, _, dur) => {
  503. gdax_ws.update(loop_time, dur);
  504. last.gdax = loop_time;
  505. }
  506. Latency::Trade(_, ticker, dur) => {
  507. gdax_trade.update(loop_time, dur);
  508. last.gdax = loop_time;
  509. let nanos = windows::DurationWindow::nanos(dur);
  510. measurements.send(
  511. OwnedMeasurement::new("gdax_trade_api")
  512. .add_tag("ticker", ticker.as_str())
  513. .add_field("nanos", OwnedValue::Integer(nanos as i64))
  514. .set_timestamp(influx::now())).unwrap();
  515. }
  516. Latency::Terminate => break,
  517. _ => {}
  518. }
  519. }
  520. if loop_time - last.broadcast > Duration::from_millis(100) {
  521. debug!(logger, "initalizing broadcast");
  522. let update = Update {
  523. gdax_ws: gdax_ws.refresh(loop_time).mean_nanos(),
  524. gdax_trade: gdax_trade.refresh(loop_time).mean_nanos(),
  525. gdax_last: dt_from_dur(loop_time - last.gdax)
  526. };
  527. channel.send(update).unwrap();
  528. last.broadcast = loop_time;
  529. debug!(logger, "sent broadcast");
  530. }
  531. }
  532. debug!(logger, "latency manager terminating");
  533. }));
  534. Manager {
  535. tx,
  536. channel: channel_copy,
  537. thread,
  538. }
  539. }
  540. }
  541. impl Drop for LatencyManager {
  542. fn drop(&mut self) {
  543. for _ in 0..100 { self.tx.send(ExperiencedLatency::Terminate).unwrap(); }
  544. if let Some(thread) = self.thread.take() {
  545. let _ = thread.join();
  546. }
  547. }
  548. }
  549. impl Drop for Manager {
  550. fn drop(&mut self) {
  551. for _ in 0..100 { self.tx.send(Latency::Terminate).unwrap(); }
  552. if let Some(thread) = self.thread.take() {
  553. let _ = thread.join();
  554. }
  555. }
  556. }
  557. impl LatencyManager {
  558. pub fn new(d: Duration) -> Self {
  559. let (tx, rx) = channel();
  560. let tx_copy = tx.clone();
  561. let channel = PubSub::new();
  562. let channel_copy = channel.clone();
  563. //let w = w.clone();
  564. let thread = Some(thread::spawn(move || {
  565. let logger = file_logger("var/log/latency-manager.log", Severity::Info);
  566. info!(logger, "initializing zmq");
  567. info!(logger, "initializing DurationWindows");
  568. let mut gdax_ws = windows::DurationWindow::new(d);
  569. let mut gdax_priv = windows::DurationWindow::new(d);
  570. let mut krkn_pub = windows::DurationWindow::new(d);
  571. let mut krkn_priv = windows::DurationWindow::new(d);
  572. let mut plnx_pub = windows::DurationWindow::new(d);
  573. let mut plnx_priv = windows::DurationWindow::new(d);
  574. let mut plnx_order = windows::DurationWindow::new(d);
  575. let mut plnx_ws_count: windows::Window<u32> = windows::Window::new(d);
  576. // yes I am intentionally breaking from the hard-typed duration
  577. // window ... that was a stupid idea
  578. //
  579. let mut krkn_trade_30 = windows::DurationWindow::new(Duration::from_secs(30));
  580. let mut krkn_trade_300 = windows::DurationWindow::new(Duration::from_secs(300));
  581. let mut last = Last::default();
  582. thread::sleep(Duration::from_millis(1));
  583. info!(logger, "entering loop");
  584. loop {
  585. let loop_time = Instant::now();
  586. if let Ok(msg) = rx.recv() {
  587. debug!(logger, "new msg: {:?}", msg);
  588. match msg {
  589. ExperiencedLatency::Terminate => {
  590. crit!(logger, "terminating");
  591. break;
  592. }
  593. ExperiencedLatency::GdaxWebsocket(d) => gdax_ws.update(loop_time, d),
  594. ExperiencedLatency::GdaxHttpPrivate(d) => gdax_priv.update(loop_time, d),
  595. ExperiencedLatency::KrknHttpPublic(d) => {
  596. last.krkn = loop_time;
  597. krkn_pub.update(loop_time, d)
  598. }
  599. ExperiencedLatency::KrknHttpPrivate(d) => {
  600. last.krkn = loop_time;
  601. krkn_priv.update(loop_time, d)
  602. }
  603. ExperiencedLatency::PlnxHttpPublic(d) => {
  604. last.plnx = loop_time;
  605. plnx_pub.update(loop_time, d)
  606. }
  607. ExperiencedLatency::PlnxHttpPrivate(d) => {
  608. last.plnx = loop_time;
  609. plnx_priv.update(loop_time, d)
  610. }
  611. ExperiencedLatency::PlnxOrderBook(d) => {
  612. last.plnx = loop_time;
  613. plnx_order.update(loop_time, d)
  614. }
  615. ExperiencedLatency::PlnxWs(_) => {
  616. last.plnx = loop_time;
  617. plnx_ws_count.update(loop_time, 1_u32);
  618. }
  619. ExperiencedLatency::KrknTrade(d, cmd, _, _) => {
  620. debug!(logger, "new KrknTrade";
  621. "cmd" => cmd);
  622. last.krkn = loop_time;
  623. krkn_trade_30.update(loop_time, d);
  624. krkn_trade_300.update(loop_time, d);
  625. }
  626. other => {
  627. warn!(logger, "unexpected msg: {:?}", other);
  628. }
  629. }
  630. }
  631. if loop_time - last.broadcast > Duration::from_millis(100) {
  632. debug!(logger, "initalizing broadcast");
  633. // note - because we mutated the Window instances
  634. // above, we need a fresh Instant to avoid less than other
  635. // panic
  636. //
  637. krkn_trade_30.refresh(loop_time);
  638. krkn_trade_300.refresh(loop_time);
  639. let update = LatencyUpdate {
  640. gdax_ws: gdax_ws.refresh(loop_time).mean_nanos(),
  641. krkn_pub: krkn_pub.refresh(loop_time).mean_nanos(),
  642. krkn_priv: krkn_priv.refresh(loop_time).mean_nanos(),
  643. plnx_pub: plnx_pub.refresh(loop_time).mean_nanos(),
  644. plnx_priv: plnx_priv.refresh(loop_time).mean_nanos(),
  645. plnx_order: plnx_order.refresh(loop_time).mean_nanos(),
  646. krkn_trade_30_mean: krkn_trade_30.mean_nanos(),
  647. krkn_trade_30_max: krkn_trade_30.max_nanos().unwrap_or(0),
  648. krkn_trade_300_mean: krkn_trade_300.mean_nanos(),
  649. krkn_trade_300_max: krkn_trade_300.max_nanos().unwrap_or(0),
  650. plnx_last: dt_from_dur(loop_time - last.plnx),
  651. krkn_last: dt_from_dur(loop_time - last.krkn),
  652. plnx_ws_count: plnx_ws_count.refresh(loop_time).count() as u64,
  653. };
  654. channel.send(update).unwrap();
  655. last.broadcast = loop_time;
  656. debug!(logger, "sent broadcast");
  657. }
  658. }
  659. crit!(logger, "goodbye");
  660. }));
  661. LatencyManager {
  662. tx: tx_copy,
  663. channel: channel_copy,
  664. thread
  665. }
  666. }
  667. }