You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

763 lines
22KB

  1. //! An object to handle everyone's errors
  2. //!
  3. use std::thread::{self, JoinHandle};
  4. use std::sync::{Arc, Mutex, RwLock};
  5. use std::sync::mpsc::{Sender, channel};
  6. use std::collections::{BTreeMap, VecDeque};
  7. use std::fmt::{self, Display, Error as FmtError, Formatter};
  8. use std::io::{self, Write};
  9. use std::fs;
  10. use zmq;
  11. use chrono::{DateTime, Utc};
  12. use termion::color::{self, Fg, Bg};
  13. use influent::measurement::{Measurement, Value as InfluentValue};
  14. use slog::{self, OwnedKVList, Drain, Key, KV, Level, Logger};
  15. use sloggers::types::Severity;
  16. use super::{nanos, file_logger};
  17. use influx;
  18. const N_WARNINGS: usize = 500;
  19. #[macro_export]
  20. macro_rules! confirmed {
  21. ($warnings:ident, $($args:tt)*) => (
  22. {
  23. let _ = warnings.send(Warning::Confirmed( ( format!($($args)*) ) ) ).unwrap();
  24. }
  25. )
  26. }
  27. /// logs a `Warning::Awesome` message to the `WarningsManager`
  28. #[macro_export]
  29. macro_rules! awesome {
  30. ($warnings:ident, $($args:tt)*) => (
  31. {
  32. let _ = $warnings.send(Warning::Awesome( ( format!($($args)*) ) ) ).unwrap();
  33. }
  34. )
  35. }
  36. #[macro_export]
  37. macro_rules! critical {
  38. ($warnings:ident, $($args:tt)*) => (
  39. {
  40. let _ = $warnings.send(Warning::Critical( ( format!($($args)*) ) ) ).unwrap();
  41. }
  42. )
  43. }
  44. #[macro_export]
  45. macro_rules! notice {
  46. ($warnings:ident, $($args:tt)*) => (
  47. {
  48. let _ = $warnings.send(Warning::Notice( ( format!($($args)*) ) ) ).unwrap();
  49. }
  50. )
  51. }
  52. #[macro_export]
  53. macro_rules! error {
  54. ($warnings:ident, $($args:tt)*) => (
  55. {
  56. $warnings.send(Warning::Error( ( format!($($args)*) ) ) ).unwrap();
  57. }
  58. )
  59. }
  60. /// represents a non-fatal error somewhere in
  61. /// the system to report either to the program interface
  62. /// or in logs.
  63. ///
  64. #[derive(Debug, Clone, PartialEq)]
  65. pub enum Warning {
  66. Notice(String),
  67. Error(String),
  68. DegradedService(String),
  69. Critical(String),
  70. Confirmed(String),
  71. Awesome(String),
  72. Log {
  73. level: Level,
  74. module: &'static str,
  75. function: &'static str,
  76. line: u32,
  77. msg: String,
  78. kv: MeasurementRecord,
  79. },
  80. Terminate
  81. }
  82. impl Warning {
  83. pub fn msg(&self) -> String {
  84. match *self {
  85. Warning::Notice(ref s) | Warning::Error(ref s) |
  86. Warning::DegradedService(ref s) | Warning::Critical(ref s) |
  87. Warning::Awesome(ref s) | Warning::Confirmed(ref s) |
  88. Warning::Log { msg: ref s, .. } =>
  89. s.clone(),
  90. Warning::Terminate => "".to_owned()
  91. }
  92. }
  93. pub fn msg_str(&self) -> &str {
  94. match *self {
  95. Warning::Notice(ref s) | Warning::Error(ref s) |
  96. Warning::DegradedService(ref s) | Warning::Critical(ref s) |
  97. Warning::Awesome(ref s) | Warning::Confirmed(ref s) |
  98. Warning::Log { msg: ref s, .. } =>
  99. s.as_ref(),
  100. Warning::Terminate => "Terminate"
  101. }
  102. }
  103. pub fn category_str(&self) -> &str {
  104. match self {
  105. &Warning::Notice(_) => "NOTC",
  106. &Warning::Error(_) => "ERRO",
  107. &Warning::Critical(_) => "CRIT",
  108. &Warning::DegradedService(_) => "DGRD",
  109. &Warning::Confirmed(_) => "CNFD",
  110. &Warning::Awesome(_) => "AWSM",
  111. &Warning::Log { ref level, .. } => level.as_short_str(),
  112. &Warning::Terminate => "TERM",
  113. }
  114. }
  115. pub fn category(&self, f: &mut Formatter) -> fmt::Result {
  116. match *self {
  117. Warning::Notice(_) => {
  118. write!(f, "[ Notice ]")
  119. }
  120. Warning::Error(_) => {
  121. write!(f, "{yellow}[{title}]{reset}",
  122. yellow = Fg(color::LightYellow),
  123. title = " Error--",
  124. reset = Fg(color::Reset))
  125. }
  126. Warning::Critical(_) => {
  127. write!(f, "{bg}{fg}{title}{resetbg}{resetfg}",
  128. bg = Bg(color::Red),
  129. fg = Fg(color::White),
  130. title = " CRITICAL ",
  131. resetbg = Bg(color::Reset),
  132. resetfg = Fg(color::Reset))
  133. }
  134. Warning::Awesome(_) => {
  135. write!(f, "{color}[{title}]{reset}",
  136. color = Fg(color::Green),
  137. title = "Awesome!",
  138. reset = Fg(color::Reset))
  139. }
  140. Warning::DegradedService(_) => {
  141. write!(f, "{color}[{title}] {reset}",
  142. color = Fg(color::Blue),
  143. title = "Degraded Service ",
  144. reset = Fg(color::Reset))
  145. }
  146. Warning::Confirmed(_) => {
  147. write!(f, "{bg}{fg}{title}{resetbg}{resetfg}",
  148. bg = Bg(color::Blue),
  149. fg = Fg(color::White),
  150. title = "Confirmed ",
  151. resetbg = Bg(color::Reset),
  152. resetfg = Fg(color::Reset))
  153. }
  154. _ => Ok(())
  155. }
  156. }
  157. }
  158. impl Display for Warning {
  159. fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
  160. self.category(f)?;
  161. write!(f, " {}", self.msg())
  162. }
  163. }
  164. #[derive(Debug, Clone)]
  165. pub struct Record {
  166. pub time: DateTime<Utc>,
  167. pub msg: Warning
  168. }
  169. impl Record {
  170. pub fn new(msg: Warning) -> Self {
  171. let time = Utc::now();
  172. Record { time, msg }
  173. }
  174. pub fn to_measurement(&self, name: &'static str) -> Measurement {
  175. let cat = self.msg.category_str();
  176. let body = self.msg.msg_str();
  177. let mut m = Measurement::new(name);
  178. m.add_tag("category", cat);
  179. m.add_field("msg", InfluentValue::String(body));
  180. m.set_timestamp(nanos(self.time) as i64);
  181. m
  182. }
  183. }
  184. impl Display for Record {
  185. fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
  186. write!(f, "{} | {}", self.time.format("%H:%M:%S"), self.msg)
  187. }
  188. }
  189. pub type SlogResult = Result<(), slog::Error>;
  190. #[derive(Debug, Clone, PartialEq)]
  191. pub enum Value {
  192. String(String),
  193. Float(f64),
  194. Integer(i64),
  195. Boolean(bool)
  196. }
  197. impl Value {
  198. pub fn to_influent<'a>(&'a self) -> InfluentValue<'a> {
  199. match self {
  200. &Value::String(ref s) => InfluentValue::String(s),
  201. &Value::Float(n) => InfluentValue::Float(n),
  202. &Value::Integer(i) => InfluentValue::Integer(i),
  203. &Value::Boolean(b) => InfluentValue::Boolean(b),
  204. }
  205. }
  206. }
  207. #[derive(Debug, Clone, PartialEq)]
  208. pub struct MeasurementRecord {
  209. fields: Vec<(Key, Value)>,
  210. tags: Vec<(Key, String)>,
  211. }
  212. impl MeasurementRecord {
  213. pub fn new() -> Self {
  214. MeasurementRecord {
  215. fields: Vec::new(),
  216. tags: Vec::new(),
  217. }
  218. }
  219. pub fn add_field(&mut self, key: Key, val: Value) -> SlogResult {
  220. self.fields.push((key, val));
  221. Ok(())
  222. }
  223. pub fn add_tag(&mut self, key: Key, val: String) -> SlogResult {
  224. match key {
  225. "exchange" | "thread" | "ticker" | "category" => {
  226. self.tags.push((key, val));
  227. }
  228. other => {
  229. self.add_field(other, Value::String(val)).unwrap();
  230. }
  231. }
  232. Ok(())
  233. }
  234. pub fn serialize_values(&mut self, record: &slog::Record, values: &OwnedKVList) {
  235. let mut builder = TagBuilder { mrec: self };
  236. let _ = values.serialize(record, &mut builder);
  237. }
  238. pub fn to_measurement<'a>(&'a self, name: &'a str) -> Measurement<'a> {
  239. let fields: BTreeMap<&'a str, InfluentValue<'a>> =
  240. self.fields.iter()
  241. .map(|&(k, ref v)| {
  242. (k, v.to_influent())
  243. }).collect();
  244. let tags: BTreeMap<&'a str, &'a str> =
  245. self.tags.iter()
  246. .map(|&(k, ref v)| {
  247. (k, v.as_ref())
  248. }).collect();
  249. Measurement {
  250. key: name,
  251. timestamp: Some(nanos(Utc::now()) as i64),
  252. fields,
  253. tags,
  254. }
  255. }
  256. }
  257. impl slog::Serializer for MeasurementRecord {
  258. fn emit_usize(&mut self, key: Key, val: usize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  259. fn emit_isize(&mut self, key: Key, val: isize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  260. fn emit_bool(&mut self, key: Key, val: bool) -> SlogResult { self.add_field(key, Value::Boolean(val)) }
  261. fn emit_u8(&mut self, key: Key, val: u8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  262. fn emit_i8(&mut self, key: Key, val: i8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  263. fn emit_u16(&mut self, key: Key, val: u16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  264. fn emit_i16(&mut self, key: Key, val: i16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  265. fn emit_u32(&mut self, key: Key, val: u32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  266. fn emit_i32(&mut self, key: Key, val: i32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  267. fn emit_f32(&mut self, key: Key, val: f32) -> SlogResult { self.add_field(key, Value::Float(val as f64)) }
  268. fn emit_u64(&mut self, key: Key, val: u64) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  269. fn emit_i64(&mut self, key: Key, val: i64) -> SlogResult { self.add_field(key, Value::Integer(val)) }
  270. fn emit_f64(&mut self, key: Key, val: f64) -> SlogResult { self.add_field(key, Value::Float(val)) }
  271. fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { self.add_field(key, Value::String(val.to_string())) }
  272. fn emit_unit(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::Boolean(true)) }
  273. fn emit_none(&mut self, _: Key) -> SlogResult { Ok(()) } //self.add_field(key, Value::String("none".into())) }
  274. fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { self.add_field(key, Value::String(val.to_string())) }
  275. }
  276. pub struct TagBuilder<'a> {
  277. mrec: &'a mut MeasurementRecord
  278. }
  279. impl<'a> slog::Serializer for TagBuilder<'a> {
  280. fn emit_str(&mut self, key: Key, val: &str) -> SlogResult {
  281. match key {
  282. "exchange" | "thread" | "ticker" | "category" => {
  283. self.mrec.add_tag(key, val.to_string())
  284. }
  285. other => {
  286. self.mrec.add_field(other, Value::String(val.to_string()))
  287. }
  288. }
  289. }
  290. fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult {
  291. match key {
  292. "exchange" | "thread" | "ticker" | "category" => {
  293. self.mrec.add_tag(key, val.to_string())
  294. }
  295. other => {
  296. self.mrec.add_field(other, Value::String(val.to_string()))
  297. }
  298. }
  299. }
  300. }
  301. pub struct WarningsDrain<D: Drain> {
  302. level: Level,
  303. tx: Arc<Mutex<Sender<Warning>>>,
  304. drain: D,
  305. to_file: Logger,
  306. }
  307. impl<D> WarningsDrain<D>
  308. where D: Drain
  309. {
  310. pub fn new(tx: Sender<Warning>, level: Level, drain: D) -> Self {
  311. let tx = Arc::new(Mutex::new(tx));
  312. let to_file = file_logger("var/log/mm.log", Severity::Warning);
  313. WarningsDrain { tx, drain, level, to_file }
  314. }
  315. }
  316. impl From<Sender<Warning>> for WarningsDrain<slog::Fuse<slog::Discard>> {
  317. fn from(tx: Sender<Warning>) -> Self {
  318. WarningsDrain::new(tx, Level::Debug, slog::Discard.fuse())
  319. }
  320. }
  321. impl<D: Drain> Drain for WarningsDrain<D> {
  322. type Ok = ();
  323. type Err = D::Err;
  324. fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
  325. if record.level() <= self.level {
  326. let mut ser = MeasurementRecord::new();
  327. ser.serialize_values(record, values);
  328. let _ = record.kv().serialize(record, &mut ser);
  329. let msg = record.msg().to_string();
  330. if let Ok(lock) = self.tx.lock() {
  331. let _ = lock.send(Warning::Log {
  332. level: record.level(),
  333. module: record.module(),
  334. function: record.function(),
  335. line: record.line(),
  336. msg,
  337. kv: ser
  338. });
  339. }
  340. }
  341. if record.level() <= Level::Warning {
  342. let _ = self.to_file.log(record);
  343. }
  344. let _ = self.drain.log(record, values)?;
  345. Ok(())
  346. }
  347. }
  348. #[derive(Debug)]
  349. pub struct WarningsManager {
  350. pub tx: Sender<Warning>,
  351. pub warnings: Arc<RwLock<VecDeque<Record>>>,
  352. thread: Option<JoinHandle<()>>
  353. }
  354. impl WarningsManager {
  355. /// `measurement_name` is the name of the influxdb measurement
  356. /// we will save log entries to.
  357. ///
  358. pub fn new(measurement_name: &'static str) -> Self {
  359. let warnings = Arc::new(RwLock::new(VecDeque::new()));
  360. let warnings_copy = warnings.clone();
  361. let (tx, rx) = channel();
  362. let mut buf = String::with_capacity(4096);
  363. let ctx = zmq::Context::new();
  364. let socket = influx::push(&ctx).unwrap();
  365. let thread = thread::spawn(move || {
  366. let path = format!("var/log/warnings-manager-{}.log", measurement_name);
  367. let logger = file_logger(&path, Severity::Info);
  368. info!(logger, "entering loop");
  369. loop {
  370. if let Ok(msg) = rx.recv() {
  371. match msg {
  372. Warning::Terminate => {
  373. debug!(logger, "terminating");
  374. break;
  375. }
  376. Warning::Log { level, msg, kv, .. } => {
  377. debug!(logger, "new Warning::Debug arrived";
  378. "msg" => &msg);
  379. let mut meas = kv.to_measurement(measurement_name);
  380. meas.add_field("msg", InfluentValue::String(msg.as_ref()));
  381. meas.add_tag("category", level.as_short_str());
  382. influx::serialize(&meas, &mut buf);
  383. let _ = socket.send_str(&buf, 0);
  384. buf.clear();
  385. // and don't push to warnings
  386. // bc it's debug
  387. }
  388. other => {
  389. debug!(logger, "new {} arrived", other.category_str();
  390. "msg" => other.category_str());
  391. let rec = Record::new(other);
  392. {
  393. let m = rec.to_measurement(measurement_name);
  394. influx::serialize(&m, &mut buf);
  395. let _ = socket.send_str(&buf, 0);
  396. buf.clear();
  397. }
  398. if let Ok(mut lock) = warnings.write() {
  399. lock.push_front(rec);
  400. lock.truncate(N_WARNINGS);
  401. }
  402. }
  403. }
  404. }
  405. }
  406. });
  407. WarningsManager {
  408. warnings: warnings_copy,
  409. thread: Some(thread),
  410. tx
  411. }
  412. }
  413. }
  414. impl Drop for WarningsManager {
  415. fn drop(&mut self) {
  416. let _ = self.tx.send(Warning::Terminate);
  417. if let Some(thread) = self.thread.take() {
  418. thread.join().unwrap();
  419. }
  420. }
  421. }
  422. #[allow(dead_code)]
  423. pub struct ZmqDrain<D>
  424. where D: Drain,
  425. {
  426. drain: D,
  427. ctx: zmq::Context,
  428. socket: zmq::Socket,
  429. buf: Arc<Mutex<Vec<u8>>>
  430. }
  431. impl<D> ZmqDrain<D>
  432. where D: Drain,
  433. {
  434. pub fn new(drain: D) -> Self {
  435. let _ = fs::create_dir("/tmp/mm");
  436. let ctx = zmq::Context::new();
  437. let socket = ctx.socket(zmq::PUB).unwrap();
  438. socket.bind("ipc:///tmp/mm/log").expect("zmq publisher bind failed");
  439. let buf = Arc::new(Mutex::new(Vec::with_capacity(4096)));
  440. ZmqDrain {
  441. drain,
  442. ctx,
  443. socket,
  444. buf
  445. }
  446. }
  447. }
  448. const TIMESTAMP_FORMAT: &'static str = "%b %d %H:%M:%S%.3f";
  449. impl<D> Drain for ZmqDrain<D>
  450. where D: Drain
  451. {
  452. type Ok = D::Ok;
  453. type Err = D::Err;
  454. fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
  455. {
  456. let mut buf = self.buf.lock().unwrap();
  457. let _ = write!(buf, "{time} {level}",
  458. time = Utc::now().format(TIMESTAMP_FORMAT),
  459. level = record.level().as_short_str());
  460. {
  461. let mut thread_ser = ThreadSer(&mut buf);
  462. let _ = record.kv().serialize(record, &mut thread_ser);
  463. let _ = values.serialize(record, &mut thread_ser);
  464. }
  465. let _ = write!(buf, " {file:<20} {line:<5} {msg}",
  466. file = record.file(),
  467. line = record.line(),
  468. msg = record.msg());
  469. {
  470. let mut kv_ser = KvSer(&mut buf);
  471. // discarding any errors here...
  472. let _ = record.kv().serialize(record, &mut kv_ser);
  473. let _ = values.serialize(record, &mut kv_ser);
  474. }
  475. let _ = self.socket.send(&buf, 0);
  476. buf.clear();
  477. }
  478. self.drain.log(record, values)
  479. }
  480. }
  481. /// Can be used as a `Write` with `slog_term` and
  482. /// other libraries.
  483. ///
  484. #[allow(dead_code)]
  485. pub struct ZmqIo {
  486. ctx: zmq::Context,
  487. socket: zmq::Socket,
  488. buf: Vec<u8>
  489. }
  490. impl ZmqIo {
  491. pub fn new(addr: &str) -> Self {
  492. let _ = fs::create_dir("/tmp/mm");
  493. let ctx = zmq::Context::new();
  494. let socket = ctx.socket(zmq::PUB).unwrap();
  495. let addr = format!("ipc:///tmp/mm/{}", addr);
  496. socket.bind(&addr).expect("zmq publisher bind failed");
  497. let buf = Vec::with_capacity(4096);
  498. ZmqIo { ctx, socket, buf }
  499. }
  500. }
  501. impl Write for ZmqIo {
  502. fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
  503. self.buf.write(buf)
  504. }
  505. fn flush(&mut self) -> io::Result<()> {
  506. match self.buf.pop() {
  507. Some(b'\n') => {
  508. let _ = self.socket.send(&self.buf, 0);
  509. }
  510. Some(other) => {
  511. self.buf.push(other);
  512. let _ = self.socket.send(&self.buf, 0);
  513. }
  514. None => {
  515. return Ok(());
  516. }
  517. }
  518. self.buf.clear();
  519. Ok(())
  520. }
  521. }
  522. /// Serializes *only* KV pair with `key == "thread"`
  523. ///
  524. struct ThreadSer<'a>(&'a mut Vec<u8>);
  525. impl<'a> slog::ser::Serializer for ThreadSer<'a> {
  526. fn emit_arguments(&mut self, _: &str, _: &fmt::Arguments) -> slog::Result {
  527. Ok(())
  528. }
  529. fn emit_str(&mut self, key: &str, val: &str) -> slog::Result {
  530. if key == "thread" {
  531. write!(self.0, " {:<20}", val)?;
  532. }
  533. Ok(())
  534. }
  535. }
  536. /// Serializes KV pairs as ", k: v"
  537. ///
  538. struct KvSer<'a>(&'a mut Vec<u8>);
  539. macro_rules! s(
  540. ($s:expr, $k:expr, $v:expr) => {
  541. try!(write!($s.0, ", {}: {}", $k, $v));
  542. };
  543. );
  544. impl<'a> slog::ser::Serializer for KvSer<'a> {
  545. fn emit_none(&mut self, key: &str) -> slog::Result {
  546. s!(self, key, "None");
  547. Ok(())
  548. }
  549. fn emit_unit(&mut self, key: &str) -> slog::Result {
  550. s!(self, key, "()");
  551. Ok(())
  552. }
  553. fn emit_bool(&mut self, key: &str, val: bool) -> slog::Result {
  554. s!(self, key, val);
  555. Ok(())
  556. }
  557. fn emit_char(&mut self, key: &str, val: char) -> slog::Result {
  558. s!(self, key, val);
  559. Ok(())
  560. }
  561. fn emit_usize(&mut self, key: &str, val: usize) -> slog::Result {
  562. s!(self, key, val);
  563. Ok(())
  564. }
  565. fn emit_isize(&mut self, key: &str, val: isize) -> slog::Result {
  566. s!(self, key, val);
  567. Ok(())
  568. }
  569. fn emit_u8(&mut self, key: &str, val: u8) -> slog::Result {
  570. s!(self, key, val);
  571. Ok(())
  572. }
  573. fn emit_i8(&mut self, key: &str, val: i8) -> slog::Result {
  574. s!(self, key, val);
  575. Ok(())
  576. }
  577. fn emit_u16(&mut self, key: &str, val: u16) -> slog::Result {
  578. s!(self, key, val);
  579. Ok(())
  580. }
  581. fn emit_i16(&mut self, key: &str, val: i16) -> slog::Result {
  582. s!(self, key, val);
  583. Ok(())
  584. }
  585. fn emit_u32(&mut self, key: &str, val: u32) -> slog::Result {
  586. s!(self, key, val);
  587. Ok(())
  588. }
  589. fn emit_i32(&mut self, key: &str, val: i32) -> slog::Result {
  590. s!(self, key, val);
  591. Ok(())
  592. }
  593. fn emit_f32(&mut self, key: &str, val: f32) -> slog::Result {
  594. s!(self, key, val);
  595. Ok(())
  596. }
  597. fn emit_u64(&mut self, key: &str, val: u64) -> slog::Result {
  598. s!(self, key, val);
  599. Ok(())
  600. }
  601. fn emit_i64(&mut self, key: &str, val: i64) -> slog::Result {
  602. s!(self, key, val);
  603. Ok(())
  604. }
  605. fn emit_f64(&mut self, key: &str, val: f64) -> slog::Result {
  606. s!(self, key, val);
  607. Ok(())
  608. }
  609. fn emit_str(&mut self, key: &str, val: &str) -> slog::Result {
  610. s!(self, key, val);
  611. Ok(())
  612. }
  613. fn emit_arguments(
  614. &mut self,
  615. key: &str,
  616. val: &fmt::Arguments,
  617. ) -> slog::Result {
  618. s!(self, key, val);
  619. Ok(())
  620. }
  621. }
  622. #[allow(unused_variables, unused_imports)]
  623. #[cfg(test)]
  624. mod tests {
  625. use super::*;
  626. use test::{black_box, Bencher};
  627. #[test]
  628. #[ignore]
  629. fn it_creates_a_logger() {
  630. let wm = WarningsManager::new("rust-test");
  631. let im = influx::writer(wm.tx.clone());
  632. let drain =
  633. WarningsDrain {
  634. tx: Arc::new(Mutex::new(wm.tx.clone())),
  635. drain: slog::Discard,
  636. to_file: Logger::root(slog::Discard, o!()),
  637. level: Level::Trace,
  638. };
  639. let logger = slog::Logger::root(drain, o!());
  640. }
  641. #[bench]
  642. fn it_sends_integers_with_a_sender_behind_a_mutex(b: &mut Bencher) {
  643. let (tx, rx) = channel();
  644. enum Msg {
  645. Val(usize),
  646. Terminate
  647. }
  648. let worker = thread::spawn(move || {
  649. let mut xs = Vec::new();
  650. loop {
  651. match rx.recv().unwrap() {
  652. Msg::Val(x) => { xs.push(x); }
  653. Msg::Terminate => break,
  654. }
  655. }
  656. xs.len()
  657. });
  658. let tx = Arc::new(Mutex::new(tx));
  659. b.iter(|| {
  660. let lock = tx.lock().unwrap();
  661. let _ = lock.send(Msg::Val(1));
  662. });
  663. let _ = tx.lock().unwrap().send(Msg::Terminate);
  664. let len = worker.join().unwrap();
  665. //println!("{}", len);
  666. }
  667. }