You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

771 lines
23KB

  1. //! An object to handle everyone's errors
  2. //!
  3. use std::thread::{self, JoinHandle};
  4. use std::sync::{Arc, Mutex, RwLock};
  5. use std::sync::mpsc::{self, Sender, Receiver, channel};
  6. use std::collections::{BTreeMap, VecDeque};
  7. use std::fmt::{self, Display, Error as FmtError, Formatter};
  8. use std::io::{self, Read, Write};
  9. use std::fs;
  10. use zmq;
  11. use chrono::{DateTime, Utc, TimeZone};
  12. use termion::color::{self, Fg, Bg};
  13. use influent::measurement::{Measurement, Value as InfluentValue};
  14. use slog::{self, OwnedKVList, Drain, Key, KV, Level, Logger};
  15. use sloggers::types::Severity;
  16. use super::{nanos, file_logger};
  17. use influx;
/// Maximum number of `Record`s kept in the `WarningsManager` buffer; after
/// each push the deque is truncated to this length.
const N_WARNINGS: usize = 500;
  19. #[macro_export]
  20. macro_rules! confirmed {
  21. ($warnings:ident, $($args:tt)*) => (
  22. {
  23. let _ = warnings.send(Warning::Confirmed( ( format!($($args)*) ) ) ).unwrap();
  24. }
  25. )
  26. }
  27. /// logs a `Warning::Awesome` message to the `WarningsManager`
  28. #[macro_export]
  29. macro_rules! awesome {
  30. ($warnings:ident, $($args:tt)*) => (
  31. {
  32. let _ = $warnings.send(Warning::Awesome( ( format!($($args)*) ) ) ).unwrap();
  33. }
  34. )
  35. }
  36. #[macro_export]
  37. macro_rules! critical {
  38. ($warnings:ident, $($args:tt)*) => (
  39. {
  40. let _ = $warnings.send(Warning::Critical( ( format!($($args)*) ) ) ).unwrap();
  41. }
  42. )
  43. }
  44. #[macro_export]
  45. macro_rules! notice {
  46. ($warnings:ident, $($args:tt)*) => (
  47. {
  48. let _ = $warnings.send(Warning::Notice( ( format!($($args)*) ) ) ).unwrap();
  49. }
  50. )
  51. }
  52. #[macro_export]
  53. macro_rules! error {
  54. ($warnings:ident, $($args:tt)*) => (
  55. {
  56. $warnings.send(Warning::Error( ( format!($($args)*) ) ) ).unwrap();
  57. }
  58. )
  59. }
  60. /// represents a non-fatal error somewhere in
  61. /// the system to report either to the program interface
  62. /// or in logs.
  63. ///
  64. #[derive(Debug, Clone, PartialEq)]
  65. pub enum Warning {
  66. Notice(String),
  67. Error(String),
  68. DegradedService(String),
  69. Critical(String),
  70. Confirmed(String),
  71. Awesome(String),
  72. Log {
  73. level: Level,
  74. module: &'static str,
  75. function: &'static str,
  76. line: u32,
  77. msg: String,
  78. kv: MeasurementRecord,
  79. },
  80. Terminate
  81. }
  82. impl Warning {
  83. pub fn msg(&self) -> String {
  84. match *self {
  85. Warning::Notice(ref s) | Warning::Error(ref s) |
  86. Warning::DegradedService(ref s) | Warning::Critical(ref s) |
  87. Warning::Awesome(ref s) | Warning::Confirmed(ref s) |
  88. Warning::Log { msg: ref s, .. } =>
  89. s.clone(),
  90. Warning::Terminate => "".to_owned()
  91. }
  92. }
  93. pub fn msg_str(&self) -> &str {
  94. match *self {
  95. Warning::Notice(ref s) | Warning::Error(ref s) |
  96. Warning::DegradedService(ref s) | Warning::Critical(ref s) |
  97. Warning::Awesome(ref s) | Warning::Confirmed(ref s) |
  98. Warning::Log { msg: ref s, .. } =>
  99. s.as_ref(),
  100. Warning::Terminate => "Terminate"
  101. }
  102. }
  103. pub fn category_str(&self) -> &str {
  104. match self {
  105. &Warning::Notice(_) => "NOTC",
  106. &Warning::Error(_) => "ERRO",
  107. &Warning::Critical(_) => "CRIT",
  108. &Warning::DegradedService(_) => "DGRD",
  109. &Warning::Confirmed(_) => "CNFD",
  110. &Warning::Awesome(_) => "AWSM",
  111. &Warning::Log { ref level, .. } => level.as_short_str(),
  112. &Warning::Terminate => "TERM",
  113. }
  114. }
  115. pub fn category(&self, f: &mut Formatter) -> fmt::Result {
  116. match *self {
  117. Warning::Notice(_) => {
  118. write!(f, "[ Notice ]")
  119. }
  120. Warning::Error(_) => {
  121. write!(f, "{yellow}[{title}]{reset}",
  122. yellow = Fg(color::LightYellow),
  123. title = " Error--",
  124. reset = Fg(color::Reset))
  125. }
  126. Warning::Critical(_) => {
  127. write!(f, "{bg}{fg}{title}{resetbg}{resetfg}",
  128. bg = Bg(color::Red),
  129. fg = Fg(color::White),
  130. title = " CRITICAL ",
  131. resetbg = Bg(color::Reset),
  132. resetfg = Fg(color::Reset))
  133. }
  134. Warning::Awesome(_) => {
  135. write!(f, "{color}[{title}]{reset}",
  136. color = Fg(color::Green),
  137. title = "Awesome!",
  138. reset = Fg(color::Reset))
  139. }
  140. Warning::DegradedService(_) => {
  141. write!(f, "{color}[{title}] {reset}",
  142. color = Fg(color::Blue),
  143. title = "Degraded Service ",
  144. reset = Fg(color::Reset))
  145. }
  146. Warning::Confirmed(_) => {
  147. write!(f, "{bg}{fg}{title}{resetbg}{resetfg}",
  148. bg = Bg(color::Blue),
  149. fg = Fg(color::White),
  150. title = "Confirmed ",
  151. resetbg = Bg(color::Reset),
  152. resetfg = Fg(color::Reset))
  153. }
  154. _ => Ok(())
  155. }
  156. }
  157. }
  158. impl Display for Warning {
  159. fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
  160. self.category(f);
  161. write!(f, " {}", self.msg())
  162. }
  163. }
  164. // impl Message for Warning {
  165. // fn kill_switch() -> Self {
  166. // Warning::Terminate
  167. // }
  168. // }
  169. #[derive(Debug, Clone)]
  170. pub struct Record {
  171. pub time: DateTime<Utc>,
  172. pub msg: Warning
  173. }
  174. impl Record {
  175. pub fn new(msg: Warning) -> Self {
  176. let time = Utc::now();
  177. Record { time, msg }
  178. }
  179. pub fn to_measurement(&self, name: &'static str) -> Measurement {
  180. let cat = self.msg.category_str();
  181. let body = self.msg.msg_str();
  182. let mut m = Measurement::new(name);
  183. m.add_tag("category", cat);
  184. m.add_field("msg", InfluentValue::String(body));
  185. m.set_timestamp(nanos(self.time) as i64);
  186. m
  187. }
  188. }
  189. impl Display for Record {
  190. fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
  191. write!(f, "{} | {}", self.time.format("%H:%M:%S"), self.msg)
  192. }
  193. }
/// Result type returned by the serializer and record-building methods in
/// this module: `()` on success, `slog::Error` on failure.
pub type SlogResult = Result<(), slog::Error>;
  195. #[derive(Debug, Clone, PartialEq)]
  196. pub enum Value {
  197. String(String),
  198. Float(f64),
  199. Integer(i64),
  200. Boolean(bool)
  201. }
  202. impl Value {
  203. pub fn to_influent<'a>(&'a self) -> InfluentValue<'a> {
  204. match self {
  205. &Value::String(ref s) => InfluentValue::String(s),
  206. &Value::Float(n) => InfluentValue::Float(n),
  207. &Value::Integer(i) => InfluentValue::Integer(i),
  208. &Value::Boolean(b) => InfluentValue::Boolean(b),
  209. }
  210. }
  211. }
/// Owned key-value pairs collected from a slog record, split into influx
/// fields and tags, from which a `Measurement` can later be built.
#[derive(Debug, Clone, PartialEq)]
pub struct MeasurementRecord {
    /// Field pairs, in insertion order.
    fields: Vec<(Key, Value)>,
    //measurement: &'a mut Measurement<'a>,
    /// Tag pairs, in insertion order.
    tags: Vec<(Key, String)>,
}
  218. impl MeasurementRecord {
  219. pub fn new() -> Self {
  220. MeasurementRecord {
  221. fields: Vec::new(),
  222. tags: Vec::new(),
  223. }
  224. }
  225. pub fn add_field(&mut self, key: Key, val: Value) -> SlogResult {
  226. self.fields.push((key, val));
  227. Ok(())
  228. }
  229. pub fn add_tag(&mut self, key: Key, val: String) -> SlogResult {
  230. match key {
  231. "exchange" | "thread" | "ticker" | "category" => {
  232. self.tags.push((key, val));
  233. }
  234. other => {
  235. self.add_field(key, Value::String(val));
  236. }
  237. }
  238. Ok(())
  239. }
  240. pub fn serialize_values(&mut self, record: &slog::Record, values: &OwnedKVList) {
  241. let mut builder = TagBuilder { mrec: self };
  242. values.serialize(record, &mut builder);
  243. }
  244. pub fn to_measurement<'a>(&'a self, name: &'a str) -> Measurement<'a> {
  245. let fields: BTreeMap<&'a str, InfluentValue<'a>> =
  246. self.fields.iter()
  247. .map(|&(k, ref v)| {
  248. (k, v.to_influent())
  249. }).collect();
  250. let tags: BTreeMap<&'a str, &'a str> =
  251. self.tags.iter()
  252. .map(|&(k, ref v)| {
  253. (k, v.as_ref())
  254. }).collect();
  255. Measurement {
  256. key: name,
  257. timestamp: Some(nanos(Utc::now()) as i64),
  258. fields,
  259. tags,
  260. }
  261. }
  262. }
  263. impl slog::Serializer for MeasurementRecord {
  264. fn emit_usize(&mut self, key: Key, val: usize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  265. fn emit_isize(&mut self, key: Key, val: isize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  266. fn emit_bool(&mut self, key: Key, val: bool) -> SlogResult { self.add_field(key, Value::Boolean(val)); Ok(()) }
  267. fn emit_u8(&mut self, key: Key, val: u8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  268. fn emit_i8(&mut self, key: Key, val: i8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  269. fn emit_u16(&mut self, key: Key, val: u16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  270. fn emit_i16(&mut self, key: Key, val: i16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  271. fn emit_u32(&mut self, key: Key, val: u32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  272. fn emit_i32(&mut self, key: Key, val: i32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  273. fn emit_f32(&mut self, key: Key, val: f32) -> SlogResult { self.add_field(key, Value::Float(val as f64)) }
  274. fn emit_u64(&mut self, key: Key, val: u64) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  275. fn emit_i64(&mut self, key: Key, val: i64) -> SlogResult { self.add_field(key, Value::Integer(val)) }
  276. fn emit_f64(&mut self, key: Key, val: f64) -> SlogResult { self.add_field(key, Value::Float(val)) }
  277. fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { self.add_field(key, Value::String(val.to_string())) }
  278. fn emit_unit(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::Boolean(true)) }
  279. fn emit_none(&mut self, key: Key) -> SlogResult { Ok(()) } //self.add_field(key, Value::String("none".into())) }
  280. fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { self.add_field(key, Value::String(val.to_string())) }
  281. }
  282. pub struct TagBuilder<'a> {
  283. mrec: &'a mut MeasurementRecord
  284. }
  285. impl<'a> slog::Serializer for TagBuilder<'a> {
  286. fn emit_str(&mut self, key: Key, val: &str) -> SlogResult {
  287. match key {
  288. "exchange" | "thread" | "ticker" | "category" => {
  289. self.mrec.add_tag(key, val.to_string())
  290. }
  291. other => {
  292. self.mrec.add_field(key, Value::String(val.to_string()))
  293. }
  294. }
  295. }
  296. fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult {
  297. match key {
  298. "exchange" | "thread" | "ticker" | "category" => {
  299. self.mrec.add_tag(key, val.to_string())
  300. }
  301. other => {
  302. self.mrec.add_field(key, Value::String(val.to_string()))
  303. }
  304. }
  305. }
  306. }
  307. pub struct WarningsDrain<D: Drain> {
  308. level: Level,
  309. tx: Arc<Mutex<Sender<Warning>>>,
  310. drain: D,
  311. to_file: Logger,
  312. }
  313. impl<D> WarningsDrain<D>
  314. where D: Drain
  315. {
  316. pub fn new(tx: Sender<Warning>, level: Level, drain: D) -> Self {
  317. let tx = Arc::new(Mutex::new(tx));
  318. let to_file = file_logger("var/log/mm.log", Severity::Warning);
  319. WarningsDrain { tx, drain, level, to_file }
  320. }
  321. }
  322. impl From<Sender<Warning>> for WarningsDrain<slog::Fuse<slog::Discard>> {
  323. fn from(tx: Sender<Warning>) -> Self {
  324. WarningsDrain::new(tx, Level::Debug, slog::Discard.fuse())
  325. }
  326. }
  327. impl<D: Drain> Drain for WarningsDrain<D> {
  328. type Ok = ();
  329. type Err = D::Err;
  330. fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
  331. if record.level() <= self.level {
  332. let mut ser = MeasurementRecord::new();
  333. ser.serialize_values(record, values);
  334. record.kv().serialize(record, &mut ser);
  335. let msg = record.msg().to_string();
  336. if let Ok(lock) = self.tx.lock() {
  337. let _ = lock.send(Warning::Log {
  338. level: record.level(),
  339. module: record.module(),
  340. function: record.function(),
  341. line: record.line(),
  342. msg,
  343. kv: ser
  344. });
  345. }
  346. }
  347. if record.level() <= Level::Warning {
  348. let _ = self.to_file.log(record);
  349. }
  350. let _ = self.drain.log(record, values)?;
  351. Ok(())
  352. }
  353. }
  354. #[derive(Debug)]
  355. pub struct WarningsManager {
  356. pub tx: Sender<Warning>,
  357. pub warnings: Arc<RwLock<VecDeque<Record>>>,
  358. thread: Option<JoinHandle<()>>
  359. }
  360. impl WarningsManager {
  361. /// `measurement_name` is the name of the influxdb measurement
  362. /// we will save log entries to.
  363. ///
  364. pub fn new(measurement_name: &'static str) -> Self {
  365. let warnings = Arc::new(RwLock::new(VecDeque::new()));
  366. let warnings_copy = warnings.clone();
  367. let (tx, rx) = channel();
  368. let mut buf = String::with_capacity(4096);
  369. let ctx = zmq::Context::new();
  370. let socket = influx::push(&ctx).unwrap();
  371. let thread = thread::spawn(move || {
  372. let path = format!("var/log/warnings-manager-{}.log", measurement_name);
  373. let logger = file_logger(&path, Severity::Info);
  374. info!(logger, "entering loop");
  375. loop {
  376. if let Ok(msg) = rx.recv() {
  377. match msg {
  378. Warning::Terminate => {
  379. crit!(logger, "terminating");
  380. break;
  381. }
  382. Warning::Log { level, module, function, line, msg, kv } => {
  383. debug!(logger, "new Warning::Debug arrived";
  384. "msg" => &msg);
  385. let mut meas = kv.to_measurement(measurement_name);
  386. meas.add_field("msg", InfluentValue::String(msg.as_ref()));
  387. meas.add_tag("category", level.as_short_str());
  388. influx::serialize(&meas, &mut buf);
  389. let _ = socket.send_str(&buf, 0);
  390. buf.clear();
  391. // and don't push to warnings
  392. // bc it's debug
  393. }
  394. other => {
  395. debug!(logger, "new {} arrived", other.category_str();
  396. "msg" => other.category_str());
  397. let rec = Record::new(other);
  398. {
  399. let m = rec.to_measurement(measurement_name);
  400. influx::serialize(&m, &mut buf);
  401. let _ = socket.send_str(&buf, 0);
  402. buf.clear();
  403. }
  404. if let Ok(mut lock) = warnings.write() {
  405. lock.push_front(rec);
  406. lock.truncate(N_WARNINGS);
  407. }
  408. }
  409. }
  410. }
  411. }
  412. });
  413. WarningsManager {
  414. warnings: warnings_copy,
  415. thread: Some(thread),
  416. tx
  417. }
  418. }
  419. }
  420. impl Drop for WarningsManager {
  421. fn drop(&mut self) {
  422. let _ = self.tx.send(Warning::Terminate);
  423. if let Some(thread) = self.thread.take() {
  424. thread.join();
  425. }
  426. }
  427. }
  428. pub struct ZmqDrain<D>
  429. where D: Drain,
  430. {
  431. drain: D,
  432. ctx: zmq::Context,
  433. socket: zmq::Socket,
  434. buf: Arc<Mutex<Vec<u8>>>
  435. }
  436. impl<D> ZmqDrain<D>
  437. where D: Drain,
  438. {
  439. pub fn new(drain: D) -> Self {
  440. let _ = fs::create_dir("/tmp/mm");
  441. let ctx = zmq::Context::new();
  442. let socket = ctx.socket(zmq::PUB).unwrap();
  443. socket.bind("ipc:///tmp/mm/log").expect("zmq publisher bind failed");
  444. let buf = Arc::new(Mutex::new(Vec::with_capacity(4096)));
  445. ZmqDrain {
  446. drain,
  447. ctx,
  448. socket,
  449. buf
  450. }
  451. }
  452. }
  453. const TIMESTAMP_FORMAT: &'static str = "%b %d %H:%M:%S%.3f";
  454. impl<D> Drain for ZmqDrain<D>
  455. where D: Drain
  456. {
  457. type Ok = D::Ok;
  458. type Err = D::Err;
  459. fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
  460. {
  461. let mut buf = self.buf.lock().unwrap();
  462. write!(buf, "{time} {level}",
  463. time = Utc::now().format(TIMESTAMP_FORMAT),
  464. level = record.level().as_short_str());
  465. {
  466. let mut thread_ser = ThreadSer(&mut buf);
  467. record.kv().serialize(record, &mut thread_ser);
  468. values.serialize(record, &mut thread_ser);
  469. }
  470. write!(buf, " {file:<20} {line:<5} {msg}",
  471. file = record.file(),
  472. line = record.line(),
  473. msg = record.msg());
  474. {
  475. let mut kv_ser = KvSer(&mut buf);
  476. record.kv().serialize(record, &mut kv_ser);
  477. values.serialize(record, &mut kv_ser);
  478. }
  479. let _ = self.socket.send(&buf, 0);
  480. buf.clear();
  481. }
  482. self.drain.log(record, values)
  483. }
  484. }
  485. /// Can be used as a `Write` with `slog_term` and
  486. /// other libraries.
  487. ///
  488. pub struct ZmqIo {
  489. ctx: zmq::Context,
  490. socket: zmq::Socket,
  491. buf: Vec<u8>
  492. }
  493. impl ZmqIo {
  494. pub fn new(addr: &str) -> Self {
  495. let _ = fs::create_dir("/tmp/mm");
  496. let ctx = zmq::Context::new();
  497. let socket = ctx.socket(zmq::PUB).unwrap();
  498. let addr = format!("ipc:///tmp/mm/{}", addr);
  499. socket.bind(&addr).expect("zmq publisher bind failed");
  500. let buf = Vec::with_capacity(4096);
  501. ZmqIo { ctx, socket, buf }
  502. }
  503. }
  504. impl Write for ZmqIo {
  505. fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
  506. self.buf.write(buf)
  507. }
  508. fn flush(&mut self) -> io::Result<()> {
  509. match self.buf.pop() {
  510. Some(b'\n') => {
  511. let _ = self.socket.send(&self.buf, 0);
  512. }
  513. Some(other) => {
  514. self.buf.push(other);
  515. let _ = self.socket.send(&self.buf, 0);
  516. }
  517. None => {
  518. return Ok(());
  519. }
  520. }
  521. self.buf.clear();
  522. Ok(())
  523. }
  524. }
/// Serializes *only* a string KV pair with `key == "thread"`, appending it
/// to the wrapped byte buffer; every other pair is ignored.
///
struct ThreadSer<'a>(&'a mut Vec<u8>);
  528. impl<'a> slog::ser::Serializer for ThreadSer<'a> {
  529. fn emit_arguments(&mut self, key: &str, val: &fmt::Arguments) -> slog::Result {
  530. Ok(())
  531. }
  532. fn emit_str(&mut self, key: &str, val: &str) -> slog::Result {
  533. if key == "thread" {
  534. write!(self.0, " {:<20}", val);
  535. }
  536. Ok(())
  537. }
  538. }
/// Serializes KV pairs as ", k: v", appending each one to the wrapped
/// byte buffer.
///
struct KvSer<'a>(&'a mut Vec<u8>);
  542. macro_rules! s(
  543. ($s:expr, $k:expr, $v:expr) => {
  544. try!(write!($s.0, ", {}: {}", $k, $v));
  545. };
  546. );
  547. impl<'a> slog::ser::Serializer for KvSer<'a> {
  548. fn emit_none(&mut self, key: &str) -> slog::Result {
  549. s!(self, key, "None");
  550. Ok(())
  551. }
  552. fn emit_unit(&mut self, key: &str) -> slog::Result {
  553. s!(self, key, "()");
  554. Ok(())
  555. }
  556. fn emit_bool(&mut self, key: &str, val: bool) -> slog::Result {
  557. s!(self, key, val);
  558. Ok(())
  559. }
  560. fn emit_char(&mut self, key: &str, val: char) -> slog::Result {
  561. s!(self, key, val);
  562. Ok(())
  563. }
  564. fn emit_usize(&mut self, key: &str, val: usize) -> slog::Result {
  565. s!(self, key, val);
  566. Ok(())
  567. }
  568. fn emit_isize(&mut self, key: &str, val: isize) -> slog::Result {
  569. s!(self, key, val);
  570. Ok(())
  571. }
  572. fn emit_u8(&mut self, key: &str, val: u8) -> slog::Result {
  573. s!(self, key, val);
  574. Ok(())
  575. }
  576. fn emit_i8(&mut self, key: &str, val: i8) -> slog::Result {
  577. s!(self, key, val);
  578. Ok(())
  579. }
  580. fn emit_u16(&mut self, key: &str, val: u16) -> slog::Result {
  581. s!(self, key, val);
  582. Ok(())
  583. }
  584. fn emit_i16(&mut self, key: &str, val: i16) -> slog::Result {
  585. s!(self, key, val);
  586. Ok(())
  587. }
  588. fn emit_u32(&mut self, key: &str, val: u32) -> slog::Result {
  589. s!(self, key, val);
  590. Ok(())
  591. }
  592. fn emit_i32(&mut self, key: &str, val: i32) -> slog::Result {
  593. s!(self, key, val);
  594. Ok(())
  595. }
  596. fn emit_f32(&mut self, key: &str, val: f32) -> slog::Result {
  597. s!(self, key, val);
  598. Ok(())
  599. }
  600. fn emit_u64(&mut self, key: &str, val: u64) -> slog::Result {
  601. s!(self, key, val);
  602. Ok(())
  603. }
  604. fn emit_i64(&mut self, key: &str, val: i64) -> slog::Result {
  605. s!(self, key, val);
  606. Ok(())
  607. }
  608. fn emit_f64(&mut self, key: &str, val: f64) -> slog::Result {
  609. s!(self, key, val);
  610. Ok(())
  611. }
  612. fn emit_str(&mut self, key: &str, val: &str) -> slog::Result {
  613. s!(self, key, val);
  614. Ok(())
  615. }
  616. fn emit_arguments(
  617. &mut self,
  618. key: &str,
  619. val: &fmt::Arguments,
  620. ) -> slog::Result {
  621. s!(self, key, val);
  622. Ok(())
  623. }
  624. }
  625. #[cfg(test)]
  626. mod tests {
  627. use super::*;
  628. use test::{black_box, Bencher};
  629. #[test]
  630. #[ignore]
  631. fn it_creates_a_logger() {
  632. let wm = WarningsManager::new("rust-test");
  633. let im = influx::writer(wm.tx.clone());
  634. let drain =
  635. WarningsDrain {
  636. tx: Arc::new(Mutex::new(wm.tx.clone())),
  637. drain: slog::Discard,
  638. to_file: Logger::root(slog::Discard, o!()),
  639. level: Level::Trace,
  640. };
  641. let logger = slog::Logger::root(drain, o!());
  642. //for _ in 0..60 {
  643. // debug!(logger, "test 123"; "exchange" => "plnx");
  644. //}
  645. }
  646. #[bench]
  647. fn it_sends_integers_with_a_sender_behind_a_mutex(b: &mut Bencher) {
  648. let (tx, rx) = channel();
  649. enum Msg {
  650. Val(usize),
  651. Terminate
  652. }
  653. let worker = thread::spawn(move || {
  654. let mut xs = Vec::new();
  655. loop {
  656. match rx.recv().unwrap() {
  657. Msg::Val(x) => { xs.push(x); }
  658. Msg::Terminate => break,
  659. }
  660. }
  661. xs.len()
  662. });
  663. let tx = Arc::new(Mutex::new(tx));
  664. b.iter(|| {
  665. let lock = tx.lock().unwrap();
  666. let _ = lock.send(Msg::Val(1));
  667. });
  668. let _ = tx.lock().unwrap().send(Msg::Terminate);
  669. let len = worker.join().unwrap();
  670. //println!("{}", len);
  671. }
  672. }