From dc9aa58e03e27eadacf2b95f475883fe6c574ff8 Mon Sep 17 00:00:00 2001 From: Jonathan Strong Date: Thu, 16 Apr 2020 21:18:05 -0400 Subject: [PATCH] wip binary encoding --- Cargo.toml | 4 +- src/csv.rs | 18 +++- src/encoding.rs | 271 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 7 ++ src/munge.rs | 15 +++ 5 files changed, 311 insertions(+), 4 deletions(-) create mode 100644 src/encoding.rs diff --git a/Cargo.toml b/Cargo.toml index 421c946..8e8e644 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ csv = "1.1" structopt = "0.3" serde = { version = "1", features = ["derive"] } serde_json = "1" -markets = { version = "0.3.0", registry = "jstrong-dev" } +markets = { version = "0.3.1", registry = "jstrong-dev" } slog = "2" slog-async = "2" slog-term = "2" @@ -37,6 +37,8 @@ dtoa = "0.4.5" chrono = { version = "0.4", features = ["serde"] } clap = "2" itertools-num = "0.1" +bincode = "1.2" +postcard = "0.5" [dev-dependencies] approx = "0.3" diff --git a/src/csv.rs b/src/csv.rs index c5ed7b2..fd05f60 100644 --- a/src/csv.rs +++ b/src/csv.rs @@ -45,7 +45,7 @@ struct Opt { hard_mode: bool, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] struct Trade { /// Time of trade in unix nanoseconds pub time: u64, @@ -57,6 +57,8 @@ struct Trade { pub price: f64, /// Size/Volume of trade, in base denomination pub amount: f64, + + pub server_time: u64, } fn per_sec(n: usize, span: Duration) -> f64 { @@ -100,7 +102,9 @@ fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result return Err("illegal ticker"), }; - Ok(Trade { time, amount, exch, price, ticker }) + Ok(Trade { time, amount, exch, price, ticker, + server_time: 0, + }) } #[allow(dead_code)] @@ -138,7 +142,9 @@ fn manual_deserialize_str(row: &csv::StringRecord) -> Result return Err("illegal ticker"), }; - Ok(Trade { time, amount, exch, price, ticker }) + Ok(Trade { time, amount, exch, price, ticker, + server_time: 0, + }) } /// Example of code used in discussion of increasing CSV parsing performance @@ -433,6 +439,12 @@ fn run(start: Instant, logger: &slog::Logger) -> Result { n += 1; + if trade.server_time != 0 { + let diff: i64 = (trade.server_time as i64 - trade.time as i64) / 1000 / 1000; + assert!(diff >= std::i32::MIN as i64, "diff = {}, trade = {:?}", diff, trade); + assert!(diff <= std::i32::MAX as i64, "diff = {}, trade = {:?}", diff, trade); + } + // verify data is sorted by time assert!(trade.time >= last_time); last_time = trade.time; diff --git a/src/encoding.rs b/src/encoding.rs new file mode 100644 index 0000000..f796a3a --- /dev/null +++ b/src/encoding.rs @@ -0,0 +1,271 @@ +use std::num::{NonZeroU64, NonZeroU8, NonZeroI32}; +use std::mem::size_of; +use std::convert::TryFrom; +use serde::{Serialize, Deserialize}; +use markets::crypto::{Exchange, Currency, Ticker, Side}; + +mod try_from_u8 { + use std::convert::TryFrom; + use std::fmt; + use std::marker::PhantomData; + use serde::{Serializer, Deserializer}; + use serde::de::Visitor; + use serde::ser::Error as SerError; + + struct V(PhantomData); + + impl<'de, T> Visitor<'de> for V + where T: TryFrom + { + type Value = T; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("an integer code between 1-255") + } + + fn visit_u8(self, v: u8) -> Result + where E: serde::de::Error, + { + match T::try_from(v) { + Ok(v) => Ok(v), + Err(_) => { + Err(serde::de::Error::custom("Invalid code")) + } + } + } + + fn visit_u64(self, v: u64) -> Result + where E: serde::de::Error, + { + if v > 255 { + return Err(serde::de::Error::custom("Value greater than 255")) + } + match T::try_from(v as u8) { + Ok(v) => Ok(v), + Err(_) => { + Err(serde::de::Error::custom("Invalid code")) + } + } + } + } + + + pub fn deserialize<'de, D, T>(deserializer: D) -> Result + where D: Deserializer<'de>, + T: TryFrom + { + deserializer.deserialize_u8(V(PhantomData)) + } + + pub fn serialize(item: &T, serializer: S) -> Result + where S: Serializer, + T: Copy, + u8: From + { + match u8::from(*item) { + 0 => Err(S::Error::custom("not implemented: no code for variant or value")), + x => serializer.serialize_u8(x) + } + } +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct Trade { + pub time: u64, + #[serde(with = "try_from_u8")] + pub exch: Exchange, + #[serde(with = "try_from_u8")] + pub ticker: Ticker, + pub price: f64, + pub amount: f64, + pub side: Option, + pub server_time: Option, +} + +/// Represents the serialized form of a trades row +/// +/// ```console,ignore +/// 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// |e|b|q|s| srvtm | time: u64 | price: f64 | amount: f64 | +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// | | | | | +/// | | | | | +/// | | | | | +/// | | | | -> server_time: Option - 0=None, other=nano offset from `time` +/// | | | | +/// | | | -> side: Option - 0=None, 1=Bid, 2=Ask +/// | | | +/// | | -> quote: Currency - see markets::crypto for u8 <-> currency codes +/// | | +/// | -> base: Currency - see markets::crypto for u8 <-> currency codes +/// | +/// -> exch: Exchange - see markets::crypto for u8 <-> exchange codes +/// +/// ``` +/// +#[derive(Debug, Clone)] +pub struct PackedTrade { + pub exch: u8, + pub base: u8, + pub quote: u8, + + /// 0=None + pub side: u8, + + /// relative offset from `time`; 0=None + pub server_time: i32, + + pub time: u64, + pub price: f64, + pub amount: f64, +} + +#[derive(Debug, Clone)] +pub struct ParseError(Box); + +/// Pull out individual fields on demand from the serialized bytes of a PackedTrade +#[repr(align(32))] +pub struct PackedTradeData<'a>(&'a [u8]); + +impl<'a> PackedTradeData<'a> { + + const EXCH_OFFSET : usize = 0; + const BASE_OFFSET : usize = 1; + const QUOTE_OFFSET : usize = 2; + const SIDE_OFFSET : usize = 3; + const SERVER_TIME_OFFSET : usize = 4; + const TIME_OFFSET : usize = 8; + const PRICE_OFFSET : usize = 16; + const AMOUNT_OFFSET : usize = 24; + + #[inline] + pub fn exch(&self) -> Result { + Exchange::try_from(self.0[Self::EXCH_OFFSET]) + } + + #[inline] + pub fn base(&self) -> Result { + Currency::try_from(self.0[Self::BASE_OFFSET]) + } + + #[inline] + pub fn quote(&self) -> Result { + Currency::try_from(self.0[Self::QUOTE_OFFSET]) + } + + #[inline] + pub fn side(&self) -> Result, markets::crypto::Error> { + match self.0[Self::SIDE_OFFSET] { + 0 => Ok(None), + other => Ok(Some(Side::try_from(other)?)), + } + } + + #[inline] + pub fn time(&self) -> Result { + atoi::atoi(&self.0[Self::TIME_OFFSET..(Self::TIME_OFFSET + 8)]) + .ok_or_else(|| { + ParseError(Box::new(format!("failed to parse integer: '{}'", + std::str::from_utf8(&self.0[Self::TIME_OFFSET..(Self::TIME_OFFSET + 8)]).unwrap_or("uft8 error") + ))) + }) + } + + #[inline] + pub fn price(&self) -> Result { + lexical::parse(&self.0[Self::PRICE_OFFSET..(Self::PRICE_OFFSET + 8)]) + } + + #[inline] + pub fn amount(&self) -> Result { + lexical::parse(&self.0[Self::AMOUNT_OFFSET..(Self::AMOUNT_OFFSET + 8)]) + } + + #[inline] + pub fn server_time(&self) -> Result, ParseError> { + let st: i32 = + atoi::atoi(&self.0[Self::SERVER_TIME_OFFSET..(Self::SERVER_TIME_OFFSET + 4)]) + .ok_or_else(|| { + ParseError(Box::new(format!("failed to parse integer: '{}'", + std::str::from_utf8(&self.0[Self::SERVER_TIME_OFFSET..(Self::SERVER_TIME_OFFSET + 4)]).unwrap_or("uft8 error") + ))) + })?; + match st { + 0 => Ok(None), + + x @ std::i32::MIN .. 0 => Ok(Some(self.time()? - x.abs() as u64)), + + x @ 1 ..= std::i32::MAX => Ok(Some(self.time()? + x as u64)), + } + } +} + + +#[allow(unused)] +#[cfg(test)] +mod tests { + use super::*; + use markets::{e, t, c}; + + #[test] + fn verify_packed_trade_is_32_bytes() { + assert_eq!(size_of::(), 32); + } + + #[test] + fn check_bincode_serialized_size() { + let trade = Trade { + time: 1586996977191449698, + exch: e!(bmex), + ticker: t!(btc-usd), + price: 1.234, + amount: 4.567, + side: None, + //server_time: NonZeroU64::new(1586996977191449698 + 1_000_000), + server_time: NonZeroI32::new(1_000_000), + //server_time: Some(1586996977191449698 + 1_000_000), + }; + + /* + let packed = PackedTrade { + time: trade.time, + exch: u8::from(trade.exch), + base: u8::from(trade.ticker.base), + quote: u8::from(trade.ticker.quote), + amount: trade.amount, + price: trade.price, + //side: trade.side.and_then(|s| NonZeroU8::new(u8::from(s))), + //server_time: NonZeroI32::new(1_000_000), + side: trade.side.map(|s| u8::from(s)).unwrap_or(0), + server_time: 1_000_000, + + }; + */ + + assert_eq!(size_of::(), 32); + + //assert_eq!(serde_json::to_string(&trade).unwrap().len(), 32); + assert_eq!(bincode::serialized_size(&trade).unwrap(), 32); + } + + #[test] + fn example_of_36_byte_trades_struct_without_the_offset_i32() { + #[repr(packed)] + pub struct Trade36 { + pub exch: Exchange, + pub ticker: Ticker, + pub side: Option, + + pub time: u64, + pub price: f64, + pub amount: f64, + + pub server_time: Option, + } + + assert_eq!(size_of::(), 36); + } +} + diff --git a/src/lib.rs b/src/lib.rs index fb2aff3..94d351c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,7 @@ +#![feature(exclusive_range_pattern)] + pub mod windows; +pub mod encoding; #[allow(unused)] #[cfg(test)] @@ -53,6 +56,10 @@ mod tests { #[test] fn memory_size_of_trades_struct() { + // tests an example in one of the baselines articles showing how with + // no particular effort this struct is 48 bytes, compared to the + // CSV per-row representation of an average of 73 bytes + use markets::crypto::{Exchange, Ticker, Side}; struct Trade { diff --git a/src/munge.rs b/src/munge.rs index 63c9e95..bf1ac34 100644 --- a/src/munge.rs +++ b/src/munge.rs @@ -73,6 +73,21 @@ enum Opt { ListCodes, + /* + Binarize { + /// Path to CSV file with trades data + #[structopt(short = "f", long = "trades-csv")] + #[structopt(parse(from_os_str))] + trades_csv: PathBuf, + + /// Where to save the binary-serialized data + #[structopt(short = "o", long = "output-path")] + #[structopt(parse(from_os_str))] + output_path: PathBuf, + + } + */ + }