Browse Source

validate memory scaling, add logging

- adds example to repeatedly send many measurements to InfluxWriter for purposes of stress testing (e.g. shut InfluxDB off, see if InfluxWriter recovers correctly)
- InfluxWriter now tracks allocated buffer memory and logs it
- lower INITIAL_BACKLOG
master
Jonathan Strong 4 years ago
parent
commit
9178b7a13b
3 changed files with 113 additions and 11 deletions
  1. +7
    -1
      Cargo.toml
  2. +56
    -0
      examples/write.rs
  3. +50
    -10
      src/lib.rs

+ 7
- 1
Cargo.toml View File

@@ -1,6 +1,6 @@
[package] [package]
name = "influx-writer" name = "influx-writer"
version = "0.7.0"
version = "0.8.0"
authors = ["Jonathan Strong <jonathan.strong@gmail.com>"] authors = ["Jonathan Strong <jonathan.strong@gmail.com>"]
edition = "2018" edition = "2018"


@@ -8,6 +8,11 @@ edition = "2018"
name = "influx_writer" name = "influx_writer"
path = "src/lib.rs" path = "src/lib.rs"


[[example]]
name = "write"
path = "examples/write.rs"
required-features = ["signal-hook"]

[dependencies] [dependencies]
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
hyper = "0.10" hyper = "0.10"
@@ -18,6 +23,7 @@ slog-async = "2"
smallvec = "0.6" smallvec = "0.6"
crossbeam-channel = "0.3" crossbeam-channel = "0.3"
pretty_toa = "1.0.0" pretty_toa = "1.0.0"
signal-hook = { version = "0.1.15", optional = true }


decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } decimal = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" }
decimal-macros = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" } decimal-macros = { git = "https://github.com/jonathanstrong/decimal", branch = "v2.3.x" }


+ 56
- 0
examples/write.rs View File

@@ -0,0 +1,56 @@
#[macro_use]
extern crate slog;

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::*;
use std::thread;
use slog::Drain;
use pretty_toa::ThousandsSep;
use chrono::prelude::*;
use influx_writer::{InfluxWriter, measure};

/// Pause between batches: the main loop sleeps this long after each batch.
const DELAY: Duration = Duration::from_millis(1);
/// Number of measurements submitted to the writer per batch (per loop iteration).
const N_PER: usize = 567;

/// Stress-test driver for `InfluxWriter`.
///
/// Sends batches of `N_PER` measurements to an `InfluxWriter` pointed at
/// host `"localhost"`, database `"test"`, sleeping `DELAY` between batches,
/// until SIGINT, SIGTERM or SIGQUIT is received. On exit it logs how many
/// measurements were submitted and how long the run took.
///
/// # Panics
///
/// Panics if a signal handler cannot be registered.
fn main() {
    let start = Instant::now();

    // Shared shutdown flag; `signal_hook::flag::register` arranges for it to
    // be set when one of the listed signals arrives.
    let term = Arc::new(AtomicBool::new(false));
    for &signal in &[signal_hook::SIGINT, signal_hook::SIGTERM, signal_hook::SIGQUIT] {
        signal_hook::flag::register(signal, Arc::clone(&term))
            .expect("failed to register termination signal handler");
    }

    // Colored terminal logger on stdout, UTC timestamps, drained asynchronously.
    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
    let drain = slog_async::Async::new(drain)
        .chan_size(1024 * 64)
        .thread_name("recv".into())
        .build()
        .fuse();
    let root = slog::Logger::root(drain, o!("version" => "0.1"));
    let logger = root.new(o!("thread" => "main"));

    let influx = InfluxWriter::with_logger("localhost", "test", &root);

    // Explicitly `usize`: an unannotated integer counter falls back to `i32`,
    // which overflows after ~2.1 billion increments -- roughly an hour at
    // `N_PER` measurements per millisecond -- panicking in debug builds and
    // wrapping in release builds.
    let mut n: usize = 0;

    while !term.load(Ordering::Relaxed) {
        let mut now = Utc::now().timestamp_nanos();
        for _ in 0..N_PER {
            measure!(influx, example, i(n, 1), tm(now));
            // Bump by one nanosecond so every measurement in the batch carries
            // a distinct timestamp.
            now += 1;
            n += 1;
        }

        thread::sleep(DELAY);
    }
    info!(logger, "exiting...");

    // Release the writer before measuring the elapsed time.
    // NOTE(review): presumably dropping the writer flushes pending data and
    // joins its background thread -- confirm against `InfluxWriter`'s `Drop`.
    drop(influx);

    let took = start.elapsed();

    info!(logger, "wrote {} measurements in {:?}", n.thousands_sep(), took);
}

+ 50
- 10
src/lib.rs View File

@@ -369,12 +369,12 @@ impl InfluxWriter {
const N_BUFFER_LINES: usize = 1024; const N_BUFFER_LINES: usize = 1024;
const MAX_PENDING: Duration = Duration::from_secs(3); const MAX_PENDING: Duration = Duration::from_secs(3);
const INITIAL_BUFFER_CAPACITY: usize = 4096; const INITIAL_BUFFER_CAPACITY: usize = 4096;
const INITIAL_BACKLOG: usize = 128;
const MAX_BACKLOG: usize = 1024; const MAX_BACKLOG: usize = 1024;
const MAX_OUTSTANDING_HTTP: usize = 64; const MAX_OUTSTANDING_HTTP: usize = 64;
const DEBUG_HB_EVERY: usize = 1024 * 96; const DEBUG_HB_EVERY: usize = 1024 * 96;
const INFO_HB_EVERY: usize = 1024 * 1024; const INFO_HB_EVERY: usize = 1024 * 1024;
const N_HTTP_ATTEMPTS: u32 = 15; const N_HTTP_ATTEMPTS: u32 = 15;
const INITIAL_BACKLOG: usize = MAX_OUTSTANDING_HTTP * 2;


let client = Arc::new(Client::new()); let client = Arc::new(Client::new());


@@ -398,7 +398,7 @@ impl InfluxWriter {
let mut backlog: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG); let mut backlog: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG);


for _ in 0..INITIAL_BACKLOG { for _ in 0..INITIAL_BACKLOG {
spares.push_back(String::with_capacity(1024));
spares.push_back(String::with_capacity(INITIAL_BUFFER_CAPACITY));
} }


struct Resp { struct Resp {
@@ -420,9 +420,11 @@ impl InfluxWriter {
let mut count = 0; let mut count = 0;
let mut extras = 0; // any new Strings we intro to the system let mut extras = 0; // any new Strings we intro to the system
let mut n_rcvd = 0; let mut n_rcvd = 0;
let mut in_flight_buffer_bytes = 0;
let mut last = Instant::now(); let mut last = Instant::now();
let mut active: bool; let mut active: bool;
let mut last_clear = Instant::now(); let mut last_clear = Instant::now();
let mut last_memory_check = Instant::now();
let mut loop_time = Instant::now(); let mut loop_time = Instant::now();


let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize { let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize {
@@ -431,15 +433,22 @@ impl InfluxWriter {


assert_eq!(n_out(&spares, &backlog, extras), 0); assert_eq!(n_out(&spares, &backlog, extras), 0);


let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize| {
// Total bytes currently reserved for line buffers: the capacity (not the
// length) of every `String` in `spares` and in `backlog`, plus the byte count
// tracked for buffers handed off to in-flight http requests.
let count_allocated_memory = |spares: &VecDeque<String>, backlog: &VecDeque<String>, in_flight_buffer_bytes: &usize| -> usize {
    let spare_bytes: usize = spares.iter().map(String::capacity).sum();
    let backlog_bytes: usize = backlog.iter().map(String::capacity).sum();
    spare_bytes + backlog_bytes + *in_flight_buffer_bytes
};

let send = |mut buf: String, backlog: &mut VecDeque<String>, n_outstanding: usize, in_flight_buffer_bytes: &mut usize| {
if n_outstanding >= MAX_OUTSTANDING_HTTP { if n_outstanding >= MAX_OUTSTANDING_HTTP {
backlog.push_back(buf); backlog.push_back(buf);
return return
} }
let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url let url = url.clone(); // Arc would be faster, but `hyper::Client::post` consumes url
let tx = http_tx.clone(); let tx = http_tx.clone();
let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "n_outstanding" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure
let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "in flight req at spawn time" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure
let client = Arc::clone(&client); let client = Arc::clone(&client);
*in_flight_buffer_bytes = *in_flight_buffer_bytes + buf.capacity();
debug!(logger, "launching http thread"); debug!(logger, "launching http thread");
let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || { let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || {
let logger = thread_logger; let logger = thread_logger;
@@ -550,6 +559,18 @@ impl InfluxWriter {
'event: loop { 'event: loop {
loop_time = Instant::now(); loop_time = Instant::now();
active = false; active = false;

if loop_time - last_memory_check > Duration::from_secs(60) {
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
info!(logger, "allocated memory: {:.1}MB", allocated_mb;
"allocated bytes" => allocated_bytes,
"in flight buffer bytes" => in_flight_buffer_bytes,
"spares.len()" => spares.len(),
"backlog.len()" => backlog.len(),
);
last_memory_check = loop_time;
}
match rx.recv() { match rx.recv() {
Ok(Some(mut meas)) => { Ok(Some(mut meas)) => {
n_rcvd += 1; n_rcvd += 1;
@@ -557,21 +578,27 @@ impl InfluxWriter {


if n_rcvd % INFO_HB_EVERY == 0 { if n_rcvd % INFO_HB_EVERY == 0 {
let n_outstanding = n_out(&spares, &backlog, extras); let n_outstanding = n_out(&spares, &backlog, extras);
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep();
"n_outstanding" => n_outstanding, "n_outstanding" => n_outstanding,
"spares.len()" => spares.len(), "spares.len()" => spares.len(),
"n_rcvd" => n_rcvd, "n_rcvd" => n_rcvd,
"n_active_buf" => count, "n_active_buf" => count,
"db_health" => %format_args!("{:?}", db_health.mean), "db_health" => %format_args!("{:?}", db_health.mean),
"allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
"backlog.len()" => backlog.len()); "backlog.len()" => backlog.len());
} else if n_rcvd % DEBUG_HB_EVERY == 0 { } else if n_rcvd % DEBUG_HB_EVERY == 0 {
let n_outstanding = n_out(&spares, &backlog, extras); let n_outstanding = n_out(&spares, &backlog, extras);
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes);
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
debug!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); debug!(logger, "rcvd {} measurements", n_rcvd.thousands_sep();
"n_outstanding" => n_outstanding, "n_outstanding" => n_outstanding,
"spares.len()" => spares.len(), "spares.len()" => spares.len(),
"n_rcvd" => n_rcvd, "n_rcvd" => n_rcvd,
"n_active_buf" => count, "n_active_buf" => count,
"db_health" => %format_args!("{:?}", db_health.mean), "db_health" => %format_args!("{:?}", db_health.mean),
"allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
"backlog.len()" => backlog.len()); "backlog.len()" => backlog.len());
} }


@@ -623,7 +650,13 @@ impl InfluxWriter {
} }
} else { } else {
extras += 1; extras += 1;
info!(logger, "allocating new buffer: zero spares avail"; "n_outstanding" => n_outstanding, "extras" => extras);
let allocated_bytes = count_allocated_memory(&spares, &backlog, &in_flight_buffer_bytes) + INITIAL_BUFFER_CAPACITY;
let allocated_mb = allocated_bytes as f64 / 1024.0 / 1024.0;
info!(logger, "allocating new buffer: zero spares avail";
"allocated buffer memory" => %format_args!("{:.1}MB", allocated_mb),
"n_outstanding" => n_outstanding,
"extras" => extras,
);
String::with_capacity(INITIAL_BUFFER_CAPACITY) String::with_capacity(INITIAL_BUFFER_CAPACITY)
} }
} }
@@ -632,7 +665,7 @@ impl InfluxWriter {
// //
mem::swap(&mut buf, &mut next); mem::swap(&mut buf, &mut next);
let n_outstanding = n_out(&spares, &backlog, extras); let n_outstanding = n_out(&spares, &backlog, extras);
send(next, &mut backlog, n_outstanding);
send(next, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
last = loop_time; last = loop_time;
count count
} }
@@ -650,7 +683,7 @@ impl InfluxWriter {
let n_outstanding = n_out(&spares, &backlog, extras); let n_outstanding = n_out(&spares, &backlog, extras);
let mut placeholder = spares.pop_front().unwrap_or_else(String::new); let mut placeholder = spares.pop_front().unwrap_or_else(String::new);
mem::swap(&mut buf, &mut placeholder); mem::swap(&mut buf, &mut placeholder);
send(placeholder, &mut backlog, n_outstanding);
send(placeholder, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
} }
let mut n_ok = 0; let mut n_ok = 0;
let mut n_err = 0; let mut n_err = 0;
@@ -688,7 +721,7 @@ impl InfluxWriter {
"spares.len()" => spares.len(), "spares.len()" => spares.len(),
"n_rcvd" => n_rcvd, "n_rcvd" => n_rcvd,
"n_outstanding" => n_outstanding); "n_outstanding" => n_outstanding);
send(buf, &mut backlog, n_outstanding);
send(buf, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
last_clear = loop_time; last_clear = loop_time;
} }


@@ -696,6 +729,7 @@ impl InfluxWriter {
match http_rx.try_recv() { match http_rx.try_recv() {
Ok(Ok(Resp { buf, .. })) => { Ok(Ok(Resp { buf, .. })) => {
n_ok += 1; n_ok += 1;
in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
if spares.len() <= INITIAL_BACKLOG { if spares.len() <= INITIAL_BACKLOG {
spares.push_back(buf); // needed so `n_outstanding` count remains accurate spares.push_back(buf); // needed so `n_outstanding` count remains accurate
} else { } else {
@@ -705,6 +739,7 @@ impl InfluxWriter {
Ok(Err(Resp { buf, .. })) => { Ok(Err(Resp { buf, .. })) => {
warn!(logger, "requeueing failed request"; "buf.len()" => buf.len()); warn!(logger, "requeueing failed request"; "buf.len()" => buf.len());
n_err += 1; n_err += 1;
in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
backlog.push_front(buf); backlog.push_front(buf);
} }
Err(chan::TryRecvError::Disconnected) => { Err(chan::TryRecvError::Disconnected) => {
@@ -734,7 +769,7 @@ impl InfluxWriter {
if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy { if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy {
if let Some(queued) = backlog.pop_front() { if let Some(queued) = backlog.pop_front() {
let n_outstanding = n_out(&spares, &backlog, extras); let n_outstanding = n_out(&spares, &backlog, extras);
send(queued, &mut backlog, n_outstanding);
send(queued, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes);
active = true; active = true;
} }
} }
@@ -743,13 +778,17 @@ impl InfluxWriter {
match http_rx.try_recv() { match http_rx.try_recv() {
Ok(Ok(Resp { buf, took })) => { Ok(Ok(Resp { buf, took })) => {
db_health.add(loop_time, took); db_health.add(loop_time, took);
let in_flight_before = in_flight_buffer_bytes.clone();
in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
if spares.len() <= INITIAL_BACKLOG { if spares.len() <= INITIAL_BACKLOG {
spares.push_back(buf); spares.push_back(buf);
} else { } else {
extras = extras.saturating_sub(1); extras = extras.saturating_sub(1);
info!(logger, "dropping buffer to reduce memory back to INITIAL_BACKLOG size";
debug!(logger, "dropping buffer to reduce memory back to INITIAL_BACKLOG size";
"spares.len()" => spares.len(), "spares.len()" => spares.len(),
"extras" => extras, "extras" => extras,
"in flight before" => in_flight_before,
"in in_flight_buffer_bytes" => in_flight_buffer_bytes,
); );
} }


@@ -759,6 +798,7 @@ impl InfluxWriter {


Ok(Err(Resp { buf, took })) => { Ok(Err(Resp { buf, took })) => {
db_health.add(loop_time, took); db_health.add(loop_time, took);
in_flight_buffer_bytes = in_flight_buffer_bytes.saturating_sub(buf.capacity());
backlog.push_front(buf); backlog.push_front(buf);
active = true; active = true;
} }


Loading…
Cancel
Save