
working binary-serialization code

tags/v0.2.0
Jonathan Strong, 4 years ago
commit 110d7316b4
6 changed files with 852 additions and 147 deletions:
  1. Cargo.toml (+7 -2)
  2. justfile (+2 -0)
  3. src/binary-serialization.rs (+402 -0)
  4. src/csv.rs (+25 -14)
  5. src/encoding.rs (+291 -126)
  6. src/munge.rs (+125 -5)

Cargo.toml (+7 -2)

@@ -1,9 +1,13 @@
[package]
name = "data-pipelines"
version = "0.1.0"
version = "0.2.0"
authors = ["Jonathan Strong <jonathan.strong@gmail.com>"]
edition = "2018"

[[bin]]
name = "binary-serialization"
path = "src/binary-serialization.rs"

[[bin]]
name = "csv"
path = "src/csv.rs"
@@ -25,7 +29,7 @@ csv = "1.1"
structopt = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
markets = { version = "0.3.1", registry = "jstrong-dev" }
markets = { version = "0.4.0", registry = "jstrong-dev" }
slog = "2"
slog-async = "2"
slog-term = "2"
@@ -39,6 +43,7 @@ clap = "2"
itertools-num = "0.1"
bincode = "1.2"
postcard = "0.5"
memmap = "0.7"

[dev-dependencies]
approx = "0.3"


justfile (+2 -0)

@@ -0,0 +1,2 @@
flush-cache:
sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'
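
The recipe drops the kernel's page cache (along with dentries and inodes); since the new binary-serialization binary reads its input via mmap, running `just flush-cache` before a timing run measures cold-cache reads from disk rather than reads served from memory.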

src/binary-serialization.rs (+402 -0)

@@ -0,0 +1,402 @@
#![allow(unused)]

#[macro_use]
extern crate slog;
#[macro_use]
extern crate markets;

use std::io::{self, prelude::*};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::*;
use pretty_toa::ThousandsSep;
use structopt::StructOpt;
use serde::{Serialize, Deserialize};
use slog::Drain;
use chrono::{DateTime, Utc, NaiveDateTime};
use markets::crypto::{Exchange, Ticker, Side, Currency};
use pipelines::encoding;
use pipelines::windows::WeightedMeanWindow;

macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
eprintln!($fmt, $($args)*);
std::process::exit(1);
}}}

const PROGRESS_EVERY: usize = 1024 * 1024 * 16;
const ONE_SECOND: u64 = 1_000_000_000;
const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;


#[structopt(rename_all="kebab-case")]
#[derive(Debug, StructOpt)]
struct Opt {
/// Path to file with binary trades data
#[structopt(short = "f", long = "input-file")]
#[structopt(parse(from_os_str))]
input_path: PathBuf,

/// Where to save the query results (CSV output)
#[structopt(short = "o", long = "output-path")]
#[structopt(parse(from_os_str))]
output_path: PathBuf,

#[structopt(short = "z", long = "hard-mode")]
hard_mode: bool,
}

fn nanos_to_utc(nanos: u64) -> DateTime<Utc> {
const ONE_SECOND: u64 = 1_000_000_000;
let sec: i64 = (nanos / ONE_SECOND) as i64;
let nsec: u32 = (nanos % ONE_SECOND) as u32;
let naive = NaiveDateTime::from_timestamp(sec, nsec);
DateTime::from_utc(naive, Utc)
}

fn per_sec(n: usize, span: Duration) -> f64 {
if n == 0 || span < Duration::from_micros(1) { return 0.0 }
let s: f64 = span.as_nanos() as f64 / 1e9f64;
n as f64 / s
}

fn nanos(utc: DateTime<Utc>) -> u64 {
(utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64)
}
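
// (note: `nanos` is the inverse of `nanos_to_utc` — nanos(nanos_to_utc(t)) == t
// for any u64 nanosecond timestamp.)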

fn easy_query<W>(
data: &memmap::Mmap,
mut wtr: W,
logger: &slog::Logger,
) -> Result<usize, String>
where W: Write
{
let logger = logger.new(o!("easy-mode" => "whatever, man"));
info!(logger, "beginning easy mode");

let n_records = data.len() / encoding::SERIALIZED_SIZE;

let mut n = 0;
let mut n_written = 0;

let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);

let mut row_buffer: Vec<u8> = Vec::with_capacity(512);

writeln!(&mut wtr, "time,ratio,bmex,gdax")
.map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

assert!(n_records > 0);
let first = encoding::PackedTradeData::new(records.next().unwrap());
n += 1;

let mut cur_hour = first.time() - first.time() % ONE_HOUR;
let mut next_hour = cur_hour + ONE_HOUR;

let mut bmex_total = 0.0;
let mut bmex_amount = 0.0;
let mut n_bmex = 0;

let mut gdax_total = 0.0;
let mut gdax_amount = 0.0;
let mut n_gdax = 0;

macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
($trade:ident) => {{
match ($trade.exch(), $trade.base(), $trade.quote()) {
(Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
bmex_total += $trade.price() * $trade.amount();
bmex_amount += $trade.amount();
n_bmex += 1;
}

(Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
gdax_total += $trade.price() * $trade.amount();
gdax_amount += $trade.amount();
n_gdax += 1;

}
_ => {}
}
}}
}

update!(first);

for record in records {
n += 1;

let trade = encoding::PackedTradeData::new(record);

if trade.time() > next_hour {
row_buffer.clear();
itoa::write(&mut row_buffer, cur_hour).map_err(|e| format!("serializing number to buffer failed: {}", e))?;

if n_bmex == 0 || n_gdax == 0 {
row_buffer.write(",NaN,NaN,NaN\n".as_bytes()).unwrap();
} else {
let bmex_wt_avg = bmex_total / bmex_amount;
let gdax_wt_avg = gdax_total / gdax_amount;
let ratio = bmex_wt_avg / gdax_wt_avg;

row_buffer.push(b',');
dtoa::write(&mut row_buffer, ratio).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
row_buffer.push(b',');
dtoa::write(&mut row_buffer, bmex_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
row_buffer.push(b',');
dtoa::write(&mut row_buffer, gdax_wt_avg).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
row_buffer.push(b'\n');
}

wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;
n_written += 1;

bmex_total = 0.0;
bmex_amount = 0.0;
gdax_total = 0.0;
gdax_amount = 0.0;
n_bmex = 0;
n_gdax = 0;

cur_hour = next_hour;
next_hour += ONE_HOUR;

// if we are skipping hours in between the last and current row, we
// need to write a NaN row for the hours that had no data
while next_hour <= trade.time() {
writeln!(&mut wtr, "{},NaN,NaN,NaN", cur_hour)
.map_err(|e| format!("writing output row failed: {}", e))?;

n_written += 1;
cur_hour = next_hour;
next_hour += ONE_HOUR;
}
}

update!(trade);
if n % PROGRESS_EVERY == 0 {
info!(logger, "calculating query";
"n" => %n.thousands_sep(),
"n_written" => %n_written.thousands_sep(),
);
}
}
info!(logger, "finished with easy query");
Ok(n)

}
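
// A sketch (not part of this commit) isolating the hour-bucketing arithmetic
// used in `easy_query`: `time - time % ONE_HOUR` snaps a nanosecond timestamp
// to the start of its hour, and a row is emitted each time a trade crosses
// `next_hour`.
#[cfg(test)]
mod hour_bucket_sketch {
    use super::ONE_HOUR;

    #[test]
    fn snaps_to_enclosing_hour() {
        let time: u64 = 3 * ONE_HOUR + 12_345; // 12,345ns past an hour boundary
        let cur_hour = time - time % ONE_HOUR;
        assert_eq!(cur_hour, 3 * ONE_HOUR);
        assert_eq!(cur_hour + ONE_HOUR, 4 * ONE_HOUR); // next_hour
    }
}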

fn hard_query<W>(
data: &memmap::Mmap,
mut wtr: W,
logger: &slog::Logger,
) -> Result<usize, String>
where W: Write
{
let logger = logger.new(o!("hard-mode" => "challenge accepted"));
info!(logger, "beginning hard mode");

let n_records = data.len() / encoding::SERIALIZED_SIZE;

let mut n = 0;
let mut n_written = 0;

let mut records = data.chunks_exact(encoding::SERIALIZED_SIZE);

// pull out first row to initialize query calculations
assert!(n_records > 0);
let first = encoding::PackedTradeData::new(records.next().unwrap());
n += 1;

let mut cur_bucket = first.time() - (first.time() % (ONE_SECOND * 10)) + ONE_SECOND * 10;

#[derive(Default, Clone)]
struct Lookbacks<T> {
pub p5: T,
pub p15: T,
pub p60: T,
}

let mut ratios: Lookbacks<f64> = Default::default();

let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
Lookbacks {
p5: WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
};

let mut gdax_windows = bmex_windows.clone();

let mut row_buffer: Vec<u8> = Vec::with_capacity(512);

macro_rules! update { // in macro to avoid repeating code once outside loop, and again in loop body
($trade:ident) => {{
match ($trade.exch(), $trade.base(), $trade.quote()) {
(Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
bmex_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
bmex_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
bmex_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
}

(Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
gdax_windows.p5 .push($trade.time(), $trade.price(), $trade.amount());
gdax_windows.p15.push($trade.time(), $trade.price(), $trade.amount());
gdax_windows.p60.push($trade.time(), $trade.price(), $trade.amount());
}
_ => {}
}
}}
}

writeln!(&mut wtr, "time,r5,r15,r60")
.map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

update!(first);

for record in records {
n += 1;

let trade = encoding::PackedTradeData::new(record);

if trade.time() > cur_bucket {
debug!(logger, "about to purge";
"n" => n,
"n written" => n_written,
"trade.time" => trade.time(),
"cur_bucket" => cur_bucket,
"gdax p5 len" => gdax_windows.p5.len(),
"gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
);

bmex_windows.p5 .purge(cur_bucket);
bmex_windows.p15.purge(cur_bucket);
bmex_windows.p60.purge(cur_bucket);

gdax_windows.p5 .purge(cur_bucket);
gdax_windows.p15.purge(cur_bucket);
gdax_windows.p60.purge(cur_bucket);

debug!(logger, "finished purge";
"n" => n,
"n written" => n_written,
"trade.time" => trade.time(),
"cur_bucket" => cur_bucket,
"gdax p5 len" => gdax_windows.p5.len(),
"gdax p5 wt avg" => gdax_windows.p5.weighted_mean(),
);

ratios.p5 = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();

//row_buffers.iter_mut().for_each(|x| x.clear());
row_buffer.clear();

itoa::write(&mut row_buffer, cur_bucket).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
row_buffer.push(b',');
dtoa::write(&mut row_buffer, ratios.p5 ).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
row_buffer.push(b',');
dtoa::write(&mut row_buffer, ratios.p15).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
row_buffer.push(b',');
dtoa::write(&mut row_buffer, ratios.p60).map_err(|e| format!("serializing number to buffer failed: {}", e))?;
row_buffer.push(b'\n');

wtr.write_all(&row_buffer[..]).map_err(|e| format!("writing row failed: {}", e))?;

n_written += 1;
cur_bucket += ONE_SECOND * 10;
}

update!(trade);
if n % PROGRESS_EVERY == 0 {
info!(logger, "calculating hard query";
"n" => %n.thousands_sep(),
"n_written" => %n_written.thousands_sep(),
"ratios.p5" => ratios.p5,
"ratios.p15" => ratios.p15,
"ratios.p60" => ratios.p60,
);
}
}
info!(logger, "finished with hard query");
Ok(n)
}
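
// `WeightedMeanWindow` comes from `pipelines::windows`, which is not part of
// this diff. From the call sites above: `push(time, price, amount)` adds an
// observation, `purge(cutoff)` evicts observations that have fallen out of the
// window as of `cutoff`, and `weighted_mean()` is the amount-weighted average
// price of what remains. A minimal sketch of those assumed semantics (not the
// library's implementation):
struct NaiveWeightedMeanWindow {
    size: u64, // window length in nanoseconds
    items: std::collections::VecDeque<(u64, f64, f64)>, // (time, price, amount)
}

impl NaiveWeightedMeanWindow {
    fn new(size: u64) -> Self {
        NaiveWeightedMeanWindow { size, items: Default::default() }
    }

    fn push(&mut self, time: u64, price: f64, amount: f64) {
        self.items.push_back((time, price, amount));
    }

    fn purge(&mut self, cutoff: u64) {
        // drop observations older than `cutoff - size`
        while self.items.front().map_or(false, |&(t, _, _)| t + self.size < cutoff) {
            self.items.pop_front();
        }
    }

    fn len(&self) -> usize { self.items.len() }

    fn weighted_mean(&self) -> f64 {
        let total: f64 = self.items.iter().map(|&(_, p, a)| p * a).sum();
        let amount: f64 = self.items.iter().map(|&(_, _, a)| a).sum();
        total / amount // 0.0 / 0.0 == NaN when the window is empty
    }
}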



fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
let Opt { input_path, output_path, hard_mode } = Opt::from_args();

info!(logger, "beginning to count";
"input_path" => %input_path.display(),
);

if ! input_path.exists() { return Err(format!("--input-file path does not exist: {}", input_path.display())) }

let input_file =
fs::OpenOptions::new()
.read(true)
.open(input_path)
.map_err(|e| e.to_string())?;

let file_length = input_file.metadata().unwrap().len();

if file_length == 0 || file_length % encoding::SERIALIZED_SIZE as u64 != 0 {
return Err(format!("file is empty or length is not a multiple of record size: {}", file_length))
}

let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE;

info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep());

let data: memmap::Mmap = unsafe {
memmap::Mmap::map(&input_file)
.map_err(|e| {
format!("creating Mmap failed: {}", e)
})?
};

info!(logger, "opening output file for writing");

let wtr = fs::File::create(&output_path)
.map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;

let wtr = io::BufWriter::new(wtr);

if hard_mode {
hard_query(&data, wtr, &logger)
} else {
easy_query(&data, wtr, &logger)
}
}

fn main() {
let start = Instant::now();

let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));

match run(start, &logger) {
Ok(n) => {
let took = Instant::now() - start;
info!(logger, "finished in {:?}", took;
"n rows" => %n.thousands_sep(),
"rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
);
}

Err(e) => {
crit!(logger, "run failed: {:?}", e);
eprintln!("\n\nError: {}", e);
std::thread::sleep(Duration::from_millis(100));
std::process::exit(1);
}
}

}
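
With the [[bin]] entry added to Cargo.toml above, a hypothetical invocation (flag names per the Opt struct) looks like:

  cargo run --release --bin binary-serialization -- -f trades.bin -o results.csv

with -z added to run hard mode instead of the hourly query.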

src/csv.rs (+25 -14)

@@ -24,7 +24,7 @@ macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
std::process::exit(1);
}}}

const PROGRESS_EVERY: usize = 1024 * 1024;
const PROGRESS_EVERY: usize = 1024 * 1024 * 2;
const ONE_SECOND: u64 = 1_000_000_000;
const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;

@@ -402,11 +402,15 @@ fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
"gdax",
]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;

let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
let mut row = csv::StringRecord::new();
//let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
//let mut row = csv::StringRecord::new();

let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
let mut row = csv::ByteRecord::new();

// pull out first row to initialize query calculations
rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
//rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
rdr.read_byte_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
let trade: Trade = row.deserialize(Some(&headers))
.map_err(|e| {
format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
@@ -427,11 +431,18 @@ fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
let mut n_written = 0;
let mut last_time = 0;

while rdr.read_record(&mut row)
// while rdr.read_record(&mut row)
// .map_err(|e| {
// format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
// })?
// {

while rdr.read_byte_record(&mut row)
.map_err(|e| {
format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
})?
{

let trade: Trade = row.deserialize(Some(&headers))
.map_err(|e| {
format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
@@ -439,6 +450,14 @@ fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {

n += 1;

if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
info!(logger, "parsing csv file";
"n rows" => %n.thousands_sep(),
"n written" => %n_written.thousands_sep(),
"elapsed" => ?(Instant::now() - start),
);
}

if trade.server_time != 0 {
let diff: i64 = (trade.server_time as i64 - trade.time as i64) / 1000 / 1000;
assert!(diff >= std::i32::MIN as i64, "diff = {}, trade = {:?}", diff, trade);
@@ -518,14 +537,6 @@ fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
_ => {}
}

if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
info!(logger, "parsing csv file";
"n rows" => %n.thousands_sep(),
"n written" => %n_written.thousands_sep(),
"elapsed" => ?(Instant::now() - start),
);
}

if cfg!(debug_assertions) && n > PROGRESS_EVERY {
warn!(logger, "debug mode: exiting early";
"n rows" => %n.thousands_sep(),
@@ -560,7 +571,7 @@ fn main() {

info!(logger, "finished in {}", took_str;
"n rows" => %n.thousands_sep(),
"rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(),
"rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
);
}



src/encoding.rs (+291 -126)

@@ -1,86 +1,25 @@
use std::num::{NonZeroU64, NonZeroU8, NonZeroI32};
use std::mem::size_of;
use std::convert::TryFrom;
use serde::{Serialize, Deserialize};
use std::convert::{TryFrom, TryInto};
use serde::{Serialize, Deserialize, Deserializer};
use markets::crypto::{Exchange, Currency, Ticker, Side};

mod try_from_u8 {
use std::convert::TryFrom;
use std::fmt;
use std::marker::PhantomData;
use serde::{Serializer, Deserializer};
use serde::de::Visitor;
use serde::ser::Error as SerError;
pub const EXCH_OFFSET : usize = 0;
pub const BASE_OFFSET : usize = 1;
pub const QUOTE_OFFSET : usize = 2;
pub const SIDE_OFFSET : usize = 3;
pub const SERVER_TIME_OFFSET : usize = 4;
pub const TIME_OFFSET : usize = 8;
pub const PRICE_OFFSET : usize = 16;
pub const AMOUNT_OFFSET : usize = 24;
pub const SERIALIZED_SIZE : usize = 32;

struct V<T>(PhantomData<T>);
/// `server_time` is stored in milliseconds, while `time` is nanoseconds.
/// this is what you need to multiply the stored `server_time` data by to
/// get it back to nanoseconds.
pub const SERVER_TIME_DOWNSCALE_FACTOR: u64 = 1_000_000;
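// e.g. a stored delta of -84 means server_time is 84ms behind time:
// -84 * SERVER_TIME_DOWNSCALE_FACTOR = -84_000_000ns.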

impl<'de, T> Visitor<'de> for V<T>
where T: TryFrom<u8>
{
type Value = T;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("an integer code between 1-255")
}

fn visit_u8<E>(self, v: u8) -> Result<Self::Value, E>
where E: serde::de::Error,
{
match T::try_from(v) {
Ok(v) => Ok(v),
Err(_) => {
Err(serde::de::Error::custom("Invalid code"))
}
}
}

fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where E: serde::de::Error,
{
if v > 255 {
return Err(serde::de::Error::custom("Value greater than 255"))
}
match T::try_from(v as u8) {
Ok(v) => Ok(v),
Err(_) => {
Err(serde::de::Error::custom("Invalid code"))
}
}
}
}


pub fn deserialize<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where D: Deserializer<'de>,
T: TryFrom<u8>
{
deserializer.deserialize_u8(V(PhantomData))
}

pub fn serialize<S, T>(item: &T, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer,
T: Copy,
u8: From<T>
{
match u8::from(*item) {
0 => Err(S::Error::custom("not implemented: no code for variant or value")),
x => serializer.serialize_u8(x)
}
}
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Serde32BytesTrade {
pub time: u64,
#[serde(with = "try_from_u8")]
pub exch: Exchange,
#[serde(with = "try_from_u8")]
pub ticker: Ticker,
pub price: f64,
pub amount: f64,
pub side: Option<Side>,
pub server_time: Option<NonZeroI32>,
}

/// Represents the serialized form of a trades row
///
@@ -122,6 +61,20 @@ pub struct PackedTrade {
pub amount: f64,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct CsvTrade {
pub time: u64,
pub exch: Exchange,
pub ticker: Ticker,
pub price: f64,
pub amount: f64,
#[serde(deserialize_with = "deserialize_csv_side")]
pub side: Option<Side>,
#[serde(deserialize_with = "deserialize_csv_server_time")]
pub server_time: Option<u64>,
}

#[derive(Debug, Clone)]
pub struct ParseError(Box<String>);

@@ -129,99 +82,311 @@ pub struct ParseError(Box<String>);
#[repr(align(32))]
pub struct PackedTradeData<'a>(&'a [u8]);

impl<'a> PackedTradeData<'a> {
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Serde32BytesTrade {
pub time: u64,
#[serde(with = "try_from_u8")]
pub exch: Exchange,
#[serde(with = "try_from_u8")]
pub ticker: Ticker,
pub price: f64,
pub amount: f64,
pub side: Option<Side>,
pub server_time: Option<NonZeroI32>,
}

pub fn server_time_to_delta(time: u64, server_time: u64) -> i32 {
let ms = (
(server_time / SERVER_TIME_DOWNSCALE_FACTOR) as i64
- (time / SERVER_TIME_DOWNSCALE_FACTOR) as i64
) as i32;

match ms {
// if the two values are either identical, or so close that the difference
// is washed out when we downscale, return i32::MIN as a sentinel indicating
// time == server_time
//
0 => std::i32::MIN,

other => other
}
}

/// Convert a `server_time` delta back to its unix nanosecond timestamp form.
///
/// Note: while the `server_time` delta is stored as a signed integer, to be able to express a
/// delta in both directions relative to `time`, we can't just add a negative `i64` to a
/// `u64`, it doesn't work like that. this match either subtracts the absolute value of a
/// negative delta, or adds a positive delta, to get around this conundrum.
pub fn delta_to_server_time(time: u64, delta: i32) -> Option<u64> {
const MIN_VALID: i32 = std::i32::MIN + 1;

const EXCH_OFFSET : usize = 0;
const BASE_OFFSET : usize = 1;
const QUOTE_OFFSET : usize = 2;
const SIDE_OFFSET : usize = 3;
const SERVER_TIME_OFFSET : usize = 4;
const TIME_OFFSET : usize = 8;
const PRICE_OFFSET : usize = 16;
const AMOUNT_OFFSET : usize = 24;
match delta {
0 => None,

// i32::MIN is another sentinel, indicating that time == server_time
// (see server_time_to_delta above)
std::i32::MIN => Some(time),

x @ MIN_VALID ..= -1 => Some(time - (x.abs() as u64 * SERVER_TIME_DOWNSCALE_FACTOR)),

x @ 1 ..= std::i32::MAX => Some(time + (x as u64 * SERVER_TIME_DOWNSCALE_FACTOR)),
}
}
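
// A sketch (not part of this commit) of the round trip, using the first btfx
// row from the test data below: sub-millisecond precision is lost, so the
// recovered server_time matches the original only to within
// SERVER_TIME_DOWNSCALE_FACTOR nanoseconds.
#[cfg(test)]
mod delta_round_trip_sketch {
    use super::*;

    #[test]
    fn round_trips_to_the_millisecond() {
        let time: u64 = 1_561_939_200_011_035_644;
        let server_time: u64 = 1_561_939_199_927_000_064; // ~84ms before `time`
        let delta = server_time_to_delta(time, server_time);
        assert_eq!(delta, -84);
        let recovered = delta_to_server_time(time, delta).unwrap();
        assert!((recovered as i64 - server_time as i64).abs()
            < SERVER_TIME_DOWNSCALE_FACTOR as i64);
    }
}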


pub fn serialize<'a, 'b>(buf: &'a mut [u8], trade: &'b CsvTrade) {
assert_eq!(buf.len(), SERIALIZED_SIZE);

buf[EXCH_OFFSET] = u8::from(trade.exch);
buf[BASE_OFFSET] = u8::from(trade.ticker.base);
buf[QUOTE_OFFSET] = u8::from(trade.ticker.quote);

match trade.side {
Some(side) => {
buf[SIDE_OFFSET] = u8::from(side);
}

None => {
buf[SIDE_OFFSET] = 0;
}
}

match trade.server_time {
Some(st) => {
let delta: i32 = server_time_to_delta(trade.time, st);
(&mut buf[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).copy_from_slice(&delta.to_le_bytes()[..]);
}

None => {
(&mut buf[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).copy_from_slice(&0i32.to_le_bytes()[..]);
}
}
(&mut buf[TIME_OFFSET..(TIME_OFFSET + 8)]).copy_from_slice(&trade.time.to_le_bytes()[..]);
(&mut buf[PRICE_OFFSET..(PRICE_OFFSET + 8)]).copy_from_slice(&trade.price.to_le_bytes()[..]);
(&mut buf[AMOUNT_OFFSET..(AMOUNT_OFFSET + 8)]).copy_from_slice(&trade.amount.to_le_bytes()[..]);
}


impl<'a> PackedTradeData<'a> {
pub fn new(buf: &'a [u8]) -> Self {
assert_eq!(buf.len(), SERIALIZED_SIZE);
Self(buf)
}

#[inline]
pub fn exch(&self) -> Result<Exchange, markets::crypto::Error> {
Exchange::try_from(self.0[Self::EXCH_OFFSET])
Exchange::try_from(self.0[EXCH_OFFSET])
}

#[inline]
pub fn base(&self) -> Result<Currency, markets::crypto::Error> {
Currency::try_from(self.0[Self::BASE_OFFSET])
Currency::try_from(self.0[BASE_OFFSET])
}
#[inline]
pub fn ticker(&self) -> Ticker {
Ticker {
base: self.base().unwrap(),
quote: self.quote().unwrap(),
}
}

#[inline]
pub fn quote(&self) -> Result<Currency, markets::crypto::Error> {
Currency::try_from(self.0[Self::QUOTE_OFFSET])
Currency::try_from(self.0[QUOTE_OFFSET])
}

#[inline]
pub fn side(&self) -> Result<Option<Side>, markets::crypto::Error> {
match self.0[Self::SIDE_OFFSET] {
match self.0[SIDE_OFFSET] {
0 => Ok(None),
other => Ok(Some(Side::try_from(other)?)),
}
}

#[inline]
pub fn time(&self) -> Result<u64, ParseError> {
atoi::atoi(&self.0[Self::TIME_OFFSET..(Self::TIME_OFFSET + 8)])
.ok_or_else(|| {
ParseError(Box::new(format!("failed to parse integer: '{}'",
std::str::from_utf8(&self.0[Self::TIME_OFFSET..(Self::TIME_OFFSET + 8)]).unwrap_or("utf8 error")
)))
})
pub fn time(&self) -> u64 {
u64::from_le_bytes(
(&self.0[TIME_OFFSET..(TIME_OFFSET + 8)]).try_into().unwrap()
)
}

#[inline]
pub fn price(&self) -> Result<f64, lexical::Error> {
lexical::parse(&self.0[Self::PRICE_OFFSET..(Self::PRICE_OFFSET + 8)])
pub fn price(&self) -> f64 {
f64::from_le_bytes(
(&self.0[PRICE_OFFSET..(PRICE_OFFSET + 8)]).try_into().unwrap()
)
}

#[inline]
pub fn amount(&self) -> Result<f64, lexical::Error> {
lexical::parse(&self.0[Self::AMOUNT_OFFSET..(Self::AMOUNT_OFFSET + 8)])
pub fn amount(&self) -> f64 {
f64::from_le_bytes(
(&self.0[AMOUNT_OFFSET..(AMOUNT_OFFSET + 8)]).try_into().unwrap()
)
}

/// `server_time` is stored in milliseconds, while `time` is nanoseconds.
/// this is what you need to multiply the stored `server_time` data by to
/// get it back to nanoseconds.
const SERVER_TIME_DOWNSCALE_FACTOR: u64 = 1_000_000;

#[inline]
pub fn server_time(&self) -> Result<Option<u64>, ParseError> {
let st: i32 =
atoi::atoi(&self.0[Self::SERVER_TIME_OFFSET..(Self::SERVER_TIME_OFFSET + 4)])
.ok_or_else(|| {
ParseError(Box::new(format!("failed to parse integer: '{}'",
std::str::from_utf8(&self.0[Self::SERVER_TIME_OFFSET..(Self::SERVER_TIME_OFFSET + 4)]).unwrap_or("utf8 error")
)))
})?;

// while the `server_time` delta is stored as a signed integer, to be able to express a
// delta in both directions relative to `time`, we can't just add a negative `i64` to a
// `u64`, it doesn't work like that. this match either subtracts the absolute value of a
// negative delta, or adds a positive delta, to get around this conundrum.
//
// `SERVER_TIME_DOWNSCALE_FACTOR` is used to rescale the delta to nanoseconds prior to its
// being applied to `time`.
pub fn server_time(&self) -> Option<u64> {
let delta = i32::from_le_bytes(
(&self.0[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).try_into().unwrap()
);

match st {
0 => Ok(None),
delta_to_server_time(self.time(), delta)
}
}

x @ std::i32::MIN .. 0 => Ok(Some(self.time()? - (x.abs() as u64 * Self::SERVER_TIME_DOWNSCALE_FACTOR))),
pub fn deserialize_csv_side<'de, D>(deserializer: D) -> Result<Option<Side>, D::Error>
where D: Deserializer<'de>
{
let s: &str = Deserialize::deserialize(deserializer)?;
match s {
"bid" => Ok(Some(Side::Bid)),
"ask" => Ok(Some(Side::Ask)),
_ => Ok(None)
}
}

x @ 1 ..= std::i32::MAX => Ok(Some(self.time()? + (x as u64 * Self::SERVER_TIME_DOWNSCALE_FACTOR))),
}
pub fn deserialize_csv_server_time<'de, D>(deserializer: D) -> Result<Option<u64>, D::Error>
where D: Deserializer<'de>
{
let st: u64 = Deserialize::deserialize(deserializer)?;
match st {
0 => Ok(None),
other => Ok(Some(other))
}
}
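
// note: a `server_time` of 0 in the CSV is treated as absent here, mirroring
// the 0 sentinels used for `side` and for the packed `server_time` delta.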

mod try_from_u8 {
use std::convert::TryFrom;
use std::fmt;
use std::marker::PhantomData;
use serde::{Serializer, Deserializer};
use serde::de::Visitor;
use serde::ser::Error as SerError;

struct V<T>(PhantomData<T>);

impl<'de, T> Visitor<'de> for V<T>
where T: TryFrom<u8>
{
type Value = T;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("an integer code between 1-255")
}

fn visit_u8<E>(self, v: u8) -> Result<Self::Value, E>
where E: serde::de::Error,
{
match T::try_from(v) {
Ok(v) => Ok(v),
Err(_) => {
Err(serde::de::Error::custom("Invalid code"))
}
}
}

fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where E: serde::de::Error,
{
if v > 255 {
return Err(serde::de::Error::custom("Value greater than 255"))
}
match T::try_from(v as u8) {
Ok(v) => Ok(v),
Err(_) => {
Err(serde::de::Error::custom("Invalid code"))
}
}
}
}


pub fn deserialize<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where D: Deserializer<'de>,
T: TryFrom<u8>
{
deserializer.deserialize_u8(V(PhantomData))
}

pub fn serialize<S, T>(item: &T, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer,
T: Copy,
u8: From<T>
{
match u8::from(*item) {
0 => Err(S::Error::custom("not implemented: no code for variant or value")),
x => serializer.serialize_u8(x)
}
}
}

#[allow(unused)]
#[cfg(test)]
mod tests {
use super::*;
use std::io::{self, prelude::*};
use markets::{e, t, c};
use approx::assert_relative_eq;

const CSV: &str =
"time,amount,exch,price,server_time,side,ticker\n\
1561939200002479372,1.4894,bnce,292.7,1561939199919000064,,eth_usd\n\
1561939200011035644,0.0833333283662796,btfx,10809.0,1561939199927000064,bid,btc_usd\n\
1561939200011055712,0.8333191871643066,btfx,10809.0,1561939199927000064,bid,btc_usd\n\
1561939200019037617,0.083096,bnce,10854.1,1561939199935000064,,btc_usd\n\
1561939200026450471,0.125,okex,123.21,1561939200026450432,ask,ltc_usd\n\
1561939200027716312,0.704054,okex,123.21,1561939200027716352,ask,ltc_usd\n\
1561939200028633907,0.11,okex,123.22,1561939200028633856,bid,ltc_usd\n\
1561939200029908535,1.438978,okex,123.22,1561939200029908480,ask,ltc_usd\n\
1561939200030393495,0.257589,okex,123.22,1561939200030393600,bid,ltc_usd"
;

#[test]
fn parse_csv_sample_with_csv_trade() {
let csv: Vec<u8> = CSV.as_bytes().to_vec();
let mut rdr = csv::Reader::from_reader(io::Cursor::new(csv));
let mut rows = Vec::new();
let headers = rdr.byte_headers().unwrap().clone();
let mut row = csv::ByteRecord::new();
while rdr.read_byte_record(&mut row).unwrap() {
let trade: CsvTrade = row.deserialize(Some(&headers)).unwrap();
rows.push(trade);
}

assert_eq!(rows[0].time, 1561939200002479372);
assert_eq!(rows[1].exch, e!(btfx));

let mut buf = vec![0u8; 32];

for (i, trade) in rows.iter().enumerate() {
assert!(trade.server_time.is_some());
let st = trade.server_time.unwrap();
let delta = server_time_to_delta(trade.time, st);
dbg!(i, trade, trade.time, st,
trade.time as i64 - st as i64, delta,
(trade.time / SERVER_TIME_DOWNSCALE_FACTOR) as i64 - (st / SERVER_TIME_DOWNSCALE_FACTOR) as i64,
);
assert!(delta != 0);
let rt: u64 = delta_to_server_time(trade.time, delta).unwrap();
let abs_diff = (rt as i64 - st as i64).abs();
let max_allowable_diff = SERVER_TIME_DOWNSCALE_FACTOR; // * 2;
dbg!(rt, abs_diff, max_allowable_diff);
assert!(abs_diff < max_allowable_diff as i64);

serialize(&mut buf[..], &trade);
{
let packed = PackedTradeData(&buf[..]);
assert_eq!(packed.time(), trade.time);
assert_eq!(packed.exch().unwrap(), trade.exch);
assert_eq!(packed.base().unwrap(), trade.ticker.base);
assert_eq!(packed.quote().unwrap(), trade.ticker.quote);
assert_eq!(packed.side().unwrap(), trade.side);
assert_relative_eq!(packed.price(), trade.price);
assert_relative_eq!(packed.amount(), trade.amount);
}
}
}

#[test]
fn verify_packed_trade_is_32_bytes() {


src/munge.rs (+125 -5)

@@ -15,6 +15,7 @@ use serde::{Serialize, Deserialize};
use slog::Drain;
use chrono::{DateTime, Utc, NaiveDateTime};
use markets::crypto::{Exchange, Ticker, Side, Currency};
use pipelines::encoding;

macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
eprintln!($fmt, $($args)*);
@@ -73,7 +74,6 @@ enum Opt {

ListCodes,

/*
Binarize {
/// Path to CSV file with trades data
#[structopt(short = "f", long = "trades-csv")]
@@ -85,10 +85,14 @@ enum Opt {
#[structopt(parse(from_os_str))]
output_path: PathBuf,

}
*/

},

CountRows {
/// Path to file with binary trades data
#[structopt(short = "f", long = "input-file")]
#[structopt(parse(from_os_str))]
input_path: PathBuf,
},
}

#[derive(Deserialize)]
@@ -149,6 +153,122 @@ fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
let mut n = 0;

match opt {
Opt::CountRows { input_path } => {
let logger = logger.new(o!("cmd" => "count-rows"));
info!(logger, "beginning to count";
"input_path" => %input_path.display(),
);

if ! input_path.exists() { return Err(format!("--input-file path does not exist: {}", input_path.display())) }

let input_file =
fs::OpenOptions::new()
.read(true)
.open(input_path)
.map_err(|e| e.to_string())?;

let file_length = input_file.metadata().unwrap().len();

if file_length % encoding::SERIALIZED_SIZE as u64 != 0 {
return Err(format!("file length is not a multiple of record size: {}", file_length))
}

let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE;

info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep());

let data: memmap::Mmap = unsafe {
memmap::Mmap::map(&input_file)
.map_err(|e| {
format!("creating Mmap failed: {}", e)
})?
};

let mut n_gdax = 0;
let mut n_bmex = 0;

for i in 0..n_records {
let j = i * encoding::SERIALIZED_SIZE;
let k = j + encoding::SERIALIZED_SIZE;
let packed = encoding::PackedTradeData::new(&data[j..k]);
n_gdax += (packed.exch().unwrap() == e!(gdax)) as usize;
n_bmex += (packed.exch().unwrap() == e!(bmex)) as usize;
n += 1;
}

info!(logger, "finished reading flle";
"n gdax" => n_gdax.thousands_sep(),
"n bmex" => n_bmex.thousands_sep(),
);
}

Opt::Binarize { trades_csv, output_path } => {
let logger = logger.new(o!("cmd" => "binarize"));
info!(logger, "beginning binarize";
"trades_csv" => %trades_csv.display(),
"output_path" => %output_path.display(),
);

if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) }

info!(logger, "opening trades_csv file");

let rdr = fs::File::open(&trades_csv)
.map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?;

let rdr = io::BufReader::new(rdr);

let mut rdr = csv::Reader::from_reader(rdr);

info!(logger, "opening output file for writing");

let wtr = fs::File::create(&output_path)
.map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;

let mut wtr = io::BufWriter::new(wtr);

let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
let mut row = csv::ByteRecord::new();

let mut buf = vec![0u8; encoding::SERIALIZED_SIZE];

let mut n = 0;
let mut n_written = 0;
let mut n_bytes_written = 0;
let mut n_bytes_read = headers.as_slice().len() + headers.len() + 1;
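// (approximate accounting: `as_slice()` is the record's fields concatenated
// without delimiters, so adding the field count covers the commas and newline.)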

while rdr.read_byte_record(&mut row)
.map_err(|e| {
format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
})?
{
let trade: encoding::CsvTrade = row.deserialize(Some(&headers)).map_err(|e| e.to_string())?;
n += 1;
n_bytes_read += row.as_slice().len() + row.len() + 1;

encoding::serialize(&mut buf[..], &trade);

let bytes_written = wtr.write(&buf[..]).map_err(|e| e.to_string())?;
assert_eq!(bytes_written, encoding::SERIALIZED_SIZE);
n_written += 1;
n_bytes_written += bytes_written;

if n % PROGRESS_EVERY == 0 {
info!(logger, "binarizing csv";
"elapsed" => ?(Instant::now() - start),
"n" => %n.thousands_sep(),
"n_written" => %n_written.thousands_sep(),
"mb read" => (n_bytes_read as f64 / 1024.0 / 1024.0),
"mb written" => (n_bytes_written as f64 / 1024.0 / 1024.0),
);
}
}
info!(logger, "finished reading/converting csv");

assert_eq!(n_bytes_written % encoding::SERIALIZED_SIZE, 0);

}

Opt::PrepPostgres { trades_csv, output_path } => {
let logger = logger.new(o!("cmd" => "prep-postgres"));

@@ -326,7 +446,7 @@ fn main() {
let took = Instant::now() - start;
info!(logger, "finished in {:?}", took;
"n rows" => %n.thousands_sep(),
"rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(),
"rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
);
}


