From e380a771131562dd9fb7dfdb0b8840338e32fbe7 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Thu, 10 Sep 2020 10:11:04 -0400 Subject: [PATCH] skip serializing NaN f64/d128 values, rather than previous strategy of supplying -999.0 signal value I find excluding -999.0 values annoying in practice. --- Cargo.toml | 2 +- src/lib.rs | 111 +++++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 100 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 68fc759..fca2298 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "influx-writer" -version = "0.10.1" +version = "0.11.0" authors = ["Jonathan Strong "] edition = "2018" diff --git a/src/lib.rs b/src/lib.rs index 4110637..22ce9e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,12 @@ use smallvec::SmallVec; use slog::Logger; use pretty_toa::ThousandsSep; +/// whether non-finite `f64` and `d128` values should be skipped +/// during serialization to influxdb line format. influx does not +/// handle `NaN` values at all. the other option is a marker value, +/// previously `-999.0` had been used. +pub const SKIP_NAN_VALUES: bool = true; + pub type Credentials = hyper::header::Authorization; /// Created this so I know what types can be passed through the @@ -934,7 +940,10 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { add_tag(line, key, value.as_str()); } - let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| { + let add_field = |line: &mut String, key: &str, value: &OwnedValue, is_first: bool| -> bool { + + if SKIP_NAN_VALUES && ! value.is_finite() { return false } + if is_first { line.push_str(" "); } else { line.push_str(","); } line.push_str(&escape_tag(key)); line.push_str("="); @@ -947,7 +956,7 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { if d.is_finite() { line.push_str(&format!("{}", d)); } else { - line.push_str("0.0"); + line.push_str("-999.0"); } } @@ -961,22 +970,32 @@ pub fn serialize_owned(measurement: &OwnedMeasurement, line: &mut String) { OwnedValue::Uuid(ref u) => line.push_str(&format!("\"{}\"", u)), }; - }; - let mut fields = measurement.fields.iter(); + true + }; - // first time separate from tags with space + // use this counter to ensure that at least one field was + // serialized. since NaN values may be skipped, the serialization + // would be wrong if no fields ended up being serialized. instead, + // track it, and if there are none serialized, add a n=1 to make + // the measurement serialize properly // - fields.next().map(|kv| { - add_field(line, &kv.0, &kv.1, true); - }); + // this also controls what value is passed to the `is_first` argument + // of `add_field` + let mut n_fields_serialized = 0; - // then seperate the rest w/ comma - // - for kv in fields { - add_field(line, kv.0, &kv.1, false); + for kv in measurement.fields.iter() { + if add_field(line, kv.0, &kv.1, n_fields_serialized == 0) { + n_fields_serialized += 1; + } } + // supply a minimum of one field (n=1) + // + // TODO: could potentially clobber a "n" tag? do we care? + // + if n_fields_serialized == 0 { add_field(line, "n", &OwnedValue::Integer(1), true); } + if let Some(t) = measurement.timestamp { line.push_str(" "); line.push_str(&t.to_string()); @@ -993,6 +1012,38 @@ pub enum OwnedValue { Uuid(Uuid), } +impl OwnedValue { + /// if `self` is a `Float` or `D128` variant, checks + /// whether the contained value is finite + /// + /// # Examples + /// + /// ``` + /// use std::str::FromStr; + /// use influx_writer::OwnedValue; + /// + /// let v1 = OwnedValue::Float(f64::NAN); + /// assert!( ! v1.is_finite()); + /// let v2 = OwnedValue::Float(1.234f64); + /// assert!(v2.is_finite()); + /// + /// let v3 = OwnedValue::D128(decimal::d128::from_str("NaN").unwrap()); + /// assert!( ! v3.is_finite()); + /// let v4 = OwnedValue::D128(decimal::d128::from_str("42.42").unwrap()); + /// assert!(v4.is_finite()); + /// + /// // other variants are always "finite" + /// assert!(OwnedValue::String("NaN".into()).is_finite()); + /// ``` + pub fn is_finite(&self) -> bool { + match self { + OwnedValue::Float(x) => x.is_finite(), + OwnedValue::D128(x) => x.is_finite(), + _ => true, + } + } +} + /// Holds data meant for an influxdb measurement in transit to the /// writing thread. /// @@ -1095,6 +1146,7 @@ impl OwnedMeasurement { #[allow(unused)] #[cfg(test)] mod tests { + use std::str::FromStr; use super::*; #[cfg(feature = "unstable")] use test::{black_box, Bencher}; @@ -1447,4 +1499,39 @@ mod tests { measure!(influx, auth_test_meas, i(n, 1)); drop(influx); } + + #[test] + fn it_skips_nan_values() { + assert!(SKIP_NAN_VALUES, "otherwise this test is worthless"); + let m = OwnedMeasurement::new("rust_test") + .add_field("hello", OwnedValue::Integer(1234)) + .add_field("finite_float", OwnedValue::Float(1.234)) + .add_field("nan_float", OwnedValue::Float(f64::NAN)) + .add_field("inf_float", OwnedValue::Float(f64::INFINITY)) + .add_field("neg_inf_float", OwnedValue::Float(f64::NEG_INFINITY)) + .add_field("finite_d128", OwnedValue::D128(d128::from_str("3.456").unwrap())) + .add_field("nan_d128", OwnedValue::D128(d128::from_str("NaN").unwrap())) + .set_timestamp(now()); + let mut buf = String::new(); + serialize_owned(&m, &mut buf); + dbg!(&buf); + assert!(buf.contains("hello=1234")); + assert!(buf.contains("finite_float=1.234")); + assert!( ! buf.contains("nan_float=")); + assert!( ! buf.contains("inf_float=")); + assert!( ! buf.contains("neg_inf_float=")); + assert!(buf.contains("finite_d128=3.456")); + assert!( ! buf.contains("nan_d128=")); + } + + #[test] + fn it_supplies_a_field_if_every_field_is_skipped_because_nan() { + assert!(SKIP_NAN_VALUES, "otherwise this test is worthless"); + let m = OwnedMeasurement::new("rust_test") + .add_field("nan_float", OwnedValue::Float(f64::NAN)); + let mut buf = String::new(); + serialize_owned(&m, &mut buf); + dbg!(&buf); + assert!(buf.contains("n=1i")); + } }