diff --git a/Cargo.toml b/Cargo.toml index e1b95fe..9f93984 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "influx-writer" -version = "0.14.0" +version = "0.18.1" authors = ["Jonathan Strong "] edition = "2018" description = "opinionated influxdb client" @@ -26,11 +26,11 @@ slog-term = "2" uuid = { version = "0.8", features = ["serde", "v4", "slog"] } slog-async = "2" smallvec = "0.6" -crossbeam-channel = "0.3" +crossbeam-channel = "0.5" pretty_toa = "1.0.0" -signal-hook = { version = "0.1.15", optional = true } +signal-hook = { version = "0.3.8", optional = true } -decimal = { version = "2.4.2", registry = "mmcxi" } +decimal = { version = "2.4.3", registry = "mmcxi" } decimal-macros = { version = "0.3", registry = "mmcxi" } [features] diff --git a/examples/write.rs b/examples/write.rs index 497f8d0..88a07a5 100644 --- a/examples/write.rs +++ b/examples/write.rs @@ -16,9 +16,9 @@ const N_PER: usize = 567; fn main() { let start = Instant::now(); let term = Arc::new(AtomicBool::new(false)); - signal_hook::flag::register(signal_hook::SIGINT, Arc::clone(&term)).unwrap(); - signal_hook::flag::register(signal_hook::SIGTERM, Arc::clone(&term)).unwrap(); - signal_hook::flag::register(signal_hook::SIGQUIT, Arc::clone(&term)).unwrap(); + signal_hook::flag::register(signal_hook::consts::signal::SIGINT, Arc::clone(&term)).unwrap(); + signal_hook::flag::register(signal_hook::consts::signal::SIGTERM, Arc::clone(&term)).unwrap(); + signal_hook::flag::register(signal_hook::consts::signal::SIGQUIT, Arc::clone(&term)).unwrap(); let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse(); diff --git a/justfile b/justfile index 1c603aa..d1c4b58 100644 --- a/justfile +++ b/justfile @@ -7,6 +7,9 @@ cargo +args='': check +args='': @just cargo check {{args}} +bench +args='': + @just cargo bench {{args}} + build name +args='': @just cargo build --bin {{name}} {{args}} @@ -22,4 +25,24 @@ test +args='': doc +args='': @just cargo doc --open --document-private-items {{args}} +# just rebuild docs, don't open browser page again +redoc +args='': + @just cargo doc {{args}} + +publish +args='': + @just cargo publish --registry mmcxi {{args}} + +update +args='': + @just cargo update {{args}} + +# blow away build dir and start all over again +rebuild: + just cargo clean + just update + just test + +# display env variables that will be used for building +show-build-env: + @ env | rg RUST --color never + diff --git a/src/lib.rs b/src/lib.rs index 12e0c64..f984f30 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -302,7 +302,6 @@ impl Default for InfluxWriter { impl Clone for InfluxWriter { fn clone(&self) -> Self { - debug_assert!(self.thread.is_some()); let thread = self.thread.as_ref().map(|x| Arc::clone(x)); InfluxWriter { host: self.host.to_string(), @@ -322,7 +321,12 @@ impl InfluxWriter { /// #[inline] pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError>> { - self.tx.send(Some(m)) + //if self.thread.is_none() { + // let _ = self.tx.try_send(Some(m)); + // Ok(()) + //} else { + self.tx.send(Some(m)) + //} } #[inline] @@ -354,8 +358,9 @@ impl InfluxWriter { #[inline] pub fn is_full(&self) -> bool { self.tx.is_full() } + /// provides a shell interface that immediately drops measurements sent to it pub fn placeholder() -> Self { - let (tx, _) = bounded(1024); + let (tx, _) = bounded(1); Self { host: String::new(), db: String::new(), @@ -364,6 +369,10 @@ impl InfluxWriter { } } + pub fn is_placeholder(&self) -> bool { + self.thread.is_none() && self.host == "" + } + pub fn new(host: &str, db: &str) -> Self { let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!()); Self::with_logger_and_opt_creds(host, db, None, &noop_logger) @@ -1177,6 +1186,57 @@ mod tests { #[cfg(feature = "unstable")] use test::{black_box, Bencher}; + + #[cfg(feature = "unstable")] + #[bench] + fn send_to_disconnected_channel(b: &mut Bencher) { + let (tx, _): (Sender>, Receiver>) = bounded(1); + let time = now(); + b.iter(|| { + const VERSION: &str = "1.0.0"; + let color = "red"; + let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time)); + tx.send(Some(m)) + }) + } + + #[cfg(feature = "unstable")] + #[bench] + fn try_send_to_disconnected_channel(b: &mut Bencher) { + let (tx, _): (Sender>, Receiver>) = bounded(1); + let time = now(); + b.iter(|| { + const VERSION: &str = "1.0.0"; + let color = "red"; + let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time)); + tx.try_send(Some(m)) + }) + } + + #[cfg(feature = "unstable")] + #[bench] + fn send_to_disconnected_channel_via_placeholder(b: &mut Bencher) { + let time = now(); + let influx = InfluxWriter::placeholder(); + b.iter(|| { + const VERSION: &str = "1.0.0"; + let color = "red"; + measure!(influx, test, i(n, 1), t(color), v(VERSION), tm(time)); + }) + } + + #[cfg(feature = "unstable")] + #[bench] + fn send_to_connected_channel_via_measure(b: &mut Bencher) { + let time = now(); + let influx = InfluxWriter::new("localhost", "test"); + b.iter(|| { + const VERSION: &str = "1.0.0"; + let color = "red"; + measure!(influx, bench, i(n, 1), t(color), v(VERSION), tm(time)); + }) + } + #[ignore] #[cfg(feature = "unstable")] #[bench]