You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

170 lines
5.8KB

  1. use std::sync::mpsc::{Sender, Receiver, channel, SendError};
  2. use std::sync::Arc;
  3. use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
  4. use std::path::PathBuf;
  5. use std::thread::{self, JoinHandle};
  6. use std::io::{self, Write};
  7. use std::{mem, fs, env};
  8. use chrono::{DateTime, Utc, TimeZone};
  9. use hdrsample::{Histogram, Counter};
  10. use hdrsample::serialization::{Serializer, V2DeflateSerializer, V2Serializer};
  11. use hdrsample::serialization::interval_log::{IntervalLogWriterBuilder, Tag};
  /// Counter type used for the histograms in this file (`Histogram<C>`).
  12. type C = u64;
  /// Converts a `Duration` into a whole number of nanoseconds.
  ///
  /// The result saturates at `u64::MAX` (roughly 584 years) instead of
  /// overflowing, so very large durations neither panic in debug builds nor
  /// silently wrap around in release builds.
  pub fn nanos(d: Duration) -> u64 {
      d.as_secs()
          .saturating_mul(1_000_000_000)
          // `subsec_nanos` is a `u32`, so widening to `u64` is lossless.
          .saturating_add(u64::from(d.subsec_nanos()))
  }
  16. pub struct HistLog {
  17. series: &'static str,
  18. tag: &'static str,
  19. freq: Duration,
  20. last_sent: Instant,
  21. tx: Sender<Option<Entry>>,
  22. hist: Histogram<C>,
  23. thread: Option<Arc<thread::JoinHandle<()>>>,
  24. }
  25. pub struct Entry {
  26. pub tag: &'static str,
  27. pub start: SystemTime,
  28. pub end: SystemTime,
  29. pub hist: Histogram<C>,
  30. }
  31. impl HistLog {
  32. pub fn new(series: &'static str, tag: &'static str, freq: Duration) -> Self {
  33. let (tx, rx) = channel();
  34. let mut dir = env::home_dir().unwrap();
  35. dir.push("src/market-maker/var/hist");
  36. fs::create_dir_all(&dir).unwrap();
  37. let thread = Some(Arc::new(Self::scribe(series, rx, dir)));
  38. let last_sent = Instant::now();
  39. let hist = Histogram::new(3).unwrap();
  40. Self { series, tag, freq, last_sent, tx, hist, thread }
  41. }
  42. pub fn clone_with_tag(&self, tag: &'static str) -> HistLog {
  43. let thread = self.thread.as_ref().map(|x| Arc::clone(x)).unwrap();
  44. assert!(self.thread.is_some(), "self.thread is {:?}", self.thread);
  45. let tx = self.tx.clone();
  46. Self {
  47. series: self.series,
  48. tag,
  49. freq: self.freq,
  50. last_sent: Instant::now(),
  51. tx,
  52. hist: self.hist.clone(),
  53. thread: Some(thread),
  54. }
  55. }
  56. pub fn record(&mut self, value: u64) {
  57. let _ = self.hist.record(value);
  58. }
  59. /// If for some reason there was a pause in between using the struct,
  60. /// this resets the internal state of both the values recorded to the
  61. /// `Histogram` and the value of when it last sent a `Histogram` onto
  62. /// the writing thread.
  63. ///
  64. pub fn reset(&mut self) {
  65. self.hist.clear();
  66. self.last_sent = Instant::now();
  67. }
  68. fn send(&mut self, loop_time: Instant) {
  69. let end = SystemTime::now();
  70. let start = end - (loop_time - self.last_sent);
  71. assert!(end > start, "end <= start!");
  72. let mut next = Histogram::new_from(&self.hist);
  73. mem::swap(&mut self.hist, &mut next);
  74. self.tx.send(Some(Entry { tag: self.tag, start, end, hist: next })).expect("sending entry failed");
  75. self.last_sent = loop_time;
  76. }
  77. pub fn check_send(&mut self, loop_time: Instant) {
  78. //let since = loop_time - self.last_sent;
  79. if loop_time > self.last_sent && loop_time - self.last_sent >= self.freq {
  80. self.send(loop_time);
  81. }
  82. }
  83. fn scribe(
  84. series : &'static str,
  85. rx : Receiver<Option<Entry>>,
  86. dir : PathBuf,
  87. ) -> JoinHandle<()> {
  88. let mut ser = V2DeflateSerializer::new();
  89. let start_time = SystemTime::now();
  90. let seconds = start_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
  91. let path = dir.join(&format!("{}-interval-log-{}.v2z", series, seconds));
  92. let file = fs::File::create(&path).unwrap();
  93. thread::Builder::new().name(format!("HistLog::scribe::{}", series)).spawn(move || {
  94. let mut buf = io::LineWriter::new(file);
  95. let mut wtr =
  96. IntervalLogWriterBuilder::new()
  97. .with_base_time(UNIX_EPOCH)
  98. .with_start_time(start_time)
  99. .begin_log_with(&mut buf, &mut ser)
  100. .unwrap();
  101. loop {
  102. match rx.try_recv() { //.recv_timeout(Duration::from_millis(1)) {
  103. //match rx.recv_timeout(Duration::new(1, 0)) {
  104. Ok(Some(Entry { tag, start, end, hist })) => {
  105. wtr.write_histogram(&hist, start.duration_since(UNIX_EPOCH).unwrap(),
  106. end.duration_since(start).unwrap(), Tag::new(tag))
  107. .ok();
  108. //.map_err(|e| { println!("{:?}", e); e }).ok();
  109. }
  110. // `None` used as terminate signal from `Drop`
  111. Ok(None) => break,
  112. _ => {
  113. thread::sleep(Duration::new(0, 0));
  114. }
  115. }
  116. }
  117. }).unwrap()
  118. }
  119. }
  120. impl Drop for HistLog {
  121. fn drop(&mut self) {
  122. if !self.hist.is_empty() { self.send(Instant::now()) }
  123. if let Some(arc) = self.thread.take() {
  124. //println!("in Drop, strong count is {}", Arc::strong_count(&arc));
  125. if let Ok(thread) = Arc::try_unwrap(arc) {
  126. let _ = self.tx.send(None);
  127. thread::sleep(Duration::from_millis(2));
  128. let _ = thread.join();
  129. }
  130. }
  131. }
  132. }
  133. // pub fn save_hist<T: Counter>(thread: &'static str, ticker: Ticker, hist: Histogram<T>) {
  134. // env::home_dir().and_then(|mut path| {
  135. // path.push(&format!("src/market-maker/var/hist/{}/", ticker.to_str()));
  136. // let _ = fs::create_dir_all(&path);
  137. // path.push(&format!("mm-v{}-{}-{}-1h-{}.v2", crate_version!(), thread, ticker.to_string(), Utc::now().to_rfc3339()));
  138. // fs::File::create(&path).ok()
  139. // }).map(|mut file| {
  140. // let mut ser = V2DeflateSerializer::new();
  141. // ser.serialize(&hist, &mut file)
  142. // .map_err(|e| {
  143. // let _ = write!(&mut file, "\n\n{:?}", e);
  144. // e
  145. // }).ok();
  146. // });
  147. // }