From 6c53b9285ee1f62cb31f6b093cf04bc726f2c8d2 Mon Sep 17 00:00:00 2001
From: Jonathan Strong
Date: Wed, 1 Apr 2020 23:35:47 -0400
Subject: [PATCH] work through finished draft of preface

---
 .cargo/config        |   3 +
 .gitignore           |   3 +
 Cargo.toml           |  31 +++-
 pandas-naive.py      |  13 ++
 pandas-read-csv.py   |  12 ++
 src/csv.rs           | 196 ++++++++++++++++++++--
 src/lib.rs           |  52 ++++++
 src/munge.rs         | 144 +++++++++++++++++
 src/time_explorer.rs | 376 +++++++++++++++++++++++++++++++++++++++++++
 9 files changed, 818 insertions(+), 12 deletions(-)
 create mode 100644 .cargo/config
 create mode 100644 pandas-naive.py
 create mode 100644 pandas-read-csv.py
 create mode 100644 src/lib.rs
 create mode 100644 src/munge.rs
 create mode 100644 src/time_explorer.rs

diff --git a/.cargo/config b/.cargo/config
new file mode 100644
index 0000000..35ab781
--- /dev/null
+++ b/.cargo/config
@@ -0,0 +1,3 @@
+[target.x86_64-unknown-linux-gnu]
+rustflags = ["-C", "target-cpu=native"]
+
diff --git a/.gitignore b/.gitignore
index 992c4be..ba1e735 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
 /target
 *.swp
 Cargo.lock
+var/
+csv-bytes
+csv-bytes-manual
diff --git a/Cargo.toml b/Cargo.toml
index bd36530..e079f48 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,14 +5,43 @@ authors = ["Jonathan Strong"]
 edition = "2018"
 
 [[bin]]
-name = "baseline-csv"
+name = "csv"
 path = "src/csv.rs"
 
+[[bin]]
+name = "munge"
+path = "src/munge.rs"
+
+[[bin]]
+path = "src/time_explorer.rs"
+name = "time-explorer"
+
+[lib]
+path = "src/lib.rs"
+name = "pipelines"
+
 [dependencies]
 csv = "1.1"
 structopt = "0.3"
 serde = { version = "1", features = ["derive"] }
+serde_json = "1"
 markets = { version = "0.2.1", registry = "jstrong-dev" }
 slog = "2"
 slog-async = "2"
 slog-term = "2"
+pretty_toa = "1"
+atoi = "0.3"
+lexical = "5.2"
+chrono = { version = "0.4", features = ["serde"] }
+clap = "2"
+itertools-num = "0.1"
+
+[profile.release]
+lto = "fat"
+panic = "abort"
+incremental = false
+codegen-units = 1
+
+[features]
+default = []
+super-fast-csv-parsing = []
diff --git a/pandas-naive.py b/pandas-naive.py
new file mode 100644
index 0000000..ebf8cd3
--- /dev/null
+++ b/pandas-naive.py
@@ -0,0 +1,13 @@
+import sys
+import pandas as pd
+
+def main(csv_path):
+    df = pd.read_csv(csv_path)
+    print(df.info())
+
+if __name__ == '__main__':
+    if len(sys.argv) < 2:
+        print("\n\nUSAGE:\n    python pandas-naive.py <path-to-csv>\n", file=sys.stderr)
+        sys.exit(1)
+    main(sys.argv[1])
+
diff --git a/pandas-read-csv.py b/pandas-read-csv.py
new file mode 100644
index 0000000..e5f2ed8
--- /dev/null
+++ b/pandas-read-csv.py
@@ -0,0 +1,12 @@
+import time
+import sys
+import pandas as pd
+
+start = time.time()
+path = sys.argv[1]
+df = pd.read_csv(path, low_memory=False)
+print('parsed csv file with {:,} rows in {:.1f}sec using pd.read_csv (pandas version = {})'.format(len(df), time.time()-start, pd.__version__))
+print()
+print(df.info())
+print()
+print(df.head())
diff --git a/src/csv.rs b/src/csv.rs
index 9da7269..de7db21 100644
--- a/src/csv.rs
+++ b/src/csv.rs
@@ -2,6 +2,8 @@
 
 #[macro_use]
 extern crate slog;
+#[macro_use]
+extern crate markets;
 
 use std::path::PathBuf;
 use std::time::*;
@@ -10,6 +12,7 @@ use std::fs;
 use structopt::StructOpt;
 use serde::{Serialize, Deserialize};
 use slog::Drain;
+use pretty_toa::ThousandsSep;
 
 use markets::crypto::{Exchange, Ticker, Side};
 
@@ -19,6 +22,8 @@ macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
     std::process::exit(1);
 }}}
 
+const PROGRESS_EVERY: usize = 1024 * 1024;
+
 
 #[derive(Debug, StructOpt)]
 struct Opt {
@@ -26,6 +31,11 @@ struct Opt {
     #[structopt(short = "f", long = "trades-csv")]
     #[structopt(parse(from_os_str))]
     trades_csv: PathBuf,
+
+    /// Where to save the query results (CSV output)
+    #[structopt(short = "o", long = "output-path")]
+    #[structopt(parse(from_os_str))]
+    output_path: PathBuf,
 }
 
 #[derive(Deserialize)]
@@ -34,31 +44,195 @@ struct Trade {
     pub time: u64,
     pub exch: Exchange,
     pub ticker: Ticker,
-    pub side: Option<Side>,
+    //pub side: Option<Side>,
     pub price: f64,
     pub amount: f64,
 }
 
-fn main() {
-    let start = Instant::now();
+/*
+struct HourSummary {
+    pub n_trades: usize,
+    pub
+*/
 
-    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
-    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
-    let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
-    let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
+fn per_sec(n: usize, span: Duration) -> f64 {
+    if n == 0 || span < Duration::from_micros(1) { return 0.0 }
+    let s: f64 = span.as_nanos() as f64 / 1e9f64;
+    n as f64 / s
+}
+
+#[inline(always)]
+fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result<Trade, &'static str> {
+    let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?)
+        .ok_or("parsing time failed")?;
+
+    let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
+        .map_err(|_| "parsing amount failed")?;
 
-    info!(logger, "initializing...");
+    let exch = match row.get(2).ok_or("no exch")? {
+        b"bmex" => e!(bmex),
+        b"bnce" => e!(bnce),
+        b"btfx" => e!(btfx),
+        b"gdax" => e!(gdax),
+        b"okex" => e!(okex),
+        b"bits" => e!(bits),
+        b"plnx" => e!(plnx),
+        b"krkn" => e!(krkn),
+        _ => return Err("illegal exch"),
+    };
 
+    let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
+        .map_err(|_| "parsing price failed")?;
+
+    let ticker = match row.get(6).ok_or("no ticker")? {
+        b"btc_usd" => t!(btc-usd),
+        b"eth_usd" => t!(eth-usd),
+        b"ltc_usd" => t!(ltc-usd),
+        b"etc_usd" => t!(etc-usd),
+        b"bch_usd" => t!(bch-usd),
+        b"xmr_usd" => t!(xmr-usd),
+        b"usdt_usd" => t!(usdt-usd),
+        _ => return Err("illegal ticker"),
+    };
+
+    Ok(Trade { time, amount, exch, price, ticker })
+}
+
+#[inline(always)]
+fn manual_deserialize_str(row: &csv::StringRecord) -> Result<Trade, &'static str> {
+    let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?.as_bytes())
+        .ok_or("parsing time failed")?;
+
+    let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
+        .map_err(|_| "parsing amount failed")?;
+
+    let exch = match row.get(2).ok_or("no exch")? {
+        "bmex" => e!(bmex),
+        "bnce" => e!(bnce),
+        "btfx" => e!(btfx),
+        "gdax" => e!(gdax),
+        "okex" => e!(okex),
+        "bits" => e!(bits),
+        "plnx" => e!(plnx),
+        "krkn" => e!(krkn),
+        _ => return Err("illegal exch"),
+    };
+
+    let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
+        .map_err(|_| "parsing price failed")?;
+
+    let ticker = match row.get(6).ok_or("no ticker")? {
+        "btc_usd" => t!(btc-usd),
+        "eth_usd" => t!(eth-usd),
+        "ltc_usd" => t!(ltc-usd),
+        "etc_usd" => t!(etc-usd),
+        "bch_usd" => t!(bch-usd),
+        "xmr_usd" => t!(xmr-usd),
+        "usdt_usd" => t!(usdt-usd),
+        _ => return Err("illegal ticker"),
+    };
+
+    Ok(Trade { time, amount, exch, price, ticker })
+}
+
+fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
     let opt = Opt::from_args();
 
+    info!(logger, "initializing...";
+        "trades-csv" => %opt.trades_csv.display(),
+        "output-path" => %opt.output_path.display()
+    );
+
     if ! opt.trades_csv.exists() {
         error!(logger, "path does not exist: {}", opt.trades_csv.display());
         fatal!("Error: path does not exist: {}", opt.trades_csv.display());
     }
 
-    info!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());
+    debug!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());
+
+    let rdr = fs::File::open(&opt.trades_csv)
+        .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, opt.trades_csv.display()))?;
 
-    let took = Instant::now() - start;
-    info!(logger, "finished in {:?}", took);
+    let rdr = io::BufReader::new(rdr);
+
+    let mut rdr = csv::Reader::from_reader(rdr);
+
+    // our data is ascii, so parsing with the slightly faster ByteRecord is fine
+    //let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
+    //let mut row = csv::ByteRecord::new();
+    //assert_eq!(headers.get(0), Some(&b"time"[..]));
+    //assert_eq!(headers.get(1), Some(&b"amount"[..]));
+    //assert_eq!(headers.get(2), Some(&b"exch"[..]));
+    //assert_eq!(headers.get(3), Some(&b"price"[..]));
+    //assert_eq!(headers.get(6), Some(&b"ticker"[..]));
+
+    //let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
+    let mut row = csv::StringRecord::new();
+
+    let mut n = 0;
+    let mut last_time = 0;
+
+    //while rdr.read_byte_record(&mut row)
+
+    while rdr.read_record(&mut row)
+        .map_err(|e| {
+            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
+        })?
+    {
+        //let trade: Trade = row.deserialize(Some(&headers))
+        //let trade: Trade = manual_deserialize_bytes(&row)
+        let trade: Trade = manual_deserialize_str(&row)
+            .map_err(|e| {
+                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
+            })?;
+
+        n += 1;
+
+        // verify data is sorted by time
+        debug_assert!(trade.time >= last_time);
+        last_time = trade.time;
+
+        if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
+            info!(logger, "parsing csv file";
+                "n rows" => %n.thousands_sep(),
+                "elapsed" => ?(Instant::now() - start),
+            );
+        }
+
+        if cfg!(debug_assertions) && n > PROGRESS_EVERY {
+            warn!(logger, "debug mode: exiting early";
+                "n rows" => %n.thousands_sep(),
+                "elapsed" => ?(Instant::now() - start),
+            );
+            break
+        }
+    }
+
+    Ok(n)
+}
+
+fn main() {
+    let start = Instant::now();
+
+    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
+    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
+    let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
+    let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
+
+    match run(start, &logger) {
+        Ok(n) => {
+            let took = Instant::now() - start;
+            info!(logger, "finished in {:?}", took;
+                "n rows" => %n.thousands_sep(),
+                "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
+            );
+        }
+
+        Err(e) => {
+            crit!(logger, "run failed: {:?}", e);
+            eprintln!("\n\nError: {}", e);
+            std::thread::sleep(Duration::from_millis(100));
+            std::process::exit(1);
+        }
+    }
 }
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..977b105
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,52 @@
+
+#[allow(unused)]
+#[cfg(test)]
+mod tests {
+    use serde::Deserialize;
+
+    #[derive(Debug, Deserialize)]
+    struct Trade {
+        pub time: i64,
+        pub price: f64,
+        pub amount: f64,
+    }
+
+    #[test]
+    fn serde_deserialize_json_example() {
+        assert!(matches!(
+            serde_json::from_str::<Trade>(r#"{"time":1527811201900505632,"price":7492.279785,"amount":0.048495,"exch":"bits","server_time":0,"side":null}"#),
+            Ok(Trade { time: 1527811201900505632, price: 7492.279785, amount: 0.048495 })
+        ));
+
+    }
+
+    #[test]
+    fn serde_deserialize_csv_example() {
+        let csv = "time,amount,exch,price,server_time,side\n\
+                   1527811201900505632,0.048495,bits,7492.279785,0,";
+
+        let mut csv_reader = csv::Reader::from_reader(csv.as_bytes());
+
+        let headers = csv_reader
+            .headers()
+            .expect("parsing row headers failed")
+            .clone();
+
+        let mut row = csv::StringRecord::new();
+
+        assert!(matches!(
+            csv_reader.read_record(&mut row),
+            Ok(true)
+        ));
+
+        assert!(matches!(
+            row.deserialize(Some(&headers)),
+            Ok(Trade { time: 1527811201900505632, price: 7492.279785, amount: 0.048495 })
+        ));
+
+        assert!(matches!(
+            csv_reader.read_record(&mut row),
+            Ok(false)
+        ));
+    }
+}
diff --git a/src/munge.rs b/src/munge.rs
new file mode 100644
index 0000000..77afccf
--- /dev/null
+++ b/src/munge.rs
@@ -0,0 +1,144 @@
+#![allow(unused)]
+
+#[macro_use]
+extern crate slog;
+#[macro_use]
+extern crate markets;
+
+use std::io::{self, prelude::*};
+use std::fs;
+use std::path::{Path, PathBuf};
+use std::time::*;
+use pretty_toa::ThousandsSep;
+use structopt::StructOpt;
+use serde::{Serialize, Deserialize};
+use slog::Drain;
+use chrono::{DateTime, Utc};
+use markets::crypto::{Exchange, Ticker, Side};
+
+macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
+    eprintln!($fmt, $($args)*);
+    std::process::exit(1);
+}}}
+
+
+
+#[structopt(rename_all="kebab-case")]
+#[derive(Debug, StructOpt)]
+enum Opt {
+
+    /// Filter trades-csv by start,end range and save subset to output-path
+    Range {
+
+        /// Path to CSV file with trades data
+        #[structopt(short = "f", long = "trades-csv")]
+        #[structopt(parse(from_os_str))]
+        trades_csv: PathBuf,
+
+        /// Where to save the query results (CSV output)
+        #[structopt(short = "o", long = "output-path")]
+        #[structopt(parse(from_os_str))]
+        output_path: PathBuf,
+
+        /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
+        start: DateTime<Utc>,
+
+        /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
+        end: DateTime<Utc>,
+
+    },
+}
+
+#[derive(Deserialize)]
+struct Trade {
+    /// Unix nanoseconds
+    pub time: u64,
+    pub exch: Exchange,
+    pub ticker: Ticker,
+    //pub side: Option<Side>,
+    pub price: f64,
+    pub amount: f64,
+}
+
+fn per_sec(n: usize, span: Duration) -> f64 {
+    if n == 0 || span < Duration::from_micros(1) { return 0.0 }
+    let s: f64 = span.as_nanos() as f64 / 1e9f64;
+    n as f64 / s
+}
+
+fn nanos(utc: DateTime<Utc>) -> u64 {
+    (utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64)
+}
+
+fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
+    let opt = Opt::from_args();
+
+    let mut n = 0;
+
+    match opt {
+        Opt::Range { trades_csv, output_path, start, end } => {
+            let logger = logger.new(o!("cmd" => "range"));
+
+            info!(logger, "beginning range cmd";
+                "trades_csv" => %trades_csv.display(),
+                "output_path" => %output_path.display(),
+                "start" => %start,
+                "end" => %end,
+            );
+
+            if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) }
+
+            info!(logger, "opening trades_csv file");
+
+            let rdr = fs::File::open(&trades_csv)
+                .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?;
+
+            let rdr = io::BufReader::new(rdr);
+
+            let mut rdr = csv::Reader::from_reader(rdr);
+
+            let wtr = fs::File::create(&output_path)
+                .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
+
+            let wtr = io::BufWriter::new(wtr);
+
+            let mut wtr = csv::Writer::from_writer(wtr);
+
+            let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
+            let mut row = csv::ByteRecord::new();
+
+            let start_nanos = nanos(start);
+            let end_nanos = nanos(end);
+
+        }
+    }
+
+    Ok(n)
+}
+
+fn main() {
+    let start = Instant::now();
+
+    let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
+    let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
+    let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
+    let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
+
+    match run(start, &logger) {
+        Ok(n) => {
+            let took = Instant::now() - start;
+            info!(logger, "finished in {:?}", took;
+                "n rows" => %n.thousands_sep(),
+                "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
+            );
+        }
+
+        Err(e) => {
+            crit!(logger, "run failed: {:?}", e);
+            eprintln!("\n\nError: {}", e);
+            std::thread::sleep(Duration::from_millis(100));
+            std::process::exit(1);
+        }
+    }
+
+}
diff --git a/src/time_explorer.rs b/src/time_explorer.rs
new file mode 100644
index 0000000..424207d
--- /dev/null
+++ b/src/time_explorer.rs
@@ -0,0 +1,376 @@
+#![allow(unused_imports)]
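+// For each event, computes the volume-weighted average trade price over a
+// series of log-spaced time buckets around the event's timestamp.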
+#![allow(unused_labels)]
+
+use std::str::FromStr;
+use std::time::{Instant, Duration};
+use std::{fs, io};
+use std::io::prelude::*;
+use std::str::from_utf8;
+use std::error::Error;
+use std::f64::NAN;
+use serde::{Serialize, Deserialize};
+use itertools_num::linspace;
+use std::collections::HashMap as Map;
+
+const N: usize = 128;
+const LOGSPACE: [i64; 128] =
+    [-2134300000000, -1854700000000, -1611800000000, -1400600000000,
+     -1217200000000, -1057700000000,  -919200000000,  -798800000000,
+      -694100000000,  -603200000000,  -524200000000,  -455500000000,
+      -395800000000,  -344000000000,  -298900000000,  -259700000000,
+      -225700000000,  -196100000000,  -170400000000,  -148100000000,
+      -128700000000,  -111800000000,   -97200000000,   -84400000000,
+       -73400000000,   -63800000000,   -55400000000,   -48100000000,
+       -41800000000,   -36300000000,   -31600000000,   -27400000000,
+       -23800000000,   -20700000000,   -18000000000,   -15600000000,
+       -13600000000,   -11800000000,   -10200000000,    -8900000000,
+        -7700000000,    -6700000000,    -5800000000,    -5000000000,
+        -4400000000,    -3800000000,    -3300000000,    -2900000000,
+        -2500000000,    -2100000000,    -1900000000,    -1600000000,
+        -1400000000,    -1200000000,    -1000000000,     -900000000,
+         -800000000,     -700000000,     -600000000,     -500000000,
+         -400000000,     -300000000,     -200000000,     -100000000,
+          100000000,      200000000,      300000000,      400000000,
+          500000000,      600000000,      700000000,      800000000,
+          900000000,     1000000000,     1200000000,     1400000000,
+         1600000000,     1900000000,     2100000000,     2500000000,
+         2900000000,     3300000000,     3800000000,     4400000000,
+         5000000000,     5800000000,     6700000000,     7700000000,
+         8900000000,    10200000000,    11800000000,    13600000000,
+        15600000000,    18000000000,    20700000000,    23800000000,
+        27400000000,    31600000000,    36300000000,    41800000000,
+        48100000000,    55400000000,    63800000000,    73400000000,
+        84400000000,    97200000000,   111800000000,   128700000000,
+       148100000000,   170400000000,   196100000000,   225700000000,
+       259700000000,   298900000000,   344000000000,   395800000000,
+       455500000000,   524200000000,   603200000000,   694100000000,
+       798800000000,   919200000000,  1057700000000,  1217200000000,
+      1400600000000,  1611800000000,  1854700000000,  2134300000000];
+
+
+#[derive(Deserialize)]
+struct Trade {
+    pub time: i64,
+    pub price: f64,
+    pub amount: f64,
+}
+
+/// Use this to deserialize just the time column on the first pass through
+/// the events file.
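+/// The other metadata columns are carried through to the output untouched on
+/// the second pass, so there is no need to parse them here.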
+#[derive(Deserialize)]
+struct EventTime {
+    pub time: i64,
+}
+
+struct Event {
+    pub time: i64,
+    pub data: Vec<f64>,
+}
+
+pub fn seconds(d: Duration) -> f64 {
+    d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64)
+}
+
+fn main() -> Result<(), Box<dyn Error>> {
+    let start = Instant::now();
+    let args: clap::ArgMatches = clap::App::new("time-explorer")
+        .version("0.1")
+        .arg(clap::Arg::with_name("trades")
+            .long("trades-csv")
+            .short("t")
+            .help("Path of csv with time (integer nanoseconds timestamp), \
+                   price (f64), and amount (f64) columns.")
+            .takes_value(true)
+            .required(true))
+        .arg(clap::Arg::with_name("events")
+            .long("events-csv")
+            .short("e")
+            .help("Path of csv file with a time (integer nanoseconds timestamp) as column 0, \
+                   along with any other metadata columns that will be included in results")
+            .takes_value(true)
+            .required(true))
+        .arg(clap::Arg::with_name("output")
+            .long("output-file")
+            .short("o")
+            .help("Path to save results csv to")
+            .takes_value(true)
+            .required(true))
+        .arg(clap::Arg::with_name("verbose")
+            .long("verbose")
+            .short("v"))
+        .arg(clap::Arg::with_name("n-periods")
+            .long("n-periods")
+            .short("n")
+            .help("Controls how many time buckets are evaluated")
+            .takes_value(true)
+            .default_value("50"))
+        .get_matches();
+
+    let verbose = args.is_present("verbose");
+
+    if verbose { println!("{:>8.2}s reading...", seconds(Instant::now() - start)); }
+
+    let trades_csv = args.value_of("trades").unwrap();
+    let events_csv = args.value_of("events").unwrap();
+    let output = args.value_of("output").unwrap();
+    let n: &str = args.value_of("n-periods").unwrap();
+    let n: usize = usize::from_str(n)?;
+
+    let trades_csv =
+        fs::OpenOptions::new()
+            .read(true)
+            .open(trades_csv)?;
+
+    let mut times: Vec<i64> = Vec::with_capacity(8192);
+    let mut amounts: Vec<f64> = Vec::with_capacity(8192);
+    let mut totals: Vec<f64> = Vec::with_capacity(8192);
+
+    #[cfg(feature = "super-fast-csv-parsing")]
+    {
+        // lookout below! MANY unwraps in here
+
+        // note: this code NOT part of original time-explorer. this code is what
+        // I was referring to in the "fine print" note where it says "With 10
+        // minutes work (knowing what I know today), I was able to get CSV parsing
+        // down to 3.46sec"
+
+        let mut rdr = csv::Reader::from_reader(io::BufReader::new(trades_csv));
+        let headers = rdr.byte_headers().unwrap().clone();
+        let mut row = csv::ByteRecord::new();
+        let mut col_index: [usize; 3] = [
+            headers.iter().position(|x| x == b"time").unwrap(),
+            headers.iter().position(|x| x == b"amount").unwrap(),
+            headers.iter().position(|x| x == b"price").unwrap(),
+        ];
+
+        while rdr.read_byte_record(&mut row).unwrap() {
+            times.push(atoi::atoi(row.get(col_index[0]).unwrap()).unwrap());
+
+            let amount: f64 = lexical::parse(row.get(col_index[1]).unwrap()).unwrap();
+            let price: f64 = lexical::parse(row.get(col_index[2]).unwrap()).unwrap();
+
+            totals.push(price * amount);
+            amounts.push(amount);
+        }
+    }
+
+    #[cfg(not(feature = "super-fast-csv-parsing"))]
+    {
+        // this is what was originally in time-explorer
+
+        let mut trades: Vec<Trade> =
+            csv::Reader::from_reader(trades_csv)
+                .deserialize()
+                .map(|x| x.unwrap())
+                .collect();
+
+        trades.sort_by_key(|k| k.time);
+
+        for Trade { time, price, amount } in trades {
+            times.push(time);
+            totals.push(price * amount);
+            amounts.push(amount);
+        }
+    }
+
+    if verbose { println!("{:>8.2}s finished parsing trades csv (times.len() = {}) ...", seconds(Instant::now() - start), times.len()); }
+
+    let mut events: Vec<Event> = {
+        let events_csv =
+            fs::OpenOptions::new()
+                .read(true)
+                .open(events_csv)?;
+
+        csv::Reader::from_reader(events_csv)
+            .deserialize()
+            .map(|t: Result<EventTime, csv::Error>| {
+                let EventTime { time } = t.unwrap();
+                //let data = [0.0; N - 1];
+                let data = vec![0.0; n - 1];
+                Event { time, data }
+            }).collect()
+    };
+
+    assert!(!events.is_empty());
+
+    events.sort_by_key(|k| k.time);
+
+    let mut cursor: usize = 0;
+    let mut truncate_events = None;
+
+    let buckets: Vec<i64> =
+        linspace(LOGSPACE[0] as f64, LOGSPACE[N - 1] as f64, n)
+            .map(|x| x as i64)
+            .collect();
+
+    if verbose { println!("{:>8.2}s calculating...", seconds(Instant::now() - start)); }
+
+    let mut n_incomplete_buckets = 0;
+    let mut n_skipped_buckets = 0;
+    let mut n_time_buckets = 0;
+
+    'a: for (i, event) in events.iter_mut().enumerate() {
+
+        let mut min_time: i64 = event.time + buckets[0];
+        let mut max_time: i64 = event.time + buckets[1];
+
+        'oops: while times[cursor] > min_time && cursor > 0 { cursor -= 1; }
+        n_incomplete_buckets += (times[cursor] > min_time) as usize;
+        n_skipped_buckets += (times[cursor] > max_time) as usize;
+
+        // find the beginning if there are gaps
+        'b: while times[cursor] < min_time {
+            if cursor >= times.len() - 1 {
+                truncate_events = Some(i);
+                break 'a
+            } else {
+                cursor += 1
+            }
+        }
+
+        let mut j: usize = cursor;
+
+        'c: for k in 0..(n - 2) {
+            let mut wsum: f64 = 0.0;
+            let mut w: f64 = 0.0;
+
+            'd: while j < times.len() - 1 && times[j] < max_time {
+                wsum += totals[j];
+                w += amounts[j];
+                j += 1;
+            }
+
+            event.data[k] = if w > 0.0 { wsum / w } else { NAN };
+
+            min_time = max_time;
+            max_time = event.time + buckets[k + 2];
+            n_time_buckets += 1;
+        }
+
+        if i % 512 == 0 {
+            assert!(max_time > min_time);
+            if verbose {
+                //let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
+                println!("{:>8.2}s No. {:>5} {:>12.2}, {:>12.2}, {:>12.2} ...", //, {:>12.2}, {:>12.2}, {:>12.2} ...",
+                    //cursor={}, j={}, times[cursor]={}, n_nan={}, max_time-min_time={}",
+                    seconds(Instant::now() - start), i,
+                    event.data[0], event.data[20], event.data[40]); //, event.data[60], event.data[80], event.data[100]);
+                    //min_time, max_time, cursor,
+                    //j, times[cursor], n_nan, max_time-min_time);
+            }
+        }
+    }
+
+    assert!(truncate_events.is_none()); // for now
+
+    if verbose { println!("{:>8.2} writing... (n_time_buckets={}, n_incomplete_buckets={}, n_skipped_buckets={})", seconds(Instant::now() - start), n_time_buckets, n_incomplete_buckets, n_skipped_buckets); }
+
+    // we have to read this again because I could not figure out ownership problems
+    let events_csv =
+        fs::OpenOptions::new()
+            .read(true)
+            .open(events_csv)?;
+
+    let mut events_csv = csv::Reader::from_reader(events_csv);
+
+    let output_csv =
+        fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(output)?;
+
+    let mut wtr = csv::Writer::from_writer(output_csv);
+
+    let data_cols: Vec<i64> = {
+        let mut xs = vec![0; n - 1];
+        for i in 0..(n - 1) {
+            xs[i] = (buckets[i] + buckets[i + 1]) / 2;
+        }
+        xs
+    };
+
+    {
+        let headers = events_csv.byte_headers()?;
+        for col in headers.iter() {
+            wtr.write_field(col)?;
+        }
+        for col in data_cols.iter() {
+            wtr.write_field(&format!("{}", col))?;
+        }
+        wtr.write_record(None::<&[u8]>)?;
+    }
+
+    let mut record = csv::ByteRecord::new();
+
+    for event in events {
+        if !events_csv.read_byte_record(&mut record)? { panic!("failed to read from events csv") }
+        for meta in record.iter() {
+            wtr.write_field(meta)?;
+        }
+        for val in event.data.iter() {
+            wtr.write_field(&format!("{}", val))?;
+        }
+        wtr.write_record(None::<&[u8]>)?;
+    }
+
+
+    if verbose { println!("{:>8.2} finished.", seconds(Instant::now() - start)); }
+
+    Ok(())
+}
+
+/*
+def to_tframe(version, df, trades, start):
+    d = {'bid': {}, 'ask': {}}
+    cursor = 0
+    n = 0
+    n_periods = 40
+    xs = np.concatenate([periods(n_periods)[:0:-1] * -1, periods(n_periods)]) * 1000000 # mult to convert to nanos
+    mask = df['version'] == version
+    #my_trades = sorted(list(zip(df.loc[mask].index, df.loc[mask, 'side'], df.loc[mask, 'gid'])))
+    my_trades = sorted(list(zip(df.loc[mask].index.values.astype(np.int64), df.loc[mask, 'side'], df.loc[mask, 'gid'])))
+    #idx = trades.index
+    idx = trades.index.values.astype(np.int64)
+    amts = trades['amount']
+    totals = trades['total']
+    assert len(idx) == len(amts)
+    assert len(idx) == len(totals)
+    for tm, side, gid in my_trades:
+        print '{} to_tfame {} {} (cursor = {})'.format(time.time() - start, version, n, cursor)
+        #min_time = tm + timedelta(milliseconds=xs[0])
+        #max_time = tm + timedelta(milliseconds=xs[1])
+        min_time = tm + xs[0]
+        max_time = tm + xs[1]
+        if idx[cursor] > min_time:
+            print 'warning: idx[cursor] ({}) > min_time ({})'.format(idx[cursor], min_time)
+            while idx[cursor] > min_time and cursor > 0:
+                cursor -= 1
+        else:
+            while idx[cursor] < min_time and cursor < len(idx) - 1:
+                cursor += 1
+        i = 1
+        j = cursor
+        d[side][gid] = {}
+        while i < len(xs) - 1:
+            wsum = 0.0
+            w = 0.0
+            while idx[j] < max_time:
+                wsum += totals[j]
+                w += amts[j]
+                j += 1
+            if w > 0.0:
+                d[side][gid][xs[i]] = wsum / w
+            else:
+                d[side][gid][xs[i]] = np.nan
+            i += 1
+            min_time = max_time
+            #max_time = tm + timedelta(milliseconds=xs[i])
+            max_time = tm + xs[i]
+        n += 1
+    d['bid'] = sort_cols(pd.DataFrame.from_dict(d['bid'], orient='index'))
+    d['ask'] = sort_cols(pd.DataFrame.from_dict(d['ask'], orient='index'))
+    #yield (version, d)
+    return d
+*/
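+
+// Example invocation, for reference. Flag names match the clap definitions
+// above; the var/ paths are hypothetical (var/ is gitignored):
+//
+//     cargo run --release --bin time-explorer -- \
+//         --trades-csv var/trades.csv \
+//         --events-csv var/events.csv \
+//         --output-file var/results.csv \
+//         --n-periods 100 \
+//         --verbose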