You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

771 lines
23KB

  1. //! An object to handle everyone's errors
  2. //!
  3. use std::thread::{self, JoinHandle};
  4. use std::sync::{Arc, Mutex, RwLock};
  5. use std::sync::mpsc::{Sender, channel};
  6. use std::collections::{BTreeMap, VecDeque};
  7. use std::fmt::{self, Display, Error as FmtError, Formatter};
  8. use std::io::{self, Write};
  9. use std::fs;
  10. #[cfg(feature = "zmq")]
  11. use zmq;
  12. use chrono::{DateTime, Utc};
  13. use termion::color::{self, Fg, Bg};
  14. use influent::measurement::{Measurement, Value as InfluentValue};
  15. use slog::{self, OwnedKVList, Drain, Key, KV, Level, Logger};
  16. use sloggers::types::Severity;
  17. use super::{nanos, file_logger};
  18. use influx;
  /// Upper bound on the number of `Record`s kept in `WarningsManager::warnings`;
  /// the worker thread truncates the deque to this length after every push.
  const N_WARNINGS: usize = 500;
  /// Formats the trailing arguments with `format!` and sends the result as a
  /// `Warning::Confirmed` through `$warnings`.
  ///
  /// `$warnings` must be an identifier naming something with a `send` method
  /// (e.g. a `Sender<Warning>`). Panics if `send` returns an error.
  #[macro_export]
  macro_rules! confirmed {
      ($warnings:ident, $($args:tt)*) => (
          {
              // Must be `$warnings`: a bare `warnings` written inside the macro is
              // hygienic and never resolves to the caller's local variable.
              let _ = $warnings.send(Warning::Confirmed( ( format!($($args)*) ) ) ).unwrap();
          }
      )
  }
  /// Sends a `Warning::Awesome` built from the `format!`-style arguments
  /// through the sender named by the first argument.
  ///
  /// Panics if `send` returns an error.
  #[macro_export]
  macro_rules! awesome {
      ($tx:ident, $($fmt:tt)*) => {{
          let _ = $tx.send(Warning::Awesome(format!($($fmt)*))).unwrap();
      }};
  }
  /// Sends a `Warning::Critical` built from the `format!`-style arguments
  /// through the sender named by the first argument.
  ///
  /// Panics if `send` returns an error.
  #[macro_export]
  macro_rules! critical {
      ($tx:ident, $($fmt:tt)*) => {{
          let _ = $tx.send(Warning::Critical(format!($($fmt)*))).unwrap();
      }};
  }
  /// Sends a `Warning::Notice` built from the `format!`-style arguments
  /// through the sender named by the first argument.
  ///
  /// Panics if `send` returns an error.
  #[macro_export]
  macro_rules! notice {
      ($tx:ident, $($fmt:tt)*) => {{
          let _ = $tx.send(Warning::Notice(format!($($fmt)*))).unwrap();
      }};
  }
  /// Sends a `Warning::Error` built from the `format!`-style arguments
  /// through the sender named by the first argument.
  ///
  /// Panics if `send` returns an error.
  #[macro_export]
  macro_rules! error_w {
      ($tx:ident, $($fmt:tt)*) => {{
          $tx.send(Warning::Error(format!($($fmt)*))).unwrap();
      }};
  }
  /// Represents a non-fatal error somewhere in
  /// the system to report either to the program interface
  /// or in logs.
  ///
  /// Every variant except `Terminate` carries a message string, reachable
  /// through `msg()` / `msg_str()`.
  #[derive(Debug, Clone, PartialEq)]
  pub enum Warning {
      /// Category `NOTC`; displayed with the plain `[ Notice ]` label.
      Notice(String),
      /// Category `ERRO`.
      Error(String),
      /// Category `DGRD`.
      DegradedService(String),
      /// Category `CRIT`.
      Critical(String),
      /// Category `CNFD`.
      Confirmed(String),
      /// Category `AWSM`.
      Awesome(String),
      /// A structured log entry as built by `WarningsDrain::log` from a
      /// `slog::Record`; its category is the short name of `level`.
      Log {
          level: Level,
          module: &'static str,
          function: &'static str,
          line: u32,
          msg: String,
          kv: MeasurementRecord,
      },
      /// Tells the `WarningsManager` worker thread to leave its receive loop.
      Terminate
  }
  83. impl Warning {
  84. pub fn msg(&self) -> String {
  85. match *self {
  86. Warning::Notice(ref s) | Warning::Error(ref s) |
  87. Warning::DegradedService(ref s) | Warning::Critical(ref s) |
  88. Warning::Awesome(ref s) | Warning::Confirmed(ref s) |
  89. Warning::Log { msg: ref s, .. } =>
  90. s.clone(),
  91. Warning::Terminate => "".to_owned()
  92. }
  93. }
  94. pub fn msg_str(&self) -> &str {
  95. match *self {
  96. Warning::Notice(ref s) | Warning::Error(ref s) |
  97. Warning::DegradedService(ref s) | Warning::Critical(ref s) |
  98. Warning::Awesome(ref s) | Warning::Confirmed(ref s) |
  99. Warning::Log { msg: ref s, .. } =>
  100. s.as_ref(),
  101. Warning::Terminate => "Terminate"
  102. }
  103. }
  104. pub fn category_str(&self) -> &str {
  105. match self {
  106. &Warning::Notice(_) => "NOTC",
  107. &Warning::Error(_) => "ERRO",
  108. &Warning::Critical(_) => "CRIT",
  109. &Warning::DegradedService(_) => "DGRD",
  110. &Warning::Confirmed(_) => "CNFD",
  111. &Warning::Awesome(_) => "AWSM",
  112. &Warning::Log { ref level, .. } => level.as_short_str(),
  113. &Warning::Terminate => "TERM",
  114. }
  115. }
  116. pub fn category(&self, f: &mut Formatter) -> fmt::Result {
  117. match *self {
  118. Warning::Notice(_) => {
  119. write!(f, "[ Notice ]")
  120. }
  121. Warning::Error(_) => {
  122. write!(f, "{yellow}[{title}]{reset}",
  123. yellow = Fg(color::LightYellow),
  124. title = " Error--",
  125. reset = Fg(color::Reset))
  126. }
  127. Warning::Critical(_) => {
  128. write!(f, "{bg}{fg}{title}{resetbg}{resetfg}",
  129. bg = Bg(color::Red),
  130. fg = Fg(color::White),
  131. title = " CRITICAL ",
  132. resetbg = Bg(color::Reset),
  133. resetfg = Fg(color::Reset))
  134. }
  135. Warning::Awesome(_) => {
  136. write!(f, "{color}[{title}]{reset}",
  137. color = Fg(color::Green),
  138. title = "Awesome!",
  139. reset = Fg(color::Reset))
  140. }
  141. Warning::DegradedService(_) => {
  142. write!(f, "{color}[{title}] {reset}",
  143. color = Fg(color::Blue),
  144. title = "Degraded Service ",
  145. reset = Fg(color::Reset))
  146. }
  147. Warning::Confirmed(_) => {
  148. write!(f, "{bg}{fg}{title}{resetbg}{resetfg}",
  149. bg = Bg(color::Blue),
  150. fg = Fg(color::White),
  151. title = "Confirmed ",
  152. resetbg = Bg(color::Reset),
  153. resetfg = Fg(color::Reset))
  154. }
  155. _ => Ok(())
  156. }
  157. }
  158. }
  159. impl Display for Warning {
  160. fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
  161. self.category(f)?;
  162. write!(f, " {}", self.msg())
  163. }
  164. }
  165. #[derive(Debug, Clone)]
  166. pub struct Record {
  167. pub time: DateTime<Utc>,
  168. pub msg: Warning
  169. }
  170. impl Record {
  171. pub fn new(msg: Warning) -> Self {
  172. let time = Utc::now();
  173. Record { time, msg }
  174. }
  175. pub fn to_measurement(&self, name: &'static str) -> Measurement {
  176. let cat = self.msg.category_str();
  177. let body = self.msg.msg_str();
  178. let mut m = Measurement::new(name);
  179. m.add_tag("category", cat);
  180. m.add_field("msg", InfluentValue::String(body));
  181. m.set_timestamp(nanos(self.time) as i64);
  182. m
  183. }
  184. }
  185. impl Display for Record {
  186. fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
  187. write!(f, "{} | {}", self.time.format("%H:%M:%S"), self.msg)
  188. }
  189. }
  /// Result type returned by the slog serializer methods in this module.
  pub type SlogResult = Result<(), slog::Error>;
  191. #[derive(Debug, Clone, PartialEq)]
  192. pub enum Value {
  193. String(String),
  194. Float(f64),
  195. Integer(i64),
  196. Boolean(bool)
  197. }
  198. impl Value {
  199. pub fn to_influent<'a>(&'a self) -> InfluentValue<'a> {
  200. match self {
  201. &Value::String(ref s) => InfluentValue::String(s),
  202. &Value::Float(n) => InfluentValue::Float(n),
  203. &Value::Integer(i) => InfluentValue::Integer(i),
  204. &Value::Boolean(b) => InfluentValue::Boolean(b),
  205. }
  206. }
  207. }
  208. #[derive(Debug, Clone, PartialEq)]
  209. pub struct MeasurementRecord {
  210. fields: Vec<(Key, Value)>,
  211. tags: Vec<(Key, String)>,
  212. }
  213. impl MeasurementRecord {
  214. pub fn new() -> Self {
  215. MeasurementRecord {
  216. fields: Vec::new(),
  217. tags: Vec::new(),
  218. }
  219. }
  220. pub fn add_field(&mut self, key: Key, val: Value) -> SlogResult {
  221. self.fields.push((key, val));
  222. Ok(())
  223. }
  224. pub fn add_tag(&mut self, key: Key, val: String) -> SlogResult {
  225. match key {
  226. "exchange" | "thread" | "ticker" | "category" => {
  227. self.tags.push((key, val));
  228. }
  229. other => {
  230. self.add_field(other, Value::String(val)).unwrap();
  231. }
  232. }
  233. Ok(())
  234. }
  235. pub fn serialize_values(&mut self, record: &slog::Record, values: &OwnedKVList) {
  236. let mut builder = TagBuilder { mrec: self };
  237. let _ = values.serialize(record, &mut builder);
  238. }
  239. pub fn to_measurement<'a>(&'a self, name: &'a str) -> Measurement<'a> {
  240. let fields: BTreeMap<&'a str, InfluentValue<'a>> =
  241. self.fields.iter()
  242. .map(|&(k, ref v)| {
  243. (k, v.to_influent())
  244. }).collect();
  245. let tags: BTreeMap<&'a str, &'a str> =
  246. self.tags.iter()
  247. .map(|&(k, ref v)| {
  248. (k, v.as_ref())
  249. }).collect();
  250. Measurement {
  251. key: name,
  252. timestamp: Some(nanos(Utc::now()) as i64),
  253. fields,
  254. tags,
  255. }
  256. }
  257. }
  258. impl slog::Serializer for MeasurementRecord {
  259. fn emit_usize(&mut self, key: Key, val: usize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  260. fn emit_isize(&mut self, key: Key, val: isize) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  261. fn emit_bool(&mut self, key: Key, val: bool) -> SlogResult { self.add_field(key, Value::Boolean(val)) }
  262. fn emit_u8(&mut self, key: Key, val: u8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  263. fn emit_i8(&mut self, key: Key, val: i8) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  264. fn emit_u16(&mut self, key: Key, val: u16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  265. fn emit_i16(&mut self, key: Key, val: i16) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  266. fn emit_u32(&mut self, key: Key, val: u32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  267. fn emit_i32(&mut self, key: Key, val: i32) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  268. fn emit_f32(&mut self, key: Key, val: f32) -> SlogResult { self.add_field(key, Value::Float(val as f64)) }
  269. fn emit_u64(&mut self, key: Key, val: u64) -> SlogResult { self.add_field(key, Value::Integer(val as i64)) }
  270. fn emit_i64(&mut self, key: Key, val: i64) -> SlogResult { self.add_field(key, Value::Integer(val)) }
  271. fn emit_f64(&mut self, key: Key, val: f64) -> SlogResult { self.add_field(key, Value::Float(val)) }
  272. fn emit_str(&mut self, key: Key, val: &str) -> SlogResult { self.add_field(key, Value::String(val.to_string())) }
  273. fn emit_unit(&mut self, key: Key) -> SlogResult { self.add_field(key, Value::Boolean(true)) }
  274. fn emit_none(&mut self, _: Key) -> SlogResult { Ok(()) } //self.add_field(key, Value::String("none".into())) }
  275. fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult { self.add_field(key, Value::String(val.to_string())) }
  276. }
  277. pub struct TagBuilder<'a> {
  278. mrec: &'a mut MeasurementRecord
  279. }
  280. impl<'a> slog::Serializer for TagBuilder<'a> {
  281. fn emit_str(&mut self, key: Key, val: &str) -> SlogResult {
  282. match key {
  283. "exchange" | "thread" | "ticker" | "category" => {
  284. self.mrec.add_tag(key, val.to_string())
  285. }
  286. other => {
  287. self.mrec.add_field(other, Value::String(val.to_string()))
  288. }
  289. }
  290. }
  291. fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> SlogResult {
  292. match key {
  293. "exchange" | "thread" | "ticker" | "category" => {
  294. self.mrec.add_tag(key, val.to_string())
  295. }
  296. other => {
  297. self.mrec.add_field(other, Value::String(val.to_string()))
  298. }
  299. }
  300. }
  301. }
  302. pub struct WarningsDrain<D: Drain> {
  303. level: Level,
  304. tx: Arc<Mutex<Sender<Warning>>>,
  305. drain: D,
  306. to_file: Logger,
  307. }
  308. impl<D> WarningsDrain<D>
  309. where D: Drain
  310. {
  311. pub fn new(tx: Sender<Warning>, level: Level, drain: D) -> Self {
  312. let tx = Arc::new(Mutex::new(tx));
  313. let to_file = file_logger("var/log/mm.log", Severity::Warning);
  314. WarningsDrain { tx, drain, level, to_file }
  315. }
  316. }
  317. impl From<Sender<Warning>> for WarningsDrain<slog::Fuse<slog::Discard>> {
  318. fn from(tx: Sender<Warning>) -> Self {
  319. WarningsDrain::new(tx, Level::Debug, slog::Discard.fuse())
  320. }
  321. }
  322. impl<D: Drain> Drain for WarningsDrain<D> {
  323. type Ok = ();
  324. type Err = D::Err;
  325. fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
  326. if record.level() <= self.level {
  327. let mut ser = MeasurementRecord::new();
  328. ser.serialize_values(record, values);
  329. let _ = record.kv().serialize(record, &mut ser);
  330. let msg = record.msg().to_string();
  331. if let Ok(lock) = self.tx.lock() {
  332. let _ = lock.send(Warning::Log {
  333. level: record.level(),
  334. module: record.module(),
  335. function: record.function(),
  336. line: record.line(),
  337. msg,
  338. kv: ser
  339. });
  340. }
  341. }
  342. if record.level() <= Level::Warning {
  343. let _ = self.to_file.log(record);
  344. }
  345. let _ = self.drain.log(record, values)?;
  346. Ok(())
  347. }
  348. }
  349. #[derive(Debug)]
  350. pub struct WarningsManager {
  351. pub tx: Sender<Warning>,
  352. pub warnings: Arc<RwLock<VecDeque<Record>>>,
  353. thread: Option<JoinHandle<()>>
  354. }
  355. impl WarningsManager {
  356. /// `measurement_name` is the name of the influxdb measurement
  357. /// we will save log entries to.
  358. ///
  359. #[cfg(feature = "zmq")]
  360. pub fn new(measurement_name: &'static str) -> Self {
  361. let warnings = Arc::new(RwLock::new(VecDeque::new()));
  362. let warnings_copy = warnings.clone();
  363. let (tx, rx) = channel();
  364. let mut buf = String::with_capacity(4096);
  365. let ctx = zmq::Context::new();
  366. let socket = influx::push(&ctx).unwrap();
  367. let thread = thread::spawn(move || {
  368. let path = format!("var/log/warnings-manager-{}.log", measurement_name);
  369. let logger = file_logger(&path, Severity::Info);
  370. info!(logger, "entering loop");
  371. loop {
  372. if let Ok(msg) = rx.recv() {
  373. match msg {
  374. Warning::Terminate => {
  375. debug!(logger, "terminating");
  376. break;
  377. }
  378. Warning::Log { level, msg, kv, .. } => {
  379. debug!(logger, "new Warning::Debug arrived";
  380. "msg" => &msg);
  381. let mut meas = kv.to_measurement(measurement_name);
  382. meas.add_field("msg", InfluentValue::String(msg.as_ref()));
  383. meas.add_tag("category", level.as_short_str());
  384. influx::serialize(&meas, &mut buf);
  385. let _ = socket.send_str(&buf, 0);
  386. buf.clear();
  387. // and don't push to warnings
  388. // bc it's debug
  389. }
  390. other => {
  391. debug!(logger, "new {} arrived", other.category_str();
  392. "msg" => other.category_str());
  393. let rec = Record::new(other);
  394. {
  395. let m = rec.to_measurement(measurement_name);
  396. influx::serialize(&m, &mut buf);
  397. let _ = socket.send_str(&buf, 0);
  398. buf.clear();
  399. }
  400. if let Ok(mut lock) = warnings.write() {
  401. lock.push_front(rec);
  402. lock.truncate(N_WARNINGS);
  403. }
  404. }
  405. }
  406. }
  407. }
  408. });
  409. WarningsManager {
  410. warnings: warnings_copy,
  411. thread: Some(thread),
  412. tx
  413. }
  414. }
  415. }
  416. impl Drop for WarningsManager {
  417. fn drop(&mut self) {
  418. let _ = self.tx.send(Warning::Terminate);
  419. if let Some(thread) = self.thread.take() {
  420. thread.join().unwrap();
  421. }
  422. }
  423. }
  /// A slog drain that publishes each record as one formatted line on a
  /// zmq `PUB` socket before passing it to the wrapped drain.
  #[cfg(feature = "zmq")]
  #[allow(dead_code)]
  pub struct ZmqDrain<D>
      where D: Drain,
  {
      drain: D,
      ctx: zmq::Context,
      socket: zmq::Socket,
      buf: Arc<Mutex<Vec<u8>>>
  }

  #[cfg(feature = "zmq")]
  impl<D> ZmqDrain<D>
      where D: Drain,
  {
      /// Wraps `drain` and binds a publisher socket to `ipc:///tmp/mm/log`.
      ///
      /// The `/tmp/mm` directory is created first; a failure to create it
      /// is ignored.
      ///
      /// # Panics
      ///
      /// Panics if the socket cannot be created or bound.
      pub fn new(drain: D) -> Self {
          fs::create_dir("/tmp/mm").ok();

          let context = zmq::Context::new();
          let publisher = context.socket(zmq::PUB).unwrap();
          publisher.bind("ipc:///tmp/mm/log").expect("zmq publisher bind failed");

          ZmqDrain {
              drain: drain,
              ctx: context,
              socket: publisher,
              buf: Arc::new(Mutex::new(Vec::with_capacity(4096))),
          }
      }
  }
  /// Format string passed to `Utc::now().format(..)` for the timestamp that
  /// starts each line built by `ZmqDrain::log`.
  const TIMESTAMP_FORMAT: &'static str = "%b %d %H:%M:%S%.3f";
  #[cfg(feature = "zmq")]
  impl<D> Drain for ZmqDrain<D>
      where D: Drain
  {
      type Ok = D::Ok;
      type Err = D::Err;

      /// Formats `record` as
      /// `<time> <level>[ <thread>] <file> <line> <msg>[, k: v]...`,
      /// publishes it on the socket, then forwards the record to the
      /// wrapped drain and returns that drain's result.
      ///
      /// Formatting and send errors are discarded.
      ///
      /// # Panics
      ///
      /// Panics if the buffer mutex is poisoned.
      fn log(&self, record: &slog::Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
          {
              let mut line = self.buf.lock().unwrap();

              // header: timestamp and short level name
              let stamp = Utc::now().format(TIMESTAMP_FORMAT);
              let _ = write!(line, "{time} {level}",
                             time = stamp,
                             level = record.level().as_short_str());

              // the `thread` pair, if any, goes right after the level
              {
                  let mut only_thread = ThreadSer(&mut line);
                  let _ = record.kv().serialize(record, &mut only_thread);
                  let _ = values.serialize(record, &mut only_thread);
              }

              let _ = write!(line, " {file:<20} {line:<5} {msg}",
                             file = record.file(),
                             line = record.line(),
                             msg = record.msg());

              // all pairs appended as ", k: v"; errors are discarded
              {
                  let mut all_pairs = KvSer(&mut line);
                  let _ = record.kv().serialize(record, &mut all_pairs);
                  let _ = values.serialize(record, &mut all_pairs);
              }

              let _ = self.socket.send(&line, 0);
              line.clear();
          }

          self.drain.log(record, values)
      }
  }
  /// Can be used as a `Write` with `slog_term` and
  /// other libraries.
  ///
  /// Written bytes are buffered and published on a zmq `PUB` socket when
  /// the writer is flushed.
  #[cfg(feature = "zmq")]
  #[allow(dead_code)]
  pub struct ZmqIo {
      ctx: zmq::Context,
      socket: zmq::Socket,
      buf: Vec<u8>
  }

  #[cfg(feature = "zmq")]
  impl ZmqIo {
      /// Binds a publisher socket to `ipc:///tmp/mm/<addr>`.
      ///
      /// The `/tmp/mm` directory is created first; a failure to create it
      /// is ignored.
      ///
      /// # Panics
      ///
      /// Panics if the socket cannot be created or bound.
      pub fn new(addr: &str) -> Self {
          fs::create_dir("/tmp/mm").ok();

          let context = zmq::Context::new();
          let publisher = context.socket(zmq::PUB).unwrap();
          let endpoint = format!("ipc:///tmp/mm/{}", addr);
          publisher.bind(&endpoint).expect("zmq publisher bind failed");

          ZmqIo {
              ctx: context,
              socket: publisher,
              buf: Vec::with_capacity(4096),
          }
      }
  }
  #[cfg(feature = "zmq")]
  impl Write for ZmqIo {
      /// Appends `buf` to the internal buffer; nothing is sent until `flush`.
      fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
          self.buf.extend_from_slice(buf);
          Ok(buf.len())
      }

      /// Publishes the buffered bytes as one message, minus a single
      /// trailing newline if there is one, then empties the buffer.
      /// Does nothing when the buffer is empty; send errors are discarded.
      fn flush(&mut self) -> io::Result<()> {
          if self.buf.is_empty() {
              return Ok(());
          }
          if self.buf.last() == Some(&b'\n') {
              self.buf.pop();
          }
          let _ = self.socket.send(&self.buf, 0);
          self.buf.clear();
          Ok(())
      }
  }
  530. /// Serializes *only* KV pair with `key == "thread"`
  531. ///
  532. struct ThreadSer<'a>(&'a mut Vec<u8>);
  533. impl<'a> slog::ser::Serializer for ThreadSer<'a> {
  534. fn emit_arguments(&mut self, _: &str, _: &fmt::Arguments) -> slog::Result {
  535. Ok(())
  536. }
  537. fn emit_str(&mut self, key: &str, val: &str) -> slog::Result {
  538. if key == "thread" {
  539. write!(self.0, " {:<20}", val)?;
  540. }
  541. Ok(())
  542. }
  543. }
  /// Serializes KV pairs as ", k: v"
  ///
  struct KvSer<'a>(&'a mut Vec<u8>);

  /// Appends `, <k>: <v>` to the byte buffer in field `0` of `$s`,
  /// returning early from the enclosing function if the write fails.
  macro_rules! s(
      ($s:expr, $k:expr, $v:expr) => {
          // `?` instead of the deprecated `try!`, which does not compile
          // as written from edition 2018 on.
          write!($s.0, ", {}: {}", $k, $v)?;
      };
  );
  552. impl<'a> slog::ser::Serializer for KvSer<'a> {
  553. fn emit_none(&mut self, key: &str) -> slog::Result {
  554. s!(self, key, "None");
  555. Ok(())
  556. }
  557. fn emit_unit(&mut self, key: &str) -> slog::Result {
  558. s!(self, key, "()");
  559. Ok(())
  560. }
  561. fn emit_bool(&mut self, key: &str, val: bool) -> slog::Result {
  562. s!(self, key, val);
  563. Ok(())
  564. }
  565. fn emit_char(&mut self, key: &str, val: char) -> slog::Result {
  566. s!(self, key, val);
  567. Ok(())
  568. }
  569. fn emit_usize(&mut self, key: &str, val: usize) -> slog::Result {
  570. s!(self, key, val);
  571. Ok(())
  572. }
  573. fn emit_isize(&mut self, key: &str, val: isize) -> slog::Result {
  574. s!(self, key, val);
  575. Ok(())
  576. }
  577. fn emit_u8(&mut self, key: &str, val: u8) -> slog::Result {
  578. s!(self, key, val);
  579. Ok(())
  580. }
  581. fn emit_i8(&mut self, key: &str, val: i8) -> slog::Result {
  582. s!(self, key, val);
  583. Ok(())
  584. }
  585. fn emit_u16(&mut self, key: &str, val: u16) -> slog::Result {
  586. s!(self, key, val);
  587. Ok(())
  588. }
  589. fn emit_i16(&mut self, key: &str, val: i16) -> slog::Result {
  590. s!(self, key, val);
  591. Ok(())
  592. }
  593. fn emit_u32(&mut self, key: &str, val: u32) -> slog::Result {
  594. s!(self, key, val);
  595. Ok(())
  596. }
  597. fn emit_i32(&mut self, key: &str, val: i32) -> slog::Result {
  598. s!(self, key, val);
  599. Ok(())
  600. }
  601. fn emit_f32(&mut self, key: &str, val: f32) -> slog::Result {
  602. s!(self, key, val);
  603. Ok(())
  604. }
  605. fn emit_u64(&mut self, key: &str, val: u64) -> slog::Result {
  606. s!(self, key, val);
  607. Ok(())
  608. }
  609. fn emit_i64(&mut self, key: &str, val: i64) -> slog::Result {
  610. s!(self, key, val);
  611. Ok(())
  612. }
  613. fn emit_f64(&mut self, key: &str, val: f64) -> slog::Result {
  614. s!(self, key, val);
  615. Ok(())
  616. }
  617. fn emit_str(&mut self, key: &str, val: &str) -> slog::Result {
  618. s!(self, key, val);
  619. Ok(())
  620. }
  621. fn emit_arguments(
  622. &mut self,
  623. key: &str,
  624. val: &fmt::Arguments,
  625. ) -> slog::Result {
  626. s!(self, key, val);
  627. Ok(())
  628. }
  629. }
  // Several bindings below exist only to check that construction compiles,
  // hence the blanket `allow`.
  #[allow(unused_variables, unused_imports)]
  #[cfg(test)]
  mod tests {
      use super::*;
      use test::{black_box, Bencher};

      /// Builds a `WarningsManager`, an influx writer and a `WarningsDrain`
      /// by hand and roots a logger on the drain. Marked `#[ignore]`, so it
      /// only runs when requested explicitly.
      #[test]
      #[ignore]
      fn it_creates_a_logger() {
          let wm = WarningsManager::new("rust-test");
          let im = influx::writer(wm.tx.clone());
          let drain =
              WarningsDrain {
                  tx: Arc::new(Mutex::new(wm.tx.clone())),
                  drain: slog::Discard,
                  to_file: Logger::root(slog::Discard, o!()),
                  level: Level::Trace,
              };
          let logger = slog::Logger::root(drain, o!());
      }

      /// Measures locking an `Arc<Mutex<Sender<_>>>` and sending one value
      /// through it per iteration, the same access pattern `WarningsDrain`
      /// uses for its `tx` field.
      #[bench]
      fn it_sends_integers_with_a_sender_behind_a_mutex(b: &mut Bencher) {
          let (tx, rx) = channel();
          enum Msg {
              Val(usize),
              Terminate
          }
          // the worker collects every value until it is told to stop
          let worker = thread::spawn(move || {
              let mut xs = Vec::new();
              loop {
                  match rx.recv().unwrap() {
                      Msg::Val(x) => { xs.push(x); }
                      Msg::Terminate => break,
                  }
              }
              xs.len()
          });
          let tx = Arc::new(Mutex::new(tx));
          b.iter(|| {
              let lock = tx.lock().unwrap();
              let _ = lock.send(Msg::Val(1));
          });
          let _ = tx.lock().unwrap().send(Msg::Terminate);
          let len = worker.join().unwrap();
          //println!("{}", len);
      }
  }