Browse Source

Merge remote-tracking branch 'origin/master' into log-total-pts

master
Jonathan Strong 3 years ago
parent
commit
49d74d207e
4 changed files with 93 additions and 10 deletions
  1. +4
    -4
      Cargo.toml
  2. +3
    -3
      examples/write.rs
  3. +23
    -0
      justfile
  4. +63
    -3
      src/lib.rs

+ 4
- 4
Cargo.toml View File

@@ -1,6 +1,6 @@
[package] [package]
name = "influx-writer" name = "influx-writer"
version = "0.14.0"
version = "0.18.1"
authors = ["Jonathan Strong <jonathan.strong@gmail.com>"] authors = ["Jonathan Strong <jonathan.strong@gmail.com>"]
edition = "2018" edition = "2018"
description = "opinionated influxdb client" description = "opinionated influxdb client"
@@ -26,11 +26,11 @@ slog-term = "2"
uuid = { version = "0.8", features = ["serde", "v4", "slog"] } uuid = { version = "0.8", features = ["serde", "v4", "slog"] }
slog-async = "2" slog-async = "2"
smallvec = "0.6" smallvec = "0.6"
crossbeam-channel = "0.3"
crossbeam-channel = "0.5"
pretty_toa = "1.0.0" pretty_toa = "1.0.0"
signal-hook = { version = "0.1.15", optional = true }
signal-hook = { version = "0.3.8", optional = true }


decimal = { version = "2.4.2", registry = "mmcxi" }
decimal = { version = "2.4.3", registry = "mmcxi" }
decimal-macros = { version = "0.3", registry = "mmcxi" } decimal-macros = { version = "0.3", registry = "mmcxi" }


[features] [features]


+ 3
- 3
examples/write.rs View File

@@ -16,9 +16,9 @@ const N_PER: usize = 567;
fn main() { fn main() {
let start = Instant::now(); let start = Instant::now();
let term = Arc::new(AtomicBool::new(false)); let term = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::SIGINT, Arc::clone(&term)).unwrap();
signal_hook::flag::register(signal_hook::SIGTERM, Arc::clone(&term)).unwrap();
signal_hook::flag::register(signal_hook::SIGQUIT, Arc::clone(&term)).unwrap();
signal_hook::flag::register(signal_hook::consts::signal::SIGINT, Arc::clone(&term)).unwrap();
signal_hook::flag::register(signal_hook::consts::signal::SIGTERM, Arc::clone(&term)).unwrap();
signal_hook::flag::register(signal_hook::consts::signal::SIGQUIT, Arc::clone(&term)).unwrap();


let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse(); let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();


+ 23
- 0
justfile View File

@@ -7,6 +7,9 @@ cargo +args='':
check +args='': check +args='':
@just cargo check {{args}} @just cargo check {{args}}


bench +args='':
@just cargo bench {{args}}

build name +args='': build name +args='':
@just cargo build --bin {{name}} {{args}} @just cargo build --bin {{name}} {{args}}


@@ -22,4 +25,24 @@ test +args='':
doc +args='': doc +args='':
@just cargo doc --open --document-private-items {{args}} @just cargo doc --open --document-private-items {{args}}


# just rebuild docs, don't open browser page again
redoc +args='':
@just cargo doc {{args}}

publish +args='':
@just cargo publish --registry mmcxi {{args}}

update +args='':
@just cargo update {{args}}

# blow away build dir and start all over again
rebuild:
just cargo clean
just update
just test

# display env variables that will be used for building
show-build-env:
@ env | rg RUST --color never




+ 63
- 3
src/lib.rs View File

@@ -302,7 +302,6 @@ impl Default for InfluxWriter {


impl Clone for InfluxWriter { impl Clone for InfluxWriter {
fn clone(&self) -> Self { fn clone(&self) -> Self {
debug_assert!(self.thread.is_some());
let thread = self.thread.as_ref().map(|x| Arc::clone(x)); let thread = self.thread.as_ref().map(|x| Arc::clone(x));
InfluxWriter { InfluxWriter {
host: self.host.to_string(), host: self.host.to_string(),
@@ -322,7 +321,12 @@ impl InfluxWriter {
/// ///
#[inline] #[inline]
pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> { pub fn send(&self, m: OwnedMeasurement) -> Result<(), SendError<Option<OwnedMeasurement>>> {
self.tx.send(Some(m))
//if self.thread.is_none() {
// let _ = self.tx.try_send(Some(m));
// Ok(())
//} else {
self.tx.send(Some(m))
//}
} }


#[inline] #[inline]
@@ -354,8 +358,9 @@ impl InfluxWriter {
#[inline] #[inline]
pub fn is_full(&self) -> bool { self.tx.is_full() } pub fn is_full(&self) -> bool { self.tx.is_full() }


/// provides a shell interface that immediately drops measurements sent to it
pub fn placeholder() -> Self { pub fn placeholder() -> Self {
let (tx, _) = bounded(1024);
let (tx, _) = bounded(1);
Self { Self {
host: String::new(), host: String::new(),
db: String::new(), db: String::new(),
@@ -364,6 +369,10 @@ impl InfluxWriter {
} }
} }


pub fn is_placeholder(&self) -> bool {
self.thread.is_none() && self.host == ""
}

pub fn new(host: &str, db: &str) -> Self { pub fn new(host: &str, db: &str) -> Self {
let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!()); let noop_logger = slog::Logger::root(slog::Discard.fuse(), o!());
Self::with_logger_and_opt_creds(host, db, None, &noop_logger) Self::with_logger_and_opt_creds(host, db, None, &noop_logger)
@@ -1177,6 +1186,57 @@ mod tests {
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
use test::{black_box, Bencher}; use test::{black_box, Bencher};



#[cfg(feature = "unstable")]
#[bench]
fn send_to_disconnected_channel(b: &mut Bencher) {
let (tx, _): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(1);
let time = now();
b.iter(|| {
const VERSION: &str = "1.0.0";
let color = "red";
let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
tx.send(Some(m))
})
}

#[cfg(feature = "unstable")]
#[bench]
fn try_send_to_disconnected_channel(b: &mut Bencher) {
let (tx, _): (Sender<Option<OwnedMeasurement>>, Receiver<Option<OwnedMeasurement>>) = bounded(1);
let time = now();
b.iter(|| {
const VERSION: &str = "1.0.0";
let color = "red";
let m = measure!(@make_meas test, i(n, 1), t(color), v(VERSION), tm(time));
tx.try_send(Some(m))
})
}

#[cfg(feature = "unstable")]
#[bench]
fn send_to_disconnected_channel_via_placeholder(b: &mut Bencher) {
let time = now();
let influx = InfluxWriter::placeholder();
b.iter(|| {
const VERSION: &str = "1.0.0";
let color = "red";
measure!(influx, test, i(n, 1), t(color), v(VERSION), tm(time));
})
}

#[cfg(feature = "unstable")]
#[bench]
fn send_to_connected_channel_via_measure(b: &mut Bencher) {
let time = now();
let influx = InfluxWriter::new("localhost", "test");
b.iter(|| {
const VERSION: &str = "1.0.0";
let color = "red";
measure!(influx, bench, i(n, 1), t(color), v(VERSION), tm(time));
})
}

#[ignore] #[ignore]
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[bench] #[bench]


Loading…
Cancel
Save