Browse Source

Merge branch 'v0.2'

Accepts v0.2 changes as the new master branch.
master
Jonathan Strong 7 years ago
parent
commit
1ccb151eb2
3 changed files with 64 additions and 61 deletions
  1. +1
    -0
      .gitignore
  2. +1
    -2
      Cargo.toml
  3. +62
    -59
      src/influx.rs

+ 1
- 0
.gitignore View File

@@ -2,3 +2,4 @@
**/*.rs.bk **/*.rs.bk
Cargo.lock Cargo.lock
.*.swp .*.swp
/var/*.log

+ 1
- 2
Cargo.toml View File

@@ -1,6 +1,6 @@
[package] [package]
name = "logging" name = "logging"
version = "0.1.0"
version = "0.2.0"
authors = ["Jonathan Strong <jstrong@legis.io>"] authors = ["Jonathan Strong <jstrong@legis.io>"]


[dependencies] [dependencies]
@@ -13,7 +13,6 @@ termion = "1.4.0"
slog = "2.0.6" slog = "2.0.6"
sloggers = "0.2" sloggers = "0.2"
slog-term = "2" slog-term = "2"
# shuteye = "^0"
# chashmap = "2" # chashmap = "2"


windows = { path = "../windows" } windows = { path = "../windows" }


+ 62
- 59
src/influx.rs View File

@@ -3,7 +3,7 @@


use std::iter::FromIterator; use std::iter::FromIterator;
use std::io::{Write, Read}; use std::io::{Write, Read};
use std::sync::mpsc::{Sender, Receiver, channel};
use std::sync::mpsc::{Sender, Receiver, channel, SendError};
use std::thread; use std::thread;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::{self, OpenOptions}; use std::fs::{self, OpenOptions};
@@ -29,7 +29,7 @@ const DB_HOST: &'static str = "http://washington.0ptimus.internal:8086/write";
const ZMQ_RCV_HWM: i32 = 0; const ZMQ_RCV_HWM: i32 = 0;
const ZMQ_SND_HWM: i32 = 0; const ZMQ_SND_HWM: i32 = 0;


const N_BUFFER: u8 = 160;
const N_BUFFER: u8 = 80;


/// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`. /// Provides flexible and ergonomic use of `Sender<OwnedMeasurement>`.
/// ///
@@ -89,7 +89,6 @@ macro_rules! measure {
(@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; (@kv $t:tt, $meas:ident, $k:tt => $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
(@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) }; (@kv $t:tt, $meas:ident, $k:tt; $v:expr) => { measure!(@ea $t, $meas, stringify!($k), $v) };
(@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) }; (@kv time, $meas:ident, $tm:expr) => { $meas = $meas.set_timestamp($tm as i64) };

(@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); }; (@ea tag, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_tag($k, $v); };
(@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) }; (@ea int, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Integer($v)) };
(@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) }; (@ea float, $meas:ident, $k:expr, $v:expr) => { $meas = $meas.add_field($k, $crate::influx::OwnedValue::Float($v)) };
@@ -105,41 +104,51 @@ macro_rules! measure {
}}; }};
} }


/// exactly like `writer`, but also returns a `Sender<Measurement>` and accepts
/// incoming `Measurement`s that way *in addition* to the old socket/`String`
/// method
///
/// Holds a thread (and provides an interface to it) that serializes `OwnedMeasurement`s
/// it receives (over a SPSC channel) and inserts to influxdb via http when `N_BUFFER`
/// measurements have accumulated.
///
pub struct InfluxWriter { pub struct InfluxWriter {
host: &'static str,
db: &'static str,
tx: Sender<OwnedMeasurement>,
kill_switch: Sender<()>, kill_switch: Sender<()>,
thread: Option<thread::JoinHandle<()>>, thread: Option<thread::JoinHandle<()>>,
} }


impl Default for InfluxWriter {
fn default() -> Self {
InfluxWriter::new("washington.0ptimus.internal", "mm_test", "var/default.log")
}
}

impl InfluxWriter { impl InfluxWriter {
/// Sends the `OwnedMeasurement` to the serialization thread.
///
pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<OwnedMeasurement>> {
self.tx.send(m)
}


pub fn new(log_path: &str, warnings: Sender<Warning>) -> (Self, Sender<OwnedMeasurement>) {
pub fn new(host: &'static str, db: &'static str, log_path: &str) -> Self {
let (kill_switch, terminate) = channel(); let (kill_switch, terminate) = channel();
let (tx, rx) = channel(); let (tx, rx) = channel();
let logger = file_logger(log_path, Severity::Info); let logger = file_logger(log_path, Severity::Info);
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
info!(logger, "initializing zmq");
let _ = fs::create_dir("/tmp/mm");
let ctx = zmq::Context::new();
let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
info!(logger, "initializing url"; info!(logger, "initializing url";
"DB_HOST" => DB_HOST,
"DB_NAME" => DB_NAME);
let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
"DB_HOST" => host,
"DB_NAME" => db);
let url = Url::parse_with_params(&format!("http://{}:8086/write", host), &[("db", db), ("precision", "ns")]).expect("influx writer url should parse");
let client = Client::new(); let client = Client::new();
info!(logger, "initializing buffers"); info!(logger, "initializing buffers");
let mut meas_buf = String::with_capacity(4096);
let mut buf = String::with_capacity(4096);
let mut server_resp = String::with_capacity(4096);
let mut meas_buf = String::with_capacity(32 * 32 * 32);
let mut buf = String::with_capacity(32 * 32 * 32);
let mut count = 0; let mut count = 0;


let next = |prev: u8, s: &str, buf: &mut String| -> u8 { let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
trace!(logger, "appending serialized measurement to buffer"; trace!(logger, "appending serialized measurement to buffer";
"prev" => prev, "prev" => prev,
"buf.len()" => buf.len()); "buf.len()" => buf.len());

match prev { match prev {
0 => { 0 => {
buf.push_str(s); buf.push_str(s);
@@ -160,31 +169,28 @@ impl InfluxWriter {
trace!(logger, "sending buffer to influx"; trace!(logger, "sending buffer to influx";
"buf.len()" => buf.len()); "buf.len()" => buf.len());


let resp = client.post(url.clone())
.body(buf.as_str())
.send();
match resp {
#[cfg(not(test))]
{
let resp = client.post(url.clone())
.body(buf.as_str())
.send();
match resp {


Ok(Response { status, .. }) if status == StatusCode::NoContent => {
debug!(logger, "server responded ok: 204 NoContent");
}
Ok(Response { status, .. }) if status == StatusCode::NoContent => {
trace!(logger, "server responded ok: 204 NoContent");
}


Ok(mut resp) => {
let mut server_resp = String::with_capacity(1024);
//server_resp.push_str(&format!("sent at {}:\n", Utc::now()));
//server_resp.push_str(&buf);
//server_resp.push_str("\nreceived:\n");
resp.read_to_string(&mut server_resp); //.unwrap_or(0);
error!(logger, "influx server error";
"status" => resp.status.to_string(),
"body" => server_resp);
}
Ok(mut resp) => {
let mut server_resp = String::with_capacity(1024);
resp.read_to_string(&mut server_resp); //.unwrap_or(0);
error!(logger, "influx server error";
"status" => resp.status.to_string(),
"body" => server_resp);
}


Err(why) => {
error!(logger, "http request failed: {:?}", why);
// warnings.send(
// Warning::Error(
// format!("Influx write error: {}", why)));
Err(why) => {
error!(logger, "http request failed: {:?}", why);
}
} }
} }
buf.clear(); buf.clear();
@@ -197,26 +203,15 @@ impl InfluxWriter {


loop { loop {
rcvd_msg = false; rcvd_msg = false;
rx.try_recv()
rx.recv_timeout(Duration::from_millis(10))
.map(|meas| { .map(|meas| {
debug!(logger, "rcvd new OwnedMeasurement";
"count" => count);
trace!(logger, "rcvd new OwnedMeasurement"; "count" => count);
serialize_owned(&meas, &mut meas_buf); serialize_owned(&meas, &mut meas_buf);
count = next(count, &meas_buf, &mut buf); count = next(count, &meas_buf, &mut buf);
meas_buf.clear(); meas_buf.clear();
rcvd_msg = true; rcvd_msg = true;
}); });


socket.recv_bytes(zmq::DONTWAIT).ok()
.and_then(|bytes| {
String::from_utf8(bytes).ok()
}).map(|s| {
debug!(logger, "rcvd new serialized";
"count" => count);
count = next(count, &s, &mut buf);
rcvd_msg = true;
});

let end = terminate.try_recv() let end = terminate.try_recv()
.map(|_| { .map(|_| {
let _ = next(::std::u8::MAX, "", &mut buf); let _ = next(::std::u8::MAX, "", &mut buf);
@@ -235,11 +230,14 @@ impl InfluxWriter {


crit!(logger, "goodbye"); crit!(logger, "goodbye");
}); });
let writer = InfluxWriter {

InfluxWriter {
host,
db,
tx,
kill_switch, kill_switch,
thread: Some(thread) thread: Some(thread)
};
(writer, tx)
}
} }
} }


@@ -536,14 +534,19 @@ pub fn dur_nanos(d: ::std::time::Duration) -> i64 {
(d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64 (d.as_secs() * 1_000_000_000_u64 + (d.subsec_nanos() as u64)) as i64
} }


//pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) }




mod tests { mod tests {
use super::*; use super::*;
use test::{black_box, Bencher}; use test::{black_box, Bencher};


#[bench]
fn influx_writer_send(b: &mut Bencher) {
let m = InfluxWriter::default();
b.iter(|| {
measure!(m, test, tag[color; "red"], int[n; 1], float[p; 1.234]);
});
}

#[test] #[test]
fn it_checks_color_tag_error_in_non_doctest() { fn it_checks_color_tag_error_in_non_doctest() {
let (tx, rx) = channel(); let (tx, rx) = channel();


Loading…
Cancel
Save