Browse Source

changes InfluxWriter to buffer size of 4096, or if 2 seconds since last send

master
Jonathan Strong 6 years ago
parent
commit
d1849fad78
2 changed files with 20 additions and 9 deletions
  1. +1
    -1
      Cargo.toml
  2. +19
    -8
      src/influx.rs

+ 1
- 1
Cargo.toml View File

@@ -1,6 +1,6 @@
[package] [package]
name = "logging" name = "logging"
version = "0.4.4"
version = "0.4.5"
authors = ["Jonathan Strong <jstrong@legis.io>"] authors = ["Jonathan Strong <jstrong@legis.io>"]


[[example]] [[example]]


+ 19
- 8
src/influx.rs View File

@@ -7,7 +7,7 @@ use std::sync::mpsc::{Sender, Receiver, channel, SendError};
use std::thread; use std::thread;
#[cfg(feature = "warnings")] #[cfg(feature = "warnings")]
use std::fs; use std::fs;
use std::time::Duration;
use std::time::{Instant, Duration};
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;


use hyper::status::StatusCode; use hyper::status::StatusCode;
@@ -33,6 +33,8 @@ pub use super::{dur_nanos, dt_nanos};


pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>; pub type Map<K, V> = OrderMap<K, V, BuildHasherDefault<FnvHasher>>;


pub const INFLUX_WRITER_MAX_BUFFER: usize = 4096;

pub fn new_map<K, V>(capacity: usize) -> Map<K, V> { pub fn new_map<K, V>(capacity: usize) -> Map<K, V> {
Map::with_capacity_and_hasher(capacity, Default::default()) Map::with_capacity_and_hasher(capacity, Default::default())
} }
@@ -269,11 +271,13 @@ impl InfluxWriter {
} }


#[allow(unused_assignments)] #[allow(unused_assignments)]
pub fn with_logger(host: &str, db: &str, buffer_size: u16, logger: Logger) -> Self {
pub fn with_logger(host: &str, db: &str, _buffer_size: u16, logger: Logger) -> Self {
let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel(); let (tx, rx): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = channel();


let buffer_size = INFLUX_WRITER_MAX_BUFFER;

#[cfg(feature = "no-influx-buffer")] #[cfg(feature = "no-influx-buffer")]
let buffer_size = 0u16;
let buffer_size = 0usize;


debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size); debug!(logger, "initializing url"; "host" => host, "db" => db, "buffer_size" => buffer_size);


@@ -288,6 +292,8 @@ impl InfluxWriter {
debug!(logger, "initializing buffers"); debug!(logger, "initializing buffers");
let mut buf = String::with_capacity(32 * 32 * 32); let mut buf = String::with_capacity(32 * 32 * 32);
let mut count = 0; let mut count = 0;
let mut last = Instant::now();
let mut loop_time = Instant::now();


let send = |buf: &str| { let send = |buf: &str| {
let resp = client.post(url.clone()) let resp = client.post(url.clone())
@@ -296,12 +302,15 @@ impl InfluxWriter {
match resp { match resp {


Ok(Response { status, .. }) if status == StatusCode::NoContent => { Ok(Response { status, .. }) if status == StatusCode::NoContent => {
debug!(logger, "server responded ok: 204 NoContent");
debug!(logger, "server responded ok: 204 NoContent");
} }


Ok(mut resp) => { Ok(mut resp) => {
let mut server_resp = String::with_capacity(1024);
let mut server_resp = String::with_capacity(32 * 1024); // need to allocate here bc will be
// sent to logging thread

let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0); let _ = resp.read_to_string(&mut server_resp); //.unwrap_or(0);

error!(logger, "influx server error"; error!(logger, "influx server error";
"status" => resp.status.to_string(), "status" => resp.status.to_string(),
"body" => server_resp); "body" => server_resp);
@@ -313,14 +322,14 @@ impl InfluxWriter {
} }
}; };


let next = |prev: u16, m: &OwnedMeasurement, buf: &mut String| -> u16 {
let next = |prev: usize, m: &OwnedMeasurement, buf: &mut String, loop_time: &Instant, last: &mut Instant| -> usize {
match prev { match prev {
0 if buffer_size > 0 => { 0 if buffer_size > 0 => {
serialize_owned(m, buf); serialize_owned(m, buf);
1 1
} }


n if n < buffer_size => {
n if n < buffer_size && *loop_time - *last < Duration::from_secs(2) => {
buf.push_str("\n"); buf.push_str("\n");
serialize_owned(m, buf); serialize_owned(m, buf);
n + 1 n + 1
@@ -331,6 +340,7 @@ impl InfluxWriter {
serialize_owned(m, buf); serialize_owned(m, buf);
debug!(logger, "sending buffer to influx"; "len" => n); debug!(logger, "sending buffer to influx"; "len" => n);
send(buf); send(buf);
*last = *loop_time;
buf.clear(); buf.clear();
0 0
} }
@@ -338,6 +348,7 @@ impl InfluxWriter {
}; };


loop { loop {
loop_time = Instant::now();
match rx.recv() { match rx.recv() {
Ok(Some(mut meas)) => { Ok(Some(mut meas)) => {


@@ -345,7 +356,7 @@ impl InfluxWriter {


//#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } } //#[cfg(feature = "trace")] { if count % 10 == 0 { trace!(logger, "rcvd new measurement"; "count" => count, "key" => meas.key); } }


count = next(count, &meas, &mut buf);
count = next(count, &meas, &mut buf, &loop_time, &mut last);
} }


Ok(None) => { Ok(None) => {


Loading…
Cancel
Save