You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

175 lines
5.7KB

  1. use std::sync::mpsc::{Sender, Receiver, channel};
  2. use std::sync::Arc;
  3. use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
  4. use std::path::PathBuf;
  5. use std::thread::{self, JoinHandle};
  6. use std::io;
  7. use std::{mem, fs, env};
  8. use hdrhistogram::{Histogram};
  9. use hdrhistogram::serialization::V2DeflateSerializer;
  10. use hdrhistogram::serialization::interval_log::{IntervalLogWriterBuilder, Tag};
/// Counter type used for the `Histogram`s held by `HistLog` and `Entry`.
pub type C = u64;
  12. pub fn nanos(d: Duration) -> u64 {
  13. d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)
  14. }
/// Accumulates values into a `Histogram` and periodically hands the
/// accumulated histogram to a background writer thread over a channel.
pub struct HistLog {
    /// Series name; used in the log file name and the writer thread name.
    series: &'static str,
    /// Tag attached to every `Entry` this instance sends.
    tag: &'static str,
    /// Minimum time between two sends performed by `check_send`.
    freq: Duration,
    /// When this instance last sent (or was created / cloned / reset).
    last_sent: Instant,
    /// Channel to the writer thread; `None` is the terminate signal.
    tx: Sender<Option<Entry>>,
    /// Values recorded since the last send.
    hist: Histogram<C>,
    /// Shared handle to the writer thread. Wrapped in `Option` so that
    /// `Drop` can take it out of the struct.
    thread: Option<Arc<thread::JoinHandle<()>>>,
}
/// One interval's worth of recorded values, as sent to the writer thread.
pub struct Entry {
    /// Tag written alongside the histogram in the interval log.
    pub tag: &'static str,
    /// Wall-clock start of the interval.
    pub start: SystemTime,
    /// Wall-clock end of the interval.
    pub end: SystemTime,
    /// The values recorded during the interval.
    pub hist: Histogram<C>,
}
  30. impl Clone for HistLog {
  31. fn clone(&self) -> Self {
  32. let thread = self.thread.as_ref().map(|x| Arc::clone(x));
  33. Self {
  34. series: self.series.clone(),
  35. tag: self.tag.clone(),
  36. freq: self.freq.clone(),
  37. last_sent: Instant::now(),
  38. tx: self.tx.clone(),
  39. hist: self.hist.clone(),
  40. thread,
  41. }
  42. }
  43. }
  44. impl HistLog {
  45. pub fn new(series: &'static str, tag: &'static str, freq: Duration) -> Self {
  46. let (tx, rx) = channel();
  47. let mut dir = env::home_dir().unwrap();
  48. dir.push("src/market-maker/var/hist");
  49. fs::create_dir_all(&dir).unwrap();
  50. let thread = Some(Arc::new(Self::scribe(series, rx, dir)));
  51. let last_sent = Instant::now();
  52. let hist = Histogram::new(3).unwrap();
  53. Self { series, tag, freq, last_sent, tx, hist, thread }
  54. }
  55. pub fn new_with_tag(&self, tag: &'static str) -> Self {
  56. Self::new(self.series, tag, self.freq)
  57. }
  58. pub fn clone_with_tag(&self, tag: &'static str) -> Self {
  59. let thread = self.thread.as_ref().map(|x| Arc::clone(x)).unwrap();
  60. assert!(self.thread.is_some(), "self.thread is {:?}", self.thread);
  61. let tx = self.tx.clone();
  62. Self {
  63. series: self.series,
  64. tag,
  65. freq: self.freq,
  66. last_sent: Instant::now(),
  67. tx,
  68. hist: self.hist.clone(),
  69. thread: Some(thread),
  70. }
  71. }
  72. pub fn clone_with_tag_and_freq(&self, tag: &'static str, freq: Duration) -> HistLog {
  73. let mut clone = self.clone_with_tag(tag);
  74. clone.freq = freq;
  75. clone
  76. }
  77. pub fn record(&mut self, value: u64) {
  78. let _ = self.hist.record(value);
  79. }
  80. /// If for some reason there was a pause in between using the struct,
  81. /// this resets the internal state of both the values recorded to the
  82. /// `Histogram` and the value of when it last sent a `Histogram` onto
  83. /// the writing thread.
  84. ///
  85. pub fn reset(&mut self) {
  86. self.hist.clear();
  87. self.last_sent = Instant::now();
  88. }
  89. fn send(&mut self, loop_time: Instant) {
  90. let end = SystemTime::now();
  91. let start = end - (loop_time - self.last_sent);
  92. assert!(end > start, "end <= start!");
  93. let mut next = Histogram::new_from(&self.hist);
  94. mem::swap(&mut self.hist, &mut next);
  95. self.tx.send(Some(Entry { tag: self.tag, start, end, hist: next })).expect("sending entry failed");
  96. self.last_sent = loop_time;
  97. }
  98. pub fn check_send(&mut self, loop_time: Instant) {
  99. //let since = loop_time - self.last_sent;
  100. if loop_time > self.last_sent && loop_time - self.last_sent >= self.freq {
  101. // send sets self.last_sent to loop_time fyi
  102. self.send(loop_time);
  103. }
  104. }
  105. fn scribe(
  106. series : &'static str,
  107. rx : Receiver<Option<Entry>>,
  108. dir : PathBuf,
  109. ) -> JoinHandle<()> {
  110. let mut ser = V2DeflateSerializer::new();
  111. let start_time = SystemTime::now();
  112. let seconds = start_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
  113. let path = dir.join(&format!("{}-interval-log-{}.v2z", series, seconds));
  114. let file = fs::File::create(&path).unwrap();
  115. thread::Builder::new().name(format!("mm:hist:{}", series)).spawn(move || {
  116. let mut buf = io::LineWriter::new(file);
  117. let mut wtr =
  118. IntervalLogWriterBuilder::new()
  119. .with_base_time(UNIX_EPOCH)
  120. .with_start_time(start_time)
  121. .begin_log_with(&mut buf, &mut ser)
  122. .unwrap();
  123. loop {
  124. match rx.recv() { //.recv_timeout(Duration::from_millis(1)) {
  125. //match rx.recv_timeout(Duration::new(1, 0)) {
  126. Ok(Some(Entry { tag, start, end, hist })) => {
  127. wtr.write_histogram(&hist, start.duration_since(UNIX_EPOCH).unwrap(),
  128. end.duration_since(start).unwrap(), Tag::new(tag))
  129. .ok();
  130. //.map_err(|e| { println!("{:?}", e); e }).ok();
  131. }
  132. // `None` used as terminate signal from `Drop`
  133. Ok(None) => break,
  134. _ => {
  135. thread::sleep(Duration::new(0, 0));
  136. }
  137. }
  138. }
  139. }).unwrap()
  140. }
  141. }
  142. impl Drop for HistLog {
  143. fn drop(&mut self) {
  144. if !self.hist.is_empty() { self.send(Instant::now()) }
  145. if let Some(arc) = self.thread.take() {
  146. //println!("in Drop, strong count is {}", Arc::strong_count(&arc));
  147. if let Ok(thread) = Arc::try_unwrap(arc) {
  148. let _ = self.tx.send(None);
  149. let _ = thread.join();
  150. }
  151. }
  152. }
  153. }