Browse Source

moving stuff around

master
Jonathan Strong 7 years ago
parent
commit
021f96d2a1
1 changed files with 204 additions and 197 deletions
  1. +204
    -197
      src/influx.rs

+ 204
- 197
src/influx.rs View File

@@ -105,67 +105,151 @@ macro_rules! measure {
}}; }};
} }


#[test]
fn it_checks_color_tag_error_in_non_doctest() {
let (tx, rx) = channel();
measure!(tx, test, tag[color;"red"], int[n;1]);
let meas: OwnedMeasurement = rx.recv().unwrap();
assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
/// exactly like `writer`, but also returns a `Sender<Measurement>` and accepts
/// incoming `Measurement`s that way *in addition* to the old socket/`String`
/// method
///
pub struct InfluxWriter {
kill_switch: Sender<()>,
thread: Option<thread::JoinHandle<()>>,
} }


#[test]
fn it_uses_the_measure_macro() {
let (tx, rx) = channel();
measure!(tx, test_measurement,
tag [ one => "a" ],
tag [ two => "b" ],
int [ three => 2 ],
float [ four => 1.2345 ],
string [ five => String::from("d") ],
bool [ six => true ],
int [ seven => { 1 + 2 } ],
time [ 1 ]
);
thread::sleep_ms(10);
let meas: OwnedMeasurement = rx.try_recv().unwrap();
assert_eq!(meas.key, "test_measurement");
assert_eq!(meas.tags.get("one"), Some(&"a"));
assert_eq!(meas.tags.get("two"), Some(&"b"));
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.timestamp, Some(1));
}
impl InfluxWriter {


#[test]
fn it_uses_the_measure_macro_alt_syntax() {

let (tx, rx) = channel();
measure!(tx, test_measurement,
tag[one; "a"],
tag[two; "b"],
int[three; 2],
float[four; 1.2345],
string[five; String::from("d")],
bool [ six => true ],
int[seven; { 1 + 2 }],
time[1]
);

thread::sleep_ms(10);
let meas: OwnedMeasurement = rx.try_recv().unwrap();
assert_eq!(meas.key, "test_measurement");
assert_eq!(meas.tags.get("one"), Some(&"a"));
assert_eq!(meas.tags.get("two"), Some(&"b"));
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.timestamp, Some(1));
pub fn new(log_path: &str, warnings: Sender<Warning>) -> (Self, Sender<OwnedMeasurement>) {
let (kill_switch, terminate) = channel();
let (tx, rx) = channel();
let logger = file_logger(log_path, Severity::Info);
let thread = thread::spawn(move || {
info!(logger, "initializing zmq");
let _ = fs::create_dir("/tmp/mm");
let ctx = zmq::Context::new();
let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
info!(logger, "initializing url";
"DB_HOST" => DB_HOST,
"DB_NAME" => DB_NAME);
let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
let client = Client::new();
info!(logger, "initializing buffers");
let mut meas_buf = String::with_capacity(4096);
let mut buf = String::with_capacity(4096);
let mut server_resp = String::with_capacity(4096);
let mut count = 0;

let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
trace!(logger, "appending serialized measurement to buffer";
"prev" => prev,
"buf.len()" => buf.len());
match prev {
0 => {
buf.push_str(s);
1
}

n @ 1 ... N_BUFFER => {
buf.push_str("\n");
buf.push_str(s);
n + 1
}

_ => {
buf.push_str("\n");
if s.len() > 0 {
buf.push_str(s);
}
trace!(logger, "sending buffer to influx";
"buf.len()" => buf.len());

let resp = client.post(url.clone())
.body(buf.as_str())
.send();
match resp {

Ok(Response { status, .. }) if status == StatusCode::NoContent => {
debug!(logger, "server responded ok: 204 NoContent");
}

Ok(mut resp) => {
let mut server_resp = String::with_capacity(1024);
//server_resp.push_str(&format!("sent at {}:\n", Utc::now()));
//server_resp.push_str(&buf);
//server_resp.push_str("\nreceived:\n");
resp.read_to_string(&mut server_resp); //.unwrap_or(0);
error!(logger, "influx server error";
"status" => resp.status.to_string(),
"body" => server_resp);
}

Err(why) => {
error!(logger, "http request failed: {:?}", why);
// warnings.send(
// Warning::Error(
// format!("Influx write error: {}", why)));
}
}
buf.clear();
0
}
}
};

let mut rcvd_msg = false;

loop {
rcvd_msg = false;
rx.try_recv()
.map(|meas| {
debug!(logger, "rcvd new OwnedMeasurement";
"count" => count);
serialize_owned(&meas, &mut meas_buf);
count = next(count, &meas_buf, &mut buf);
meas_buf.clear();
rcvd_msg = true;
});

socket.recv_bytes(zmq::DONTWAIT).ok()
.and_then(|bytes| {
String::from_utf8(bytes).ok()
}).map(|s| {
debug!(logger, "rcvd new serialized";
"count" => count);
count = next(count, &s, &mut buf);
rcvd_msg = true;
});

let end = terminate.try_recv()
.map(|_| {
let _ = next(::std::u8::MAX, "", &mut buf);
true
}).unwrap_or(false);

if end { break }

#[cfg(feature = "no-thrash")]
{
if !rcvd_msg {
thread::sleep(Duration::new(0, 5000));
}
}
}

crit!(logger, "goodbye");
});
let writer = InfluxWriter {
kill_switch,
thread: Some(thread)
};
(writer, tx)
}
} }


#[test]
fn try_to_break_measure_macro() {
let (tx, _) = channel();
measure!(tx, one, tag[x=>"y"], int[n;1]);
measure!(tx, one, tag[x;"y"], int[n;1],);
impl Drop for InfluxWriter {
fn drop(&mut self) {
let _ = self.kill_switch.send(()).unwrap();
if let Some(thread) = self.thread.take() {
let _ = thread.join();
}
}
} }


pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> { pub fn pull(ctx: &zmq::Context) -> Result<zmq::Socket, zmq::Error> {
@@ -281,6 +365,11 @@ pub fn serialize(measurement: &Measurement, line: &mut String) {
} }
} }


/// Serializes an `&OwnedMeasurement` as influx line protocol into `line`.
///
/// The serialized measurement is appended to the end of the string without
/// any regard for what exited in it previously.
///
pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) {
line.push_str(&escape_tag(measurement.key)); line.push_str(&escape_tag(measurement.key));


@@ -449,156 +538,74 @@ pub fn dur_nanos(d: ::std::time::Duration) -> i64 {


//pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) } //pub fn now() -> i64 { ::latency::dt_nanos(Utc::now()) }


/// exactly like `writer`, but also returns a `Sender<Measurement>` and accepts
/// incoming `Measurement`s that way *in addition* to the old socket/`String`
/// method
///
pub struct InfluxWriter {
kill_switch: Sender<()>,
thread: Option<thread::JoinHandle<()>>,
}

impl InfluxWriter {

pub fn new(log_path: &str, warnings: Sender<Warning>) -> (Self, Sender<OwnedMeasurement>) {
let (kill_switch, terminate) = channel();
let (tx, rx) = channel();
let logger = file_logger(log_path, Severity::Info);
let thread = thread::spawn(move || {
info!(logger, "initializing zmq");
let _ = fs::create_dir("/tmp/mm");
let ctx = zmq::Context::new();
let socket = pull(&ctx).expect("influx::writer failed to create pull socket");
info!(logger, "initializing url";
"DB_HOST" => DB_HOST,
"DB_NAME" => DB_NAME);
let url = Url::parse_with_params(DB_HOST, &[("db", DB_NAME), ("precision", "ns")]).expect("influx writer url should parse");
let client = Client::new();
info!(logger, "initializing buffers");
let mut meas_buf = String::with_capacity(4096);
let mut buf = String::with_capacity(4096);
let mut server_resp = String::with_capacity(4096);
let mut count = 0;

let next = |prev: u8, s: &str, buf: &mut String| -> u8 {
trace!(logger, "appending serialized measurement to buffer";
"prev" => prev,
"buf.len()" => buf.len());
match prev {
0 => {
buf.push_str(s);
1
}

n @ 1 ... N_BUFFER => {
buf.push_str("\n");
buf.push_str(s);
n + 1
}

_ => {
buf.push_str("\n");
if s.len() > 0 {
buf.push_str(s);
}
trace!(logger, "sending buffer to influx";
"buf.len()" => buf.len());

let resp = client.post(url.clone())
.body(buf.as_str())
.send();
match resp {

Ok(Response { status, .. }) if status == StatusCode::NoContent => {
debug!(logger, "server responded ok: 204 NoContent");
}

Ok(mut resp) => {
let mut server_resp = String::with_capacity(1024);
//server_resp.push_str(&format!("sent at {}:\n", Utc::now()));
//server_resp.push_str(&buf);
//server_resp.push_str("\nreceived:\n");
resp.read_to_string(&mut server_resp); //.unwrap_or(0);
error!(logger, "influx server error";
"status" => resp.status.to_string(),
"body" => server_resp);
}

Err(why) => {
error!(logger, "http request failed: {:?}", why);
// warnings.send(
// Warning::Error(
// format!("Influx write error: {}", why)));
}
}
buf.clear();
0
}
}
};

let mut rcvd_msg = false;


loop {
rcvd_msg = false;
rx.try_recv()
.map(|meas| {
debug!(logger, "rcvd new OwnedMeasurement";
"count" => count);
serialize_owned(&meas, &mut meas_buf);
count = next(count, &meas_buf, &mut buf);
meas_buf.clear();
rcvd_msg = true;
});


socket.recv_bytes(zmq::DONTWAIT).ok()
.and_then(|bytes| {
String::from_utf8(bytes).ok()
}).map(|s| {
debug!(logger, "rcvd new serialized";
"count" => count);
count = next(count, &s, &mut buf);
rcvd_msg = true;
});
mod tests {
use super::*;
use test::{black_box, Bencher};


let end = terminate.try_recv()
.map(|_| {
let _ = next(::std::u8::MAX, "", &mut buf);
true
}).unwrap_or(false);
#[test]
fn it_checks_color_tag_error_in_non_doctest() {
let (tx, rx) = channel();
measure!(tx, test, tag[color;"red"], int[n;1]);
let meas: OwnedMeasurement = rx.recv().unwrap();
assert_eq!(meas.tags.get("color"), Some(&"red"), "meas = \n {:?} \n", meas);
}


if end { break }
#[test]
fn it_uses_the_measure_macro() {
let (tx, rx) = channel();
measure!(tx, test_measurement,
tag [ one => "a" ],
tag [ two => "b" ],
int [ three => 2 ],
float [ four => 1.2345 ],
string [ five => String::from("d") ],
bool [ six => true ],
int [ seven => { 1 + 2 } ],
time [ 1 ]
);
thread::sleep_ms(10);
let meas: OwnedMeasurement = rx.try_recv().unwrap();
assert_eq!(meas.key, "test_measurement");
assert_eq!(meas.tags.get("one"), Some(&"a"));
assert_eq!(meas.tags.get("two"), Some(&"b"));
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.timestamp, Some(1));
}


#[cfg(feature = "no-thrash")]
{
if !rcvd_msg {
thread::sleep(Duration::new(0, 5000));
}
}
}
#[test]
fn it_uses_the_measure_macro_alt_syntax() {


crit!(logger, "goodbye");
});
let writer = InfluxWriter {
kill_switch,
thread: Some(thread)
};
(writer, tx)
let (tx, rx) = channel();
measure!(tx, test_measurement,
tag[one; "a"],
tag[two; "b"],
int[three; 2],
float[four; 1.2345],
string[five; String::from("d")],
bool [ six => true ],
int[seven; { 1 + 2 }],
time[1]
);

thread::sleep_ms(10);
let meas: OwnedMeasurement = rx.try_recv().unwrap();
assert_eq!(meas.key, "test_measurement");
assert_eq!(meas.tags.get("one"), Some(&"a"));
assert_eq!(meas.tags.get("two"), Some(&"b"));
assert_eq!(meas.fields.get("three"), Some(&OwnedValue::Integer(2)));
assert_eq!(meas.fields.get("seven"), Some(&OwnedValue::Integer(3)));
assert_eq!(meas.timestamp, Some(1));
} }
}


impl Drop for InfluxWriter {
fn drop(&mut self) {
let _ = self.kill_switch.send(()).unwrap();
if let Some(thread) = self.thread.take() {
let _ = thread.join();
}
#[test]
fn try_to_break_measure_macro() {
let (tx, _) = channel();
measure!(tx, one, tag[x=>"y"], int[n;1]);
measure!(tx, one, tag[x;"y"], int[n;1],);
} }
}

mod tests {
use super::*;
use test::{black_box, Bencher};


#[bench] #[bench]
fn measure_macro_small(b: &mut Bencher) { fn measure_macro_small(b: &mut Bencher) {


Loading…
Cancel
Save