diff --git a/Cargo.toml b/Cargo.toml index 8e8e644..aceb669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,13 @@ [package] name = "data-pipelines" -version = "0.1.0" +version = "0.2.0" authors = ["Jonathan Strong "] edition = "2018" +[[bin]] +name = "binary-serialization" +path = "src/binary-serialization.rs" + [[bin]] name = "csv" path = "src/csv.rs" @@ -25,7 +29,7 @@ csv = "1.1" structopt = "0.3" serde = { version = "1", features = ["derive"] } serde_json = "1" -markets = { version = "0.3.1", registry = "jstrong-dev" } +markets = { version = "0.4.0", registry = "jstrong-dev" } slog = "2" slog-async = "2" slog-term = "2" @@ -39,6 +43,7 @@ clap = "2" itertools-num = "0.1" bincode = "1.2" postcard = "0.5" +memmap = "0.7" [dev-dependencies] approx = "0.3" diff --git a/justfile b/justfile new file mode 100644 index 0000000..c22a4fe --- /dev/null +++ b/justfile @@ -0,0 +1,2 @@ +flush-cache: + sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches' diff --git a/src/binary-serialization.rs b/src/binary-serialization.rs new file mode 100644 index 0000000..6de8c03 --- /dev/null +++ b/src/binary-serialization.rs @@ -0,0 +1,402 @@ +#![allow(unused)] + +#[macro_use] +extern crate slog; +#[macro_use] +extern crate markets; + +use std::io::{self, prelude::*}; +use std::fs; +use std::path::{Path, PathBuf}; +use std::time::*; +use pretty_toa::ThousandsSep; +use structopt::StructOpt; +use serde::{Serialize, Deserialize}; +use slog::Drain; +use chrono::{DateTime, Utc, NaiveDateTime}; +use markets::crypto::{Exchange, Ticker, Side, Currency}; +use pipelines::encoding; +use pipelines::windows::WeightedMeanWindow; + +macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{ + eprintln!($fmt, $($args)*); + std::process::exit(1); +}}} + +const PROGRESS_EVERY: usize = 1024 * 1024 * 16; +const ONE_SECOND: u64 = 1_000_000_000; +const ONE_HOUR: u64 = ONE_SECOND * 60 * 60; + + +#[structopt(rename_all="kebab-case")] +#[derive(Debug, StructOpt)] +struct Opt { + /// Path to file with binary trades data + #[structopt(short = "f", long = "input-file")] + #[structopt(parse(from_os_str))] + input_path: PathBuf, + + /// Where to save the query results (CSV output) + #[structopt(short = "o", long = "output-path")] + #[structopt(parse(from_os_str))] + output_path: PathBuf, + + #[structopt(short = "z", long = "hard-mode")] + hard_mode: bool, +} + +fn nanos_to_utc(nanos: u64) -> DateTime { + const ONE_SECOND: u64 = 1_000_000_000; + let sec: i64 = (nanos / ONE_SECOND) as i64; + let nsec: u32 = (nanos % ONE_SECOND) as u32; + let naive = NaiveDateTime::from_timestamp(sec, nsec); + DateTime::from_utc(naive, Utc) +} + +fn per_sec(n: usize, span: Duration) -> f64 { + if n == 0 || span < Duration::from_micros(1) { return 0.0 } + let s: f64 = span.as_nanos() as f64 / 1e9f64; + n as f64 / s +} + +fn nanos(utc: DateTime) -> u64 { + (utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64) +} + +fn easy_query( + data: &memmap::Mmap, + mut wtr: W, + logger: &slog::Logger, +) -> Result + where W: Write +{ + let logger = logger.new(o!("easy-mode" => "whatever, man")); + info!(logger, "beginning easy mode"); + + let n_records = data.len() / encoding::SERIALIZED_SIZE; + + let mut n = 0; + let mut n_written = 0; + + let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE); + + let mut row_buffer: Vec = Vec::with_capacity(512); + + writeln!(&mut wtr, "time,ratio,bmex,gdax") + .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?; + + assert!(n_records > 0); + let first = encoding::PackedTradeData::new(records.next().unwrap()); + n += 1; + + let mut cur_hour = first.time() - first.time() % ONE_HOUR; + let mut next_hour = cur_hour + ONE_HOUR; + + let mut bmex_total = 0.0; + let mut bmex_amount = 0.0; + let mut n_bmex = 0; + + let mut gdax_total = 0.0; + let mut gdax_amount = 0.0; + let mut n_gdax = 0; + + macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body + ($trade:ident) => {{ + match ($trade.exch(), $trade.base(), $trade.quote()) { + (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => { + bmex_total += $trade.price() * $trade.amount(); + bmex_amount += $trade.amount(); + n_bmex += 1; + } + + (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => { + gdax_total += $trade.price() * $trade.amount(); + gdax_amount += $trade.amount(); + n_gdax += 1; + + } + + _ => {} + } + }} + } + + update!(first); + + for record in records { + n += 1; + + let trade = encoding::PackedTradeData::new(record); + + if trade.time() > next_hour { + row_buffer.clear(); + itoa::write(&mut row_buffer, cur_hour).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + + if n_bmex == 0 || n_gdax == 0 { + row_buffer.write(",NaN,NaN,NaN\n".as_bytes()).unwrap(); + } else { + let bmex_wt_avg = bmex_total / bmex_amount; + let gdax_wt_avg = gdax_total / gdax_amount; + let ratio = bmex_wt_avg / gdax_wt_avg; + + row_buffer.push(b','); + dtoa::write(&mut row_buffer, ratio).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b','); + dtoa::write(&mut row_buffer, bmex_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b','); + dtoa::write(&mut row_buffer, gdax_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b'\n'); + } + + wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?; + n_written += 1; + + bmex_total = 0.0; + bmex_amount = 0.0; + gdax_total = 0.0; + gdax_amount = 0.0; + n_bmex = 0; + n_gdax = 0; + + cur_hour = next_hour; + next_hour += ONE_HOUR; + + // if we are skipping hours in between the last and current row, we + // need to write a NaN row for the hours that had no data + while next_hour <= trade.time() { + writeln!(&mut wtr, "{},NaN,NaN,NaN", cur_hour) + .map_err(|e| format!("writing output row failed: {}", e))?; + + n_written += 1; + cur_hour = next_hour; + next_hour += ONE_HOUR; + } + } + + update!(trade); + + if n % PROGRESS_EVERY == 0 { + info!(logger, "calculating query"; + "n" => %n.thousands_sep(), + "n_written" => %n_written.thousands_sep(), + ); + } + } + info!(logger, "finished with easy query"); + Ok(n) + +} + +fn hard_query( + data: &memmap::Mmap, + mut wtr: W, + logger: &slog::Logger, +) -> Result + where W: Write +{ + let logger = logger.new(o!("hard-mode" => "challenge accepted")); + info!(logger, "beginning hard mode"); + + let n_records = data.len() / encoding::SERIALIZED_SIZE; + + let mut n = 0; + let mut n_written = 0; + + let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE); + + // pull out first row to initialize query calculations + assert!(n_records > 0); + let first = encoding::PackedTradeData::new(records.next().unwrap()); + n += 1; + + let mut cur_bucket = first.time() - (first.time() % (ONE_SECOND * 10)) + ONE_SECOND * 10; + + #[derive(Default, Clone)] + struct Lookbacks { + pub p5: T, + pub p15: T, + pub p60: T, + } + + let mut ratios: Lookbacks = Default::default(); + + let mut bmex_windows: Lookbacks = + Lookbacks { + p5: WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ), + p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15), + p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60), + }; + + let mut gdax_windows = bmex_windows.clone(); + + let mut row_buffer: Vec = Vec::with_capacity(512); + + macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body + ($trade:ident) => {{ + match ($trade.exch(), $trade.base(), $trade.quote()) { + (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => { + bmex_windows.p5 .push($trade.time(), $trade.price(), $trade.amount()); + bmex_windows.p15.push($trade.time(), $trade.price(), $trade.amount()); + bmex_windows.p60.push($trade.time(), $trade.price(), $trade.amount()); + } + + (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => { + gdax_windows.p5 .push($trade.time(), $trade.price(), $trade.amount()); + gdax_windows.p15.push($trade.time(), $trade.price(), $trade.amount()); + gdax_windows.p60.push($trade.time(), $trade.price(), $trade.amount()); + } + + _ => {} + } + }} + } + + writeln!(&mut wtr, "time,r5,r15,r60") + .map_err(|e| format!("writing CSV headers to output file failed: {}", e))?; + + update!(first); + + for record in records { + n += 1; + + let trade = encoding::PackedTradeData::new(record); + + if trade.time() > cur_bucket { + debug!(logger, "about to purge"; + "n" => n, + "n written" => n_written, + "trade.time" => trade.time(), + "cur_bucket" => cur_bucket, + "gdax p5 len" => gdax_windows.p5.len(), + "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(), + ); + + bmex_windows.p5 .purge(cur_bucket); + bmex_windows.p15.purge(cur_bucket); + bmex_windows.p60.purge(cur_bucket); + + gdax_windows.p5 .purge(cur_bucket); + gdax_windows.p15.purge(cur_bucket); + gdax_windows.p60.purge(cur_bucket); + + debug!(logger, "finished purge"; + "n" => n, + "n written" => n_written, + "trade.time" => trade.time(), + "cur_bucket" => cur_bucket, + "gdax p5 len" => gdax_windows.p5.len(), + "gdax p5 wt avg" => gdax_windows.p5.weighted_mean(), + ); + + ratios.p5 = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean(); + ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean(); + ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean(); + + //row_buffers.iter_mut().for_each(|x| x.clear()); + row_buffer.clear(); + + itoa::write(&mut row_buffer, cur_bucket).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b','); + dtoa::write(&mut row_buffer, ratios.p5 ).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b','); + dtoa::write(&mut row_buffer, ratios.p15).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b','); + dtoa::write(&mut row_buffer, ratios.p60).map_err(|e| format!("serializing number to buffer failed: {}", e))?; + row_buffer.push(b'\n'); + + wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?; + + n_written += 1; + cur_bucket += ONE_SECOND * 10; + } + + update!(trade); + + if n % PROGRESS_EVERY == 0 { + info!(logger, "calculating hard query"; + "n" => %n.thousands_sep(), + "n_written" => %n_written.thousands_sep(), + "ratios.p5" => ratios.p5, + "ratios.p15" => ratios.p15, + "ratios.p60" => ratios.p60, + ); + } + } + info!(logger, "finished with hard query"); + Ok(n) +} + + + +fn run(start: Instant, logger: &slog::Logger) -> Result { + let Opt { input_path, output_path, hard_mode } = Opt::from_args(); + + info!(logger, "beginning to count"; + "input_path" => %input_path.display(), + ); + + if ! input_path.exists() { return Err(format!("--input-file path does not exist: {}", input_path.display())) } + + let input_file = + fs::OpenOptions::new() + .read(true) + .open(input_path) + .map_err(|e| e.to_string())?; + + let file_length = input_file.metadata().unwrap().len(); + + if file_length % encoding::SERIALIZED_SIZE as u64 != 0 || file_length == 0 { + return Err(format!("file length is not a multiple of record size: {}", file_length)) + } + + let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE; + + info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep()); + + let data: memmap::Mmap = unsafe { + memmap::Mmap::map(&input_file) + .map_err(|e| { + format!("creating Mmap failed: {}", e) + })? + }; + + info!(logger, "opening output file for writing"); + + let wtr = fs::File::create(&output_path) + .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?; + + let wtr = io::BufWriter::new(wtr); + + if hard_mode { + hard_query(&data, wtr, &logger) + } else { + easy_query(&data, wtr, &logger) + } +} + +fn main() { + let start = Instant::now(); + + let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); + let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse(); + let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse(); + let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!())); + + match run(start, &logger) { + Ok(n) => { + let took = Instant::now() - start; + info!(logger, "finished in {:?}", took; + "n rows" => %n.thousands_sep(), + "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(), + ); + } + + Err(e) => { + crit!(logger, "run failed: {:?}", e); + eprintln!("\n\nError: {}", e); + std::thread::sleep(Duration::from_millis(100)); + std::process::exit(1); + } + } + +} diff --git a/src/csv.rs b/src/csv.rs index fd05f60..bc46b62 100644 --- a/src/csv.rs +++ b/src/csv.rs @@ -24,7 +24,7 @@ macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{ std::process::exit(1); }}} -const PROGRESS_EVERY: usize = 1024 * 1024; +const PROGRESS_EVERY: usize = 1024 * 1024 * 2; const ONE_SECOND: u64 = 1_000_000_000; const ONE_HOUR: u64 = ONE_SECOND * 60 * 60; @@ -402,11 +402,15 @@ fn run(start: Instant, logger: &slog::Logger) -> Result { "gdax", ]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?; - let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); - let mut row = csv::StringRecord::new(); + //let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); + //let mut row = csv::StringRecord::new(); + + let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); + let mut row = csv::ByteRecord::new(); // pull out first row to initialize query calculations - rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?; + //rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?; + rdr.read_byte_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?; let trade: Trade = row.deserialize(Some(&headers)) .map_err(|e| { format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row) @@ -427,11 +431,18 @@ fn run(start: Instant, logger: &slog::Logger) -> Result { let mut n_written = 0; let mut last_time = 0; - while rdr.read_record(&mut row) + // while rdr.read_record(&mut row) + // .map_err(|e| { + // format!("reading row {} failed: {}", (n+1).thousands_sep(), e) + // })? + // { + + while rdr.read_byte_record(&mut row) .map_err(|e| { format!("reading row {} failed: {}", (n+1).thousands_sep(), e) })? { + let trade: Trade = row.deserialize(Some(&headers)) .map_err(|e| { format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row) @@ -439,6 +450,14 @@ fn run(start: Instant, logger: &slog::Logger) -> Result { n += 1; + if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) { + info!(logger, "parsing csv file"; + "n rows" => %n.thousands_sep(), + "n written" => %n_written.thousands_sep(), + "elapsed" => ?(Instant::now() - start), + ); + } + if trade.server_time != 0 { let diff: i64 = (trade.server_time as i64 - trade.time as i64) / 1000 / 1000; assert!(diff >= std::i32::MIN as i64, "diff = {}, trade = {:?}", diff, trade); @@ -518,14 +537,6 @@ fn run(start: Instant, logger: &slog::Logger) -> Result { _ => {} } - if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) { - info!(logger, "parsing csv file"; - "n rows" => %n.thousands_sep(), - "n written" => %n_written.thousands_sep(), - "elapsed" => ?(Instant::now() - start), - ); - } - if cfg!(debug_assertions) && n > PROGRESS_EVERY { warn!(logger, "debug mode: exiting early"; "n rows" => %n.thousands_sep(), @@ -560,7 +571,7 @@ fn main() { info!(logger, "finished in {}", took_str; "n rows" => %n.thousands_sep(), - "rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(), + "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(), ); } diff --git a/src/encoding.rs b/src/encoding.rs index 0a90789..ddaa748 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -1,86 +1,25 @@ use std::num::{NonZeroU64, NonZeroU8, NonZeroI32}; use std::mem::size_of; -use std::convert::TryFrom; -use serde::{Serialize, Deserialize}; +use std::convert::{TryFrom, TryInto}; +use serde::{Serialize, Deserialize, Deserializer}; use markets::crypto::{Exchange, Currency, Ticker, Side}; -mod try_from_u8 { - use std::convert::TryFrom; - use std::fmt; - use std::marker::PhantomData; - use serde::{Serializer, Deserializer}; - use serde::de::Visitor; - use serde::ser::Error as SerError; +pub const EXCH_OFFSET : usize = 0; +pub const BASE_OFFSET : usize = 1; +pub const QUOTE_OFFSET : usize = 2; +pub const SIDE_OFFSET : usize = 3; +pub const SERVER_TIME_OFFSET : usize = 4; +pub const TIME_OFFSET : usize = 8; +pub const PRICE_OFFSET : usize = 16; +pub const AMOUNT_OFFSET : usize = 24; +pub const SERIALIZED_SIZE : usize = 32; - struct V(PhantomData); +/// `server_time` is stored in milliseconds, while `time` is nanoseconds. +/// this is what you need to multiply the stored `server_time` data by to +/// get it back to nanoseconds. +pub const SERVER_TIME_DOWNSCALE_FACTOR: u64 = 1_000_000; - impl<'de, T> Visitor<'de> for V - where T: TryFrom - { - type Value = T; - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("an integer code between 1-255") - } - - fn visit_u8(self, v: u8) -> Result - where E: serde::de::Error, - { - match T::try_from(v) { - Ok(v) => Ok(v), - Err(_) => { - Err(serde::de::Error::custom("Invalid code")) - } - } - } - - fn visit_u64(self, v: u64) -> Result - where E: serde::de::Error, - { - if v > 255 { - return Err(serde::de::Error::custom("Value greater than 255")) - } - match T::try_from(v as u8) { - Ok(v) => Ok(v), - Err(_) => { - Err(serde::de::Error::custom("Invalid code")) - } - } - } - } - - - pub fn deserialize<'de, D, T>(deserializer: D) -> Result - where D: Deserializer<'de>, - T: TryFrom - { - deserializer.deserialize_u8(V(PhantomData)) - } - - pub fn serialize(item: &T, serializer: S) -> Result - where S: Serializer, - T: Copy, - u8: From - { - match u8::from(*item) { - 0 => Err(S::Error::custom("not implemented: no code for variant or value")), - x => serializer.serialize_u8(x) - } - } -} - -#[derive(Deserialize, Serialize, Debug, Clone)] -pub struct Serde32BytesTrade { - pub time: u64, - #[serde(with = "try_from_u8")] - pub exch: Exchange, - #[serde(with = "try_from_u8")] - pub ticker: Ticker, - pub price: f64, - pub amount: f64, - pub side: Option, - pub server_time: Option, -} /// Represents the serialized form of a trades row /// @@ -122,6 +61,20 @@ pub struct PackedTrade { pub amount: f64, } +#[derive(Deserialize, Serialize, Debug, Clone)] + +pub struct CsvTrade { + pub time: u64, + pub exch: Exchange, + pub ticker: Ticker, + pub price: f64, + pub amount: f64, + #[serde(deserialize_with = "deserialize_csv_side")] + pub side: Option, + #[serde(deserialize_with = "deserialize_csv_server_time")] + pub server_time: Option, +} + #[derive(Debug, Clone)] pub struct ParseError(Box); @@ -129,99 +82,311 @@ pub struct ParseError(Box); #[repr(align(32))] pub struct PackedTradeData<'a>(&'a [u8]); -impl<'a> PackedTradeData<'a> { +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct Serde32BytesTrade { + pub time: u64, + #[serde(with = "try_from_u8")] + pub exch: Exchange, + #[serde(with = "try_from_u8")] + pub ticker: Ticker, + pub price: f64, + pub amount: f64, + pub side: Option, + pub server_time: Option, +} + +pub fn server_time_to_delta(time: u64, server_time: u64) -> i32 { + let ms = ( + (server_time / SERVER_TIME_DOWNSCALE_FACTOR) as i64 + - (time / SERVER_TIME_DOWNSCALE_FACTOR) as i64 + ) as i32; + + match ms { + // if the two values are either identical, or so close that the difference + // is washed out when we downscale, return i32::MIN as a sentinel indicating + // time == server_time + // + 0 => std::i32::MIN, + + other => other + } +} + +/// Convert a `server_time` delta back to its unix nanosecond timestamp form. +/// +/// Note: while the `server_time` delta is stored as a signed integer, to be able to express a +/// delta in both directions relative to `time`, we can't just add a negative `i64` to a +/// `u64`, it doesn't work like that. this match either subtracts the absolute value of a +/// negative delta, or adds a positive delta, to get around this conundrum. +pub fn delta_to_server_time(time: u64, delta: i32) -> Option { + const MIN_VALID: i32 = std::i32::MIN + 1; - const EXCH_OFFSET : usize = 0; - const BASE_OFFSET : usize = 1; - const QUOTE_OFFSET : usize = 2; - const SIDE_OFFSET : usize = 3; - const SERVER_TIME_OFFSET : usize = 4; - const TIME_OFFSET : usize = 8; - const PRICE_OFFSET : usize = 16; - const AMOUNT_OFFSET : usize = 24; + match delta { + 0 => None, + + // -1 is another sentinel indicating that time == server_time + std::i32::MIN => Some(time), + + x @ MIN_VALID .. 0 => Some(time - (x.abs() as u64 * SERVER_TIME_DOWNSCALE_FACTOR)), + + x @ 1 ..= std::i32::MAX => Some(time + (x as u64 * SERVER_TIME_DOWNSCALE_FACTOR)), + } +} + + +pub fn serialize<'a, 'b>(buf: &'a mut [u8], trade: &'b CsvTrade) { + assert_eq!(buf.len(), SERIALIZED_SIZE); + + buf[EXCH_OFFSET] = u8::from(trade.exch); + buf[BASE_OFFSET] = u8::from(trade.ticker.base); + buf[QUOTE_OFFSET] = u8::from(trade.ticker.quote); + + match trade.side { + Some(side) => { + buf[SIDE_OFFSET] = u8::from(side); + } + + None => { + buf[SIDE_OFFSET] = 0; + } + } + + match trade.server_time { + Some(st) => { + let delta: i32 = server_time_to_delta(trade.time, st); + (&mut buf[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).copy_from_slice(&delta.to_le_bytes()[..]); + } + + None => { + (&mut buf[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).copy_from_slice(&0i32.to_le_bytes()[..]); + } + } + + (&mut buf[TIME_OFFSET..(TIME_OFFSET + 8)]).copy_from_slice(&trade.time.to_le_bytes()[..]); + (&mut buf[PRICE_OFFSET..(PRICE_OFFSET + 8)]).copy_from_slice(&trade.price.to_le_bytes()[..]); + (&mut buf[AMOUNT_OFFSET..(AMOUNT_OFFSET + 8)]).copy_from_slice(&trade.amount.to_le_bytes()[..]); +} + + +impl<'a> PackedTradeData<'a> { + pub fn new(buf: &'a [u8]) -> Self { + assert_eq!(buf.len(), SERIALIZED_SIZE); + Self(buf) + } #[inline] pub fn exch(&self) -> Result { - Exchange::try_from(self.0[Self::EXCH_OFFSET]) + Exchange::try_from(self.0[EXCH_OFFSET]) } #[inline] pub fn base(&self) -> Result { - Currency::try_from(self.0[Self::BASE_OFFSET]) + Currency::try_from(self.0[BASE_OFFSET]) + } + #[inline] + pub fn ticker(&self) -> Ticker { + Ticker { + base: self.base().unwrap(), + quote: self.quote().unwrap(), + } } #[inline] pub fn quote(&self) -> Result { - Currency::try_from(self.0[Self::QUOTE_OFFSET]) + Currency::try_from(self.0[QUOTE_OFFSET]) } #[inline] pub fn side(&self) -> Result, markets::crypto::Error> { - match self.0[Self::SIDE_OFFSET] { + match self.0[SIDE_OFFSET] { 0 => Ok(None), other => Ok(Some(Side::try_from(other)?)), } } #[inline] - pub fn time(&self) -> Result { - atoi::atoi(&self.0[Self::TIME_OFFSET..(Self::TIME_OFFSET + 8)]) - .ok_or_else(|| { - ParseError(Box::new(format!("failed to parse integer: '{}'", - std::str::from_utf8(&self.0[Self::TIME_OFFSET..(Self::TIME_OFFSET + 8)]).unwrap_or("uft8 error") - ))) - }) + pub fn time(&self) -> u64 { + u64::from_le_bytes( + (&self.0[TIME_OFFSET..(TIME_OFFSET + 8)]).try_into().unwrap() + ) } #[inline] - pub fn price(&self) -> Result { - lexical::parse(&self.0[Self::PRICE_OFFSET..(Self::PRICE_OFFSET + 8)]) + pub fn price(&self) -> f64 { + f64::from_le_bytes( + (&self.0[PRICE_OFFSET..(PRICE_OFFSET + 8)]).try_into().unwrap() + ) } #[inline] - pub fn amount(&self) -> Result { - lexical::parse(&self.0[Self::AMOUNT_OFFSET..(Self::AMOUNT_OFFSET + 8)]) + pub fn amount(&self) -> f64 { + f64::from_le_bytes( + (&self.0[AMOUNT_OFFSET..(AMOUNT_OFFSET + 8)]).try_into().unwrap() + ) } - /// `server_time` is stored in milliseconds, while `time` is nanoseconds. - /// this is what you need to multiply the stored `server_time` data by to - /// get it back to nanoseconds. - const SERVER_TIME_DOWNSCALE_FACTOR: u64 = 1_000_000; - #[inline] - pub fn server_time(&self) -> Result, ParseError> { - let st: i32 = - atoi::atoi(&self.0[Self::SERVER_TIME_OFFSET..(Self::SERVER_TIME_OFFSET + 4)]) - .ok_or_else(|| { - ParseError(Box::new(format!("failed to parse integer: '{}'", - std::str::from_utf8(&self.0[Self::SERVER_TIME_OFFSET..(Self::SERVER_TIME_OFFSET + 4)]).unwrap_or("uft8 error") - ))) - })?; - - // while the `server_time` delta is stored as a signed integer, to be able to express a - // delta in both directions relative to `time`, we can't just add a negative `i64` to a - // `u64`, it doesn't work like that. this match either subtracts the absolute value of a - // negative delta, or adds a positive delta, to get around this conundrum. - // - // `SERVER_TIME_DOWNSCALE_FACTOR` is used to rescale the delta to nanoseconds prior to its - // being applied to `time`. + pub fn server_time(&self) -> Option { + let delta = i32::from_le_bytes( + (&self.0[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).try_into().unwrap() + ); - match st { - 0 => Ok(None), + delta_to_server_time(self.time(), delta) + } +} - x @ std::i32::MIN .. 0 => Ok(Some(self.time()? - (x.abs() as u64 * Self::SERVER_TIME_DOWNSCALE_FACTOR))), +pub fn deserialize_csv_side<'de, D>(deserializer: D) -> Result, D::Error> + where D: Deserializer<'de> +{ + let s: &str = Deserialize::deserialize(deserializer)?; + match s { + "bid" => Ok(Some(Side::Bid)), + "ask" => Ok(Some(Side::Ask)), + _ => Ok(None) + } +} - x @ 1 ..= std::i32::MAX => Ok(Some(self.time()? + (x as u64 * Self::SERVER_TIME_DOWNSCALE_FACTOR))), - } +pub fn deserialize_csv_server_time<'de, D>(deserializer: D) -> Result, D::Error> + where D: Deserializer<'de> +{ + let st: u64 = Deserialize::deserialize(deserializer)?; + match st { + 0 => Ok(None), + other => Ok(Some(other)) } } +mod try_from_u8 { + use std::convert::TryFrom; + use std::fmt; + use std::marker::PhantomData; + use serde::{Serializer, Deserializer}; + use serde::de::Visitor; + use serde::ser::Error as SerError; + + struct V(PhantomData); + + impl<'de, T> Visitor<'de> for V + where T: TryFrom + { + type Value = T; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("an integer code between 1-255") + } + + fn visit_u8(self, v: u8) -> Result + where E: serde::de::Error, + { + match T::try_from(v) { + Ok(v) => Ok(v), + Err(_) => { + Err(serde::de::Error::custom("Invalid code")) + } + } + } + + fn visit_u64(self, v: u64) -> Result + where E: serde::de::Error, + { + if v > 255 { + return Err(serde::de::Error::custom("Value greater than 255")) + } + match T::try_from(v as u8) { + Ok(v) => Ok(v), + Err(_) => { + Err(serde::de::Error::custom("Invalid code")) + } + } + } + } + + + pub fn deserialize<'de, D, T>(deserializer: D) -> Result + where D: Deserializer<'de>, + T: TryFrom + { + deserializer.deserialize_u8(V(PhantomData)) + } + + pub fn serialize(item: &T, serializer: S) -> Result + where S: Serializer, + T: Copy, + u8: From + { + match u8::from(*item) { + 0 => Err(S::Error::custom("not implemented: no code for variant or value")), + x => serializer.serialize_u8(x) + } + } +} #[allow(unused)] #[cfg(test)] mod tests { use super::*; + use std::io::{self, prelude::*}; use markets::{e, t, c}; + use approx::assert_relative_eq; + + const CSV: &str = + "time,amount,exch,price,server_time,side,ticker\n\ + 1561939200002479372,1.4894,bnce,292.7,1561939199919000064,,eth_usd\n\ + 1561939200011035644,0.0833333283662796,btfx,10809.0,1561939199927000064,bid,btc_usd\n\ + 1561939200011055712,0.8333191871643066,btfx,10809.0,1561939199927000064,bid,btc_usd\n\ + 1561939200019037617,0.083096,bnce,10854.1,1561939199935000064,,btc_usd\n\ + 1561939200026450471,0.125,okex,123.21,1561939200026450432,ask,ltc_usd\n\ + 1561939200027716312,0.704054,okex,123.21,1561939200027716352,ask,ltc_usd\n\ + 1561939200028633907,0.11,okex,123.22,1561939200028633856,bid,ltc_usd\n\ + 1561939200029908535,1.438978,okex,123.22,1561939200029908480,ask,ltc_usd\n\ + 1561939200030393495,0.257589,okex,123.22,1561939200030393600,bid,ltc_usd" + ; + + #[test] + fn parse_csv_sample_with_csv_trade() { + let csv: Vec = CSV.as_bytes().to_vec(); + let mut rdr = csv::Reader::from_reader(io::Cursor::new(csv)); + let mut rows = Vec::new(); + let headers = rdr.byte_headers().unwrap().clone(); + let mut row = csv::ByteRecord::new(); + while rdr.read_byte_record(&mut row).unwrap() { + let trade: CsvTrade = row.deserialize(Some(&headers)).unwrap(); + rows.push(trade); + } + + assert_eq!(rows[0].time, 1561939200002479372); + assert_eq!(rows[1].exch, e!(btfx)); + + let mut buf = vec![0u8; 32]; + + for (i, trade) in rows.iter().enumerate() { + assert!(trade.server_time.is_some()); + let st = trade.server_time.unwrap(); + let delta = server_time_to_delta(trade.time, st); + dbg!(i, trade, trade.time, st, + trade.time as i64 - st as i64, delta, + (trade.time / SERVER_TIME_DOWNSCALE_FACTOR) as i64 - (st / SERVER_TIME_DOWNSCALE_FACTOR) as i64, + ); + assert!(delta != 0); + let rt: u64 = delta_to_server_time(trade.time, delta).unwrap(); + let abs_diff = (rt as i64 - st as i64).abs(); + let max_allowable_diff = SERVER_TIME_DOWNSCALE_FACTOR; // * 2; + dbg!(rt, abs_diff, max_allowable_diff); + assert!(abs_diff < max_allowable_diff as i64); + + serialize(&mut buf[..], &trade); + { + let packed = PackedTradeData(&buf[..]); + assert_eq!(packed.time(), trade.time); + assert_eq!(packed.exch().unwrap(), trade.exch); + assert_eq!(packed.base().unwrap(), trade.ticker.base); + assert_eq!(packed.quote().unwrap(), trade.ticker.quote); + assert_eq!(packed.side().unwrap(), trade.side); + assert_relative_eq!(packed.price(), trade.price); + assert_relative_eq!(packed.amount(), trade.amount); + } + } + } #[test] fn verify_packed_trade_is_32_bytes() { diff --git a/src/munge.rs b/src/munge.rs index bf1ac34..e0753ab 100644 --- a/src/munge.rs +++ b/src/munge.rs @@ -15,6 +15,7 @@ use serde::{Serialize, Deserialize}; use slog::Drain; use chrono::{DateTime, Utc, NaiveDateTime}; use markets::crypto::{Exchange, Ticker, Side, Currency}; +use pipelines::encoding; macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{ eprintln!($fmt, $($args)*); @@ -73,7 +74,6 @@ enum Opt { ListCodes, - /* Binarize { /// Path to CSV file with trades data #[structopt(short = "f", long = "trades-csv")] @@ -85,10 +85,14 @@ enum Opt { #[structopt(parse(from_os_str))] output_path: PathBuf, - } - */ - + }, + CountRows { + /// Path to file with binary trades data + #[structopt(short = "f", long = "input-file")] + #[structopt(parse(from_os_str))] + input_path: PathBuf, + }, } #[derive(Deserialize)] @@ -149,6 +153,122 @@ fn run(start: Instant, logger: &slog::Logger) -> Result { let mut n = 0; match opt { + Opt::CountRows { input_path } => { + let logger = logger.new(o!("cmd" => "count-rows")); + info!(logger, "beginning to count"; + "input_path" => %input_path.display(), + ); + + if ! input_path.exists() { return Err(format!("--input-file path does not exist: {}", input_path.display())) } + + let input_file = + fs::OpenOptions::new() + .read(true) + .open(input_path) + .map_err(|e| e.to_string())?; + + let file_length = input_file.metadata().unwrap().len(); + + if file_length % encoding::SERIALIZED_SIZE as u64 != 0 { + return Err(format!("file length is not a multiple of record size: {}", file_length)) + } + + let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE; + + info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep()); + + let data: memmap::Mmap = unsafe { + memmap::Mmap::map(&input_file) + .map_err(|e| { + format!("creating Mmap failed: {}", e) + })? + }; + + let mut n_gdax = 0; + let mut n_bmex = 0; + + for i in 0..n_records { + let j = i * encoding::SERIALIZED_SIZE; + let k = j + encoding::SERIALIZED_SIZE; + let packed = encoding::PackedTradeData::new(&data[j..k]); + n_gdax += (packed.exch().unwrap() == e!(gdax)) as usize; + n_bmex += (packed.exch().unwrap() == e!(bmex)) as usize; + n += 1; + } + + info!(logger, "finished reading flle"; + "n gdax" => n_gdax.thousands_sep(), + "n bmex" => n_bmex.thousands_sep(), + ); + } + + Opt::Binarize { trades_csv, output_path } => { + let logger = logger.new(o!("cmd" => "binarize")); + info!(logger, "beginning binarize"; + "trades_csv" => %trades_csv.display(), + "output_path" => %output_path.display(), + ); + + if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) } + + info!(logger, "opening trades_csv file"); + + let rdr = fs::File::open(&trades_csv) + .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?; + + let rdr = io::BufReader::new(rdr); + + let mut rdr = csv::Reader::from_reader(rdr); + + info!(logger, "opening output file for writing"); + + let wtr = fs::File::create(&output_path) + .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?; + + let mut wtr = io::BufWriter::new(wtr); + + let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); + let mut row = csv::ByteRecord::new(); + + let mut buf = vec![0u8; encoding::SERIALIZED_SIZE]; + + let mut n = 0; + let mut n_written = 0; + let mut n_bytes_written = 0; + let mut n_bytes_read = headers.as_slice().len() + headers.len() + 1; + + while rdr.read_byte_record(&mut row) + .map_err(|e| { + format!("reading row {} failed: {}", (n+1).thousands_sep(), e) + })? + { + let trade: encoding::CsvTrade = row.deserialize(Some(&headers)).map_err(|e| e.to_string())?; + n += 1; + n_bytes_read += row.as_slice().len() + row.len() + 1; + + encoding::serialize(&mut buf[..], &trade); + + let bytes_written = wtr.write(&buf[..]).map_err(|e| e.to_string())?; + assert_eq!(bytes_written, encoding::SERIALIZED_SIZE); + n_written += 1; + n_bytes_written += bytes_written; + + if n % PROGRESS_EVERY == 0 { + info!(logger, "binarizing csv"; + "elapsed" => ?(Instant::now() - start), + "n" => %n.thousands_sep(), + "n_written" => %n_written.thousands_sep(), + "mb read" => (n_bytes_read as f64 / 1024.0 / 1024.0), + "mb written" => (n_bytes_written as f64 / 1024.0 / 1024.0), + ); + } + } + info!(logger, "finished reading/converting csv"); + + assert_eq!(n_bytes_written % encoding::SERIALIZED_SIZE, 0); + + } + Opt::PrepPostgres { trades_csv, output_path } => { let logger = logger.new(o!("cmd" => "prep-postgres")); @@ -326,7 +446,7 @@ fn main() { let took = Instant::now() - start; info!(logger, "finished in {:?}", took; "n rows" => %n.thousands_sep(), - "rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(), + "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(), ); }