From f944dba8258a29d18c738a3c959edb95a7112546 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Thu, 23 Jul 2020 16:14:32 -0400 Subject: [PATCH] add authenticated writes functionality --- Cargo.toml | 11 ++++++++- examples/write.rs | 3 +-- justfile | 27 +++++++++++++++++++-- src/lib.rs | 62 ++++++++++++++++++++++++++++++++++++++--------- 4 files changed, 87 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f3e75df..de166e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "influx-writer" -version = "0.8.0" +version = "0.9.0" authors = ["Jonathan Strong "] edition = "2018" @@ -34,6 +34,15 @@ trace = ["slog/release_max_level_trace", "slog/max_level_trace"] debug = ["slog/release_max_level_debug", "slog/max_level_debug"] string-tags = [] unstable = [] +# tests that authenticated requests are accepted by influxdb server +# +# setup: +# +# - create database "auth_test" +# - create user "auth_test_user" with password "hot dog" grant permissions +# to write to "auth_test" database +# +auth-tests = [] [profile.bench] lto = true diff --git a/examples/write.rs b/examples/write.rs index 56867da..497f8d0 100644 --- a/examples/write.rs +++ b/examples/write.rs @@ -20,7 +20,6 @@ fn main() { signal_hook::flag::register(signal_hook::SIGTERM, Arc::clone(&term)).unwrap(); signal_hook::flag::register(signal_hook::SIGQUIT, Arc::clone(&term)).unwrap(); - let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse(); let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse(); @@ -28,7 +27,7 @@ fn main() { let logger = root.new(o!("thread" => "main")); - let influx = InfluxWriter::with_logger("localhost", "test", &root); + let influx = InfluxWriter::with_logger_and_opt_creds("localhost", "test", None, &root); let mut n = 0; diff --git a/justfile b/justfile index 21ac29d..1c603aa 100644 --- a/justfile +++ b/justfile @@ -1,2 +1,25 @@ -bench NAME='': - RUSTFLAGS="-C target-cpu=native" cargo bench {{NAME}} +export MAKEFLAGS := "-j8" +export RUSTFLAGS := "-C link-arg=-fuse-ld=lld" + +cargo +args='': + cargo {{args}} + +check +args='': + @just cargo check {{args}} + +build name +args='': + @just cargo build --bin {{name}} {{args}} + +release-build name +args='': + @just cargo build --bin {{name}} --release {{args}} + +example name +args='': + @just cargo build --example {{name}} {{args}} + +test +args='': + @just cargo test {{args}} + +doc +args='': + @just cargo doc --open --document-private-items {{args}} + + diff --git a/src/lib.rs b/src/lib.rs index 0618232..d559126 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,8 @@ use smallvec::SmallVec; use slog::Logger; use pretty_toa::ThousandsSep; +pub type Credentials = hyper::header::Authorization; + /// Created this so I know what types can be passed through the /// `measure!` macro, which used to convert with `as i64` and /// `as f64` until I accidentally passed a function name, and it @@ -346,11 +348,32 @@ impl InfluxWriter { pub fn new(host: &str, db: &str) -> Self { let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!()); - Self::with_logger(host, db, &noop_logger) + Self::with_logger_and_opt_creds(host, db, None, &noop_logger) + } + + pub fn get_credentials(username: String, password: Option) -> Credentials { + hyper::header::Authorization( + hyper::header::Basic { username, password } + ) } + fn http_req<'a>(client: &'a Client, url: Url, body: &'a str, creds: &Option) -> hyper::client::RequestBuilder<'a> { + let req = client.post(url.clone()) + .body(body); + if let Some(auth) = creds { + req.header(auth.clone()) + } else { + req + } + } + + #[deprecated(since = "0.9.0", note = "replaced by with_logger_and_opt_creds")] #[allow(unused_assignments)] pub fn with_logger(host: &str, db: &str, logger: &Logger) -> Self { + Self::with_logger_and_opt_creds(host, db, None, logger) + } + + pub fn with_logger_and_opt_creds(host: &str, db: &str, creds: Option, logger: &Logger) -> Self { let logger = logger.new(o!( "host" => host.to_string(), "db" => db.to_string())); @@ -377,6 +400,7 @@ impl InfluxWriter { const INITIAL_BACKLOG: usize = MAX_OUTSTANDING_HTTP * 2; let client = Arc::new(Client::new()); + let creds = Arc::new(creds); info!(logger, "initializing InfluxWriter ..."; "N_BUFFER_LINES" => N_BUFFER_LINES, @@ -425,7 +449,7 @@ impl InfluxWriter { let mut active: bool; let mut last_clear = Instant::now(); let mut last_memory_check = Instant::now(); - let mut loop_time = Instant::now(); + let mut loop_time: Instant; let n_out = |s: &VecDeque, b: &VecDeque, extras: usize| -> usize { INITIAL_BACKLOG + extras - s.len() - b.len() - 1 @@ -448,6 +472,7 @@ impl InfluxWriter { let tx = http_tx.clone(); let thread_logger = logger.new(o!("thread" => "InfluxWriter:http", "in flight req at spawn time" => n_outstanding)); // re `thread_logger` name: disambiguating for `logger` after thread closure let client = Arc::clone(&client); + let creds = Arc::clone(&creds); *in_flight_buffer_bytes = *in_flight_buffer_bytes + buf.capacity(); debug!(logger, "launching http thread"); let thread_res = thread::Builder::new().name(format!("inflx-http{}", n_outstanding)).spawn(move || { @@ -464,9 +489,8 @@ impl InfluxWriter { thread::sleep(throttle); // 0, 2, 8, 16, 32 } let sent = Instant::now(); - let resp = client.post(url.clone()) - .body(buf.as_str()) - .send(); + let req = Self::http_req(&client, url.clone(), buf.as_str(), &creds); + let resp = req.send(); let rcvd = Instant::now(); let took = rcvd - sent; let mut n_tx = 0u32; @@ -678,7 +702,7 @@ impl InfluxWriter { warn!(logger, "terminate signal rcvd"; "count" => count); if buf.len() > 0 { info!(logger, "sending remaining buffer to influx on terminate"; "count" => count); - let meas = OwnedMeasurement::new("wtrterm").add_field("n", OwnedValue::Integer(1)); + let meas = OwnedMeasurement::new("influx_writer").add_field("n", OwnedValue::Integer(1)); let _ = next(N_BUFFER_LINES, &meas, &mut buf, loop_time, last); let n_outstanding = n_out(&spares, &backlog, extras); let mut placeholder = spares.pop_front().unwrap_or_else(String::new); @@ -722,7 +746,6 @@ impl InfluxWriter { "n_rcvd" => n_rcvd, "n_outstanding" => n_outstanding); send(buf, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes); - last_clear = loop_time; } 'rx: loop { @@ -766,12 +789,16 @@ impl InfluxWriter { db_health.refresh(loop_time); let n_outstanding = n_out(&spares, &backlog, extras); let healthy = db_health.count == 0 || db_health.mean < Duration::from_secs(200); - if (n_outstanding < MAX_OUTSTANDING_HTTP || loop_time - last_clear > Duration::from_secs(60)) && healthy { + if (n_outstanding < MAX_OUTSTANDING_HTTP + || loop_time.saturating_duration_since(last_clear) > Duration::from_secs(60)) + && healthy { + if let Some(queued) = backlog.pop_front() { let n_outstanding = n_out(&spares, &backlog, extras); send(queued, &mut backlog, n_outstanding, &mut in_flight_buffer_bytes); active = true; } + last_clear = loop_time; } loop { @@ -1386,9 +1413,8 @@ mod tests { let url = Url::parse_with_params("http://localhost:8086/write", &[("db", "test"), ("precision", "ns")]).expect("influx writer url should parse"); let client = Client::new(); - match client.post(url.clone()) - .body(&buf) - .send() { + let req = InfluxWriter::http_req(&client, url.clone(), &buf, &None); + match req.send() { Ok(Response { status, .. }) if status == StatusCode::NoContent => {} @@ -1402,4 +1428,18 @@ mod tests { } } } + + #[cfg(feature = "auth-tests")] + #[test] + fn it_sends_authenticated_measurements() { + let creds = InfluxWriter::get_credentials("auth_test_user".into(), Some("hot dog".into())); + let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!()); + //let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); + //let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse(); + //let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse(); + //let root = slog::Logger::root(drain, o!("version" => "0.1")); + //let influx = InfluxWriter::with_logger_and_opt_creds("localhost", "auth_test", Some(creds), &root); + measure!(influx, auth_test_meas, i(n, 1)); + drop(influx); + } }