|
|
@@ -369,31 +369,35 @@ impl InfluxWriter { |
|
|
|
const N_BUFFER_LINES: usize = 1024; |
|
|
|
const MAX_PENDING: Duration = Duration::from_secs(3); |
|
|
|
const INITIAL_BUFFER_CAPACITY: usize = 4096; |
|
|
|
const MAX_BACKLOG: usize = 128; |
|
|
|
const INITIAL_BACKLOG: usize = 128; |
|
|
|
const MAX_BACKLOG: usize = 1024; |
|
|
|
const MAX_OUTSTANDING_HTTP: usize = 64; |
|
|
|
const HB_EVERY: usize = 100_000; |
|
|
|
const DEBUG_HB_EVERY: usize = 1024 * 96; |
|
|
|
const INFO_HB_EVERY: usize = 1024 * 1024; |
|
|
|
const N_HTTP_ATTEMPTS: u32 = 15; |
|
|
|
|
|
|
|
let client = Arc::new(Client::new()); |
|
|
|
|
|
|
|
info!(logger, "initializing InfluxWriter ..."; |
|
|
|
"N_BUFFER_LINES" => N_BUFFER_LINES, |
|
|
|
"MAX_PENDING" => %format_args!("{:?}", MAX_PENDING), |
|
|
|
"MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP, |
|
|
|
"INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY, |
|
|
|
"MAX_BACKLOG" => MAX_BACKLOG); |
|
|
|
"N_BUFFER_LINES" => N_BUFFER_LINES, |
|
|
|
"MAX_PENDING" => %format_args!("{:?}", MAX_PENDING), |
|
|
|
"MAX_OUTSTANDING_HTTP" => MAX_OUTSTANDING_HTTP, |
|
|
|
"INITIAL_BUFFER_CAPACITY" => INITIAL_BUFFER_CAPACITY, |
|
|
|
"INITIAL_BACKLOG" => INITIAL_BACKLOG, |
|
|
|
"MAX_BACKLOG" => MAX_BACKLOG, |
|
|
|
); |
|
|
|
|
|
|
|
// pre-allocated buffers ready for use if the active one is stasheed |
|
|
|
// during an outage |
|
|
|
let mut spares: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG); |
|
|
|
let mut spares: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG); |
|
|
|
|
|
|
|
// queue failed sends here until problem resolved, then send again. in worst |
|
|
|
// case scenario, loop back around on buffers queued in `backlog`, writing |
|
|
|
// over the oldest first. |
|
|
|
// |
|
|
|
let mut backlog: VecDeque<String> = VecDeque::with_capacity(MAX_BACKLOG); |
|
|
|
let mut backlog: VecDeque<String> = VecDeque::with_capacity(INITIAL_BACKLOG); |
|
|
|
|
|
|
|
for _ in 0..MAX_BACKLOG { |
|
|
|
for _ in 0..INITIAL_BACKLOG { |
|
|
|
spares.push_back(String::with_capacity(1024)); |
|
|
|
} |
|
|
|
|
|
|
@@ -422,7 +426,7 @@ impl InfluxWriter { |
|
|
|
let mut loop_time = Instant::now(); |
|
|
|
|
|
|
|
let n_out = |s: &VecDeque<String>, b: &VecDeque<String>, extras: usize| -> usize { |
|
|
|
MAX_BACKLOG + extras - s.len() - b.len() - 1 |
|
|
|
INITIAL_BACKLOG + extras - s.len() - b.len() - 1 |
|
|
|
}; |
|
|
|
|
|
|
|
assert_eq!(n_out(&spares, &backlog, extras), 0); |
|
|
@@ -551,7 +555,7 @@ impl InfluxWriter { |
|
|
|
n_rcvd += 1; |
|
|
|
active = true; |
|
|
|
|
|
|
|
if n_rcvd % HB_EVERY == 0 { |
|
|
|
if n_rcvd % INFO_HB_EVERY == 0 { |
|
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
|
info!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
@@ -560,6 +564,15 @@ impl InfluxWriter { |
|
|
|
"n_active_buf" => count, |
|
|
|
"db_health" => %format_args!("{:?}", db_health.mean), |
|
|
|
"backlog.len()" => backlog.len()); |
|
|
|
} else if n_rcvd % DEBUG_HB_EVERY == 0 { |
|
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
|
debug!(logger, "rcvd {} measurements", n_rcvd.thousands_sep(); |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|
"n_active_buf" => count, |
|
|
|
"db_health" => %format_args!("{:?}", db_health.mean), |
|
|
|
"backlog.len()" => backlog.len()); |
|
|
|
} |
|
|
|
|
|
|
|
if meas.timestamp.is_none() { meas.timestamp = Some(now()) } |
|
|
@@ -579,33 +592,39 @@ impl InfluxWriter { |
|
|
|
|
|
|
|
None => { |
|
|
|
let n_outstanding = n_out(&spares, &backlog, extras); |
|
|
|
crit!(logger, "no available buffers in `spares`, pulling from backlog"; |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|
"backlog.len()" => backlog.len()); |
|
|
|
match backlog.pop_front() { |
|
|
|
// Note: this does not clear the backlog buffer, |
|
|
|
// instead we will just write more and more until |
|
|
|
// we are out of memory. I expect that will never |
|
|
|
// happen. |
|
|
|
// |
|
|
|
Some(x) => { |
|
|
|
count = 1; // otherwise, no '\n' added in `next(..)` - we are |
|
|
|
// sending a "full" buffer to be extended |
|
|
|
x |
|
|
|
} |
|
|
|
|
|
|
|
None => { |
|
|
|
extras += 1; |
|
|
|
crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"backlog.len()" => backlog.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|
"extras" => extras); |
|
|
|
String::new() |
|
|
|
if n_outstanding > MAX_BACKLOG { |
|
|
|
warn!(logger, "no available buffers in `spares`, pulling from backlog"; |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|
"backlog.len()" => backlog.len()); |
|
|
|
match backlog.pop_front() { |
|
|
|
// Note: this does not clear the backlog buffer, |
|
|
|
// instead we will just write more and more until |
|
|
|
// we are out of memory. I expect that will never |
|
|
|
// happen. |
|
|
|
// |
|
|
|
Some(x) => { |
|
|
|
count = 1; // otherwise, no '\n' added in `next(..)` - we are |
|
|
|
// sending a "full" buffer to be extended |
|
|
|
x |
|
|
|
} |
|
|
|
|
|
|
|
None => { |
|
|
|
extras += 1; |
|
|
|
crit!(logger, "failed to pull from backlog, too!! WTF #!(*#(* ... creating new String"; |
|
|
|
"n_outstanding" => n_outstanding, |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"backlog.len()" => backlog.len(), |
|
|
|
"n_rcvd" => n_rcvd, |
|
|
|
"extras" => extras); |
|
|
|
String::new() |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
extras += 1; |
|
|
|
info!(logger, "allocating new buffer: zero spares avail"; "n_outstanding" => n_outstanding, "extras" => extras); |
|
|
|
String::with_capacity(INITIAL_BUFFER_CAPACITY) |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
@@ -677,7 +696,11 @@ impl InfluxWriter { |
|
|
|
match http_rx.try_recv() { |
|
|
|
Ok(Ok(Resp { buf, .. })) => { |
|
|
|
n_ok += 1; |
|
|
|
spares.push_back(buf); // needed so `n_outstanding` count remains accurate |
|
|
|
if spares.len() <= INITIAL_BACKLOG { |
|
|
|
spares.push_back(buf); // needed so `n_outstanding` count remains accurate |
|
|
|
} else { |
|
|
|
extras = extras.saturating_sub(1); |
|
|
|
} |
|
|
|
} |
|
|
|
Ok(Err(Resp { buf, .. })) => { |
|
|
|
warn!(logger, "requeueing failed request"; "buf.len()" => buf.len()); |
|
|
@@ -720,7 +743,17 @@ impl InfluxWriter { |
|
|
|
match http_rx.try_recv() { |
|
|
|
Ok(Ok(Resp { buf, took })) => { |
|
|
|
db_health.add(loop_time, took); |
|
|
|
spares.push_back(buf); |
|
|
|
if spares.len() <= INITIAL_BACKLOG { |
|
|
|
spares.push_back(buf); |
|
|
|
} else { |
|
|
|
extras = extras.saturating_sub(1); |
|
|
|
info!(logger, "dropping buffer to reduce memory back to INITIAL_BACKLOG size"; |
|
|
|
"spares.len()" => spares.len(), |
|
|
|
"extras" => extras, |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
//spares.push_back(buf); |
|
|
|
active = true; |
|
|
|
} |
|
|
|
|
|
|
|