Browse Source

work through finished draft of preface

tags/v0.2.0
Jonathan Strong 1 year ago
parent
commit
6c53b9285e
9 changed files with 818 additions and 12 deletions
  1. +3
    -0
      .cargo/config
  2. +3
    -0
      .gitignore
  3. +30
    -1
      Cargo.toml
  4. +13
    -0
      pandas-naive.py
  5. +12
    -0
      pandas-read-csv.py
  6. +185
    -11
      src/csv.rs
  7. +52
    -0
      src/lib.rs
  8. +144
    -0
      src/munge.rs
  9. +376
    -0
      src/time_explorer.rs

+ 3
- 0
.cargo/config View File

@@ -0,0 +1,3 @@
[target.x86_64-unknown-linux-gnu]
rustflags = ["-C", "target-cpu=native"]


+ 3
- 0
.gitignore View File

@@ -1,3 +1,6 @@
/target
*.swp
Cargo.lock
var/
csv-bytes
csv-bytes-manual

+ 30
- 1
Cargo.toml View File

@@ -5,14 +5,43 @@ authors = ["Jonathan Strong <jonathan.strong@gmail.com>"]
edition = "2018"

[[bin]]
name = "baseline-csv"
name = "csv"
path = "src/csv.rs"

[[bin]]
name = "munge"
path = "src/munge.rs"

[[bin]]
path = "src/time_explorer.rs"
name = "time-explorer"

[lib]
path = "src/lib.rs"
name = "pipelines"

[dependencies]
csv = "1.1"
structopt = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
markets = { version = "0.2.1", registry = "jstrong-dev" }
slog = "2"
slog-async = "2"
slog-term = "2"
pretty_toa = "1"
atoi = "0.3"
lexical = "5.2"
chrono = { version = "0.4", features = ["serde"] }
clap = "2"
itertools-num = "0.1"

[profile.release]
lto = "fat"
panic = "abort"
incremental = false
codegen-units = 1

[features]
default = []
super-fast-csv-parsing = []

+ 13
- 0
pandas-naive.py View File

@@ -0,0 +1,13 @@
import sys
import pandas as pd

def main(csv_path):
df = pd.read_csv(csv_path)
print(df.info())

if __name__ == '__main__':
if len(sys.argv) < 2:
print("\n\nUSAGE:\n python pandas-naive.py <CSV_PATH>\n", file=sys.stderr)
sys.exit(1)
main(sys.argv[1])

+ 12
- 0
pandas-read-csv.py View File

@@ -0,0 +1,12 @@
import time
import sys
import pandas as pd

start = time.time()
path = sys.argv[1]
df = pd.read_csv(path, low_memory=False)
print('parsed csv file with {:,} rows in {:.1f}sec using pd.read_csv (pandas version = {})'.format(len(df), time.time()-start, pd.__version__))
print()
print(df.info())
print()
print(df.head())

+ 185
- 11
src/csv.rs View File

@@ -2,6 +2,8 @@

#[macro_use]
extern crate slog;
#[macro_use]
extern crate markets;

use std::path::PathBuf;
use std::time::*;
@@ -10,6 +12,7 @@ use std::fs;
use structopt::StructOpt;
use serde::{Serialize, Deserialize};
use slog::Drain;
use pretty_toa::ThousandsSep;
use markets::crypto::{Exchange, Ticker, Side};


@@ -19,6 +22,8 @@ macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
std::process::exit(1);
}}}

const PROGRESS_EVERY: usize = 1024 * 1024;


#[derive(Debug, StructOpt)]
struct Opt {
@@ -26,6 +31,11 @@ struct Opt {
#[structopt(short = "f", long = "trades-csv")]
#[structopt(parse(from_os_str))]
trades_csv: PathBuf,

/// Where to save the query results (CSV output)
#[structopt(short = "o", long = "output-path")]
#[structopt(parse(from_os_str))]
output_path: PathBuf,
}

#[derive(Deserialize)]
@@ -34,31 +44,195 @@ struct Trade {
pub time: u64,
pub exch: Exchange,
pub ticker: Ticker,
pub side: Option<Side>,
//pub side: Option<Side>,
pub price: f64,
pub amount: f64,
}

fn main() {
let start = Instant::now();
/*
struct HourSummary {
pub n_trades: usize,
pub
*/

let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
fn per_sec(n: usize, span: Duration) -> f64 {
if n == 0 || span < Duration::from_micros(1) { return 0.0 }
let s: f64 = span.as_nanos() as f64 / 1e9f64;
n as f64 / s
}

#[inline(always)]
fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result<Trade, &'static str> {
let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?)
.ok_or("parsing time failed")?;

let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
.map_err(|_| "parsing amount failed")?;

info!(logger, "initializing...");
let exch = match row.get(2).ok_or("no exch")? {
b"bmex" => e!(bmex),
b"bnce" => e!(bnce),
b"btfx" => e!(btfx),
b"gdax" => e!(gdax),
b"okex" => e!(okex),
b"bits" => e!(bits),
b"plnx" => e!(plnx),
b"krkn" => e!(krkn),
_ => return Err("illegal exch"),
};

let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
.map_err(|_| "parsing price failed")?;

let ticker = match row.get(6).ok_or("no ticker")? {
b"btc_usd" => t!(btc-usd),
b"eth_usd" => t!(eth-usd),
b"ltc_usd" => t!(ltc-usd),
b"etc_usd" => t!(etc-usd),
b"bch_usd" => t!(bch-usd),
b"xmr_usd" => t!(xmr-usd),
b"usdt_usd" => t!(usdt-usd),
_ => return Err("illegal ticker"),
};

Ok(Trade { time, amount, exch, price, ticker })
}

#[inline(always)]
fn manual_deserialize_str(row: &csv::StringRecord) -> Result<Trade, &'static str> {
let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?.as_bytes())
.ok_or("parsing time failed")?;

let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
.map_err(|_| "parsing amount failed")?;

let exch = match row.get(2).ok_or("no exch")? {
"bmex" => e!(bmex),
"bnce" => e!(bnce),
"btfx" => e!(btfx),
"gdax" => e!(gdax),
"okex" => e!(okex),
"bits" => e!(bits),
"plnx" => e!(plnx),
"krkn" => e!(krkn),
_ => return Err("illegal exch"),
};

let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
.map_err(|_| "parsing price failed")?;

let ticker = match row.get(6).ok_or("no ticker")? {
"btc_usd" => t!(btc-usd),
"eth_usd" => t!(eth-usd),
"ltc_usd" => t!(ltc-usd),
"etc_usd" => t!(etc-usd),
"bch_usd" => t!(bch-usd),
"xmr_usd" => t!(xmr-usd),
"usdt_usd" => t!(usdt-usd),
_ => return Err("illegal ticker"),
};

Ok(Trade { time, amount, exch, price, ticker })
}

fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
let opt = Opt::from_args();

info!(logger, "initializing...";
"trades-csv" => %opt.trades_csv.display(),
"output-path" => %opt.output_path.display()
);

if ! opt.trades_csv.exists() {
error!(logger, "path does not exist: {}", opt.trades_csv.display());
fatal!("Error: path does not exist: {}", opt.trades_csv.display());
}

info!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());
debug!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());

let rdr = fs::File::open(&opt.trades_csv)
.map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, opt.trades_csv.display()))?;

let took = Instant::now() - start;
info!(logger, "finished in {:?}", took);
let rdr = io::BufReader::new(rdr);

let mut rdr = csv::Reader::from_reader(rdr);

// our data is ascii, so parsing with the slightly faster ByteRecord is fine
//let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
//let mut row = csv::ByteRecord::new();
//assert_eq!(headers.get(0), Some(&b"time"[..]));
//assert_eq!(headers.get(1), Some(&b"amount"[..]));
//assert_eq!(headers.get(2), Some(&b"exch"[..]));
//assert_eq!(headers.get(3), Some(&b"price"[..]));
//assert_eq!(headers.get(6), Some(&b"ticker"[..]));

//let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
let mut row = csv::StringRecord::new();

let mut n = 0;
let mut last_time = 0;

//while rdr.read_byte_record(&mut row)

while rdr.read_record(&mut row)
.map_err(|e| {
format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
})?
{
//let trade: Trade = row.deserialize(Some(&headers))
//let trade: Trade = manual_deserialize_bytes(&row)
let trade: Trade = manual_deserialize_str(&row)
.map_err(|e| {
format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
})?;

n += 1;

// verify data is sorted by time
debug_assert!(trade.time >= last_time);
last_time = trade.time;

if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
info!(logger, "parsing csv file";
"n rows" => %n.thousands_sep(),
"elapsed" => ?(Instant::now() - start),
);
}

if cfg!(debug_assertions) && n > PROGRESS_EVERY {
warn!(logger, "debug mode: exiting early";
"n rows" => %n.thousands_sep(),
"elapsed" => ?(Instant::now() - start),
);
break
}
}

Ok(n)
}

fn main() {
let start = Instant::now();

let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));

match run(start, &logger) {
Ok(n) => {
let took = Instant::now() - start;
info!(logger, "finished in {:?}", took;
"n rows" => %n.thousands_sep(),
"rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(),
);
}

Err(e) => {
crit!(logger, "run failed: {:?}", e);
eprintln!("\n\nError: {}", e);
std::thread::sleep(Duration::from_millis(100));
std::process::exit(1);
}
}
}

+ 52
- 0
src/lib.rs View File

@@ -0,0 +1,52 @@

#[allow(unused)]
#[cfg(test)]
mod tests {
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Trade {
pub time: i64,
pub price: f64,
pub amount: f64,
}

#[test]
fn serde_deserialize_json_example() {
assert!(matches!(
serde_json::from_str::<Trade>(r#"{"time":1527811201900505632,"price":7492.279785,"amount":0.048495,"exch":"bits","server_time":0,"side":null}"#),
Ok(Trade { time: 1527811201900505632, price: 7492.279785, amount: 0.048495 })
));

}

#[test]
fn serde_deserialize_csv_example() {
let csv = "time,amount,exch,price,server_time,side\n\
1527811201900505632,0.048495,bits,7492.279785,0,";

let mut csv_reader = csv::Reader::from_reader(csv.as_bytes());

let headers = csv_reader
.headers()
.expect("parsing row headers failed")
.clone();

let mut row = csv::StringRecord::new();

assert!(matches!(
csv_reader.read_record(&mut row),
Ok(true)
));

assert!(matches!(
row.deserialize(Some(&headers)),
Ok(Trade { time: 1527811201900505632, price: 7492.279785, amount: 0.048495 })
));

assert!(matches!(
csv_reader.read_record(&mut row),
Ok(false)
));
}
}

+ 144
- 0
src/munge.rs View File

@@ -0,0 +1,144 @@
#![allow(unused)]

#[macro_use]
extern crate slog;
#[macro_use]
extern crate markets;

use std::io::{self, prelude::*};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::*;
use pretty_toa::ThousandsSep;
use structopt::StructOpt;
use serde::{Serialize, Deserialize};
use slog::Drain;
use chrono::{DateTime, Utc};
use markets::crypto::{Exchange, Ticker, Side};

macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
eprintln!($fmt, $($args)*);
std::process::exit(1);
}}}



#[structopt(rename_all="kebab-case")]
#[derive(Debug, StructOpt)]
enum Opt {

/// Filter trades-csv by start,end range and save subset to output-path
Range {

/// Path to CSV file with trades data
#[structopt(short = "f", long = "trades-csv")]
#[structopt(parse(from_os_str))]
trades_csv: PathBuf,

/// Where to save the query results (CSV output)
#[structopt(short = "o", long = "output-path")]
#[structopt(parse(from_os_str))]
output_path: PathBuf,

/// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
start: DateTime<Utc>,

/// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
end: DateTime<Utc>,

},
}

#[derive(Deserialize)]
struct Trade {
/// Unix nanoseconds
pub time: u64,
pub exch: Exchange,
pub ticker: Ticker,
//pub side: Option<Side>,
pub price: f64,
pub amount: f64,
}

fn per_sec(n: usize, span: Duration) -> f64 {
if n == 0 || span < Duration::from_micros(1) { return 0.0 }
let s: f64 = span.as_nanos() as f64 / 1e9f64;
n as f64 / s
}

fn nanos(utc: DateTime<Utc>) -> u64 {
(utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64)
}

fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
let opt = Opt::from_args();

let mut n = 0;

match opt {
Opt::Range { trades_csv, output_path, start, end } => {
let logger = logger.new(o!("cmd" => "range"));

info!(logger, "beginning range cmd";
"trades_csv" => %trades_csv.display(),
"output_path" => %output_path.display(),
"start" => %start,
"end" => %end,
);

if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) }

info!(logger, "opening trades_csv file");

let rdr = fs::File::open(&trades_csv)
.map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?;

let rdr = io::BufReader::new(rdr);

let mut rdr = csv::Reader::from_reader(rdr);

let wtr = fs::File::create(&output_path)
.map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;

let wtr = io::BufWriter::new(wtr);

let mut wtr = csv::Writer::from_writer(wtr);

let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
let mut row = csv::ByteRecord::new();

let start_nanos = nanos(start);
let end_nanos = nanos(end);

}
}

Ok(n)
}

fn main() {
let start = Instant::now();

let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));

match run(start, &logger) {
Ok(n) => {
let took = Instant::now() - start;
info!(logger, "finished in {:?}", took;
"n rows" => %n.thousands_sep(),
"rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(),
);
}

Err(e) => {
crit!(logger, "run failed: {:?}", e);
eprintln!("\n\nError: {}", e);
std::thread::sleep(Duration::from_millis(100));
std::process::exit(1);
}
}

}

+ 376
- 0
src/time_explorer.rs View File

@@ -0,0 +1,376 @@
#![allow(unused_imports)]
#![allow(unused_labels)]

use std::str::FromStr;
use std::time::{Instant, Duration};
use std::{fs, io};
use std::io::prelude::*;
use std::str::from_utf8;
use std::error::Error;
use std::f64::NAN;
use serde::{Serialize, Deserialize};
use itertools_num::linspace;
use std::collections::HashMap as Map;

const N: usize = 128;
const LOGSPACE: [i64; 128] =
[-2134300000000, -1854700000000, -1611800000000, -1400600000000,
-1217200000000, -1057700000000, -919200000000, -798800000000,
-694100000000, -603200000000, -524200000000, -455500000000,
-395800000000, -344000000000, -298900000000, -259700000000,
-225700000000, -196100000000, -170400000000, -148100000000,
-128700000000, -111800000000, -97200000000, -84400000000,
-73400000000, -63800000000, -55400000000, -48100000000,
-41800000000, -36300000000, -31600000000, -27400000000,
-23800000000, -20700000000, -18000000000, -15600000000,
-13600000000, -11800000000, -10200000000, -8900000000,
-7700000000, -6700000000, -5800000000, -5000000000,
-4400000000, -3800000000, -3300000000, -2900000000,
-2500000000, -2100000000, -1900000000, -1600000000,
-1400000000, -1200000000, -1000000000, -900000000,
-800000000, -700000000, -600000000, -500000000,
-400000000, -300000000, -200000000, -100000000,
100000000, 200000000, 300000000, 400000000,
500000000, 600000000, 700000000, 800000000,
900000000, 1000000000, 1200000000, 1400000000,
1600000000, 1900000000, 2100000000, 2500000000,
2900000000, 3300000000, 3800000000, 4400000000,
5000000000, 5800000000, 6700000000, 7700000000,
8900000000, 10200000000, 11800000000, 13600000000,
15600000000, 18000000000, 20700000000, 23800000000,
27400000000, 31600000000, 36300000000, 41800000000,
48100000000, 55400000000, 63800000000, 73400000000,
84400000000, 97200000000, 111800000000, 128700000000,
148100000000, 170400000000, 196100000000, 225700000000,
259700000000, 298900000000, 344000000000, 395800000000,
455500000000, 524200000000, 603200000000, 694100000000,
798800000000, 919200000000, 1057700000000, 1217200000000,
1400600000000, 1611800000000, 1854700000000, 2134300000000];


#[derive(Deserialize)]
struct Trade {
pub time: i64,
pub price: f64,
pub amount: f64,
}

/// Use this to deserialize just the time column on the first pass through
/// the events file.
#[derive(Deserialize)]
struct EventTime {
pub time: i64,
}

struct Event {
pub time: i64,
pub data: Vec<f64>,
}

pub fn seconds(d: Duration) -> f64 {
d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64)
}

fn main() -> Result<(), Box<dyn Error>> {
let start = Instant::now();
let args: clap::ArgMatches = clap::App::new("time-explorer")
.version("0.1")
.arg(clap::Arg::with_name("trades")
.long("trades-csv")
.short("t")
.help("Path of csv with time (integer nanoseconds timestamp), \
price (f64), and amount (f64) columns.")
.takes_value(true)
.required(true))
.arg(clap::Arg::with_name("events")
.long("events-csv")
.short("e")
.help("Path of csv file with a time (integer nanoseconds timestamp) as column 0, \
along with any other metadata columns that will be included in results")
.takes_value(true)
.required(true))
.arg(clap::Arg::with_name("output")
.long("output-file")
.short("o")
.help("Path to save results csv to")
.takes_value(true)
.required(true))
.arg(clap::Arg::with_name("verbose")
.long("verbose")
.short("v"))
.arg(clap::Arg::with_name("n-periods")
.long("n-periods")
.short("n")
.help("Controls how many time buckets are evaluated")
.takes_value(true)
.default_value("50"))
.get_matches();

let verbose = args.is_present("verbose");

if verbose { println!("{:>8.2}s reading...", seconds(Instant::now() - start)); }

let trades_csv = args.value_of("trades").unwrap();
let events_csv = args.value_of("events").unwrap();
let output = args.value_of("output").unwrap();
let n: &str = args.value_of("n-periods").unwrap();
let n: usize = usize::from_str(n)?;

let trades_csv =
fs::OpenOptions::new()
.read(true)
.open(trades_csv)?;

let mut times: Vec<i64> = Vec::with_capacity(8192);
let mut amounts: Vec<f64> = Vec::with_capacity(8192);
let mut totals: Vec<f64> = Vec::with_capacity(8192);

#[cfg(feature = "super-fast-csv-parsing")]
{
// lookout below! MANY unwraps in here

// note: this code NOT part of original time-explorer. this code is what
// I was referring to in the "fine print" note where it says "With 10
// minutes work (knowing what I know today), I was able to get CSV parsing
// down to 3.46sec"

let mut rdr = csv::Reader::from_reader(io::BufReader::new(rdr));
let headers = rdr.byte_headers().unwrap().clone();
let mut row = csv::ByteRecord::new();
let mut col_index: [usize; 3] = [
headers.iter().position(|x| x == b"time").unwrap(),
headers.iter().position(|x| x == b"amount").unwrap(),
headers.iter().position(|x| x == b"price").unwrap(),
];

while rdr.read_byte_record(&mut row).unwrap() {
times.push(atoi::atoi(row.get(col_index[0]).unwrap()).unwrap());

let amount: f64 = lexical::parse(row.get(col_index[1]).unwrap()).unwrap();
let price: f64 = lexical::parse(row.get(col_index[2]).unwrap()).unwrap();

totals.push(price * amount);
amounts.push(amount);
}
}

#[cfg(not(feature = "super-fast-csv-parsing"))]
{
// this is what was originally in time-explorer

let mut trades: Vec<Trade> =
csv::Reader::from_reader(trades_csv)
.deserialize()
.map(|x| x.unwrap())
.collect();

trades.sort_by_key(|k| k.time);

for Trade { time, price, amount } in trades {
times.push(time);
totals.push(price * amount);
amounts.push(amount);
}
}

if verbose { println!("{:>8.2}s finished parsing trades csv (times.len() = {}) ...", seconds(Instant::now() - start), times.len()); }

let mut events: Vec<Event> = {
let events_csv =
fs::OpenOptions::new()
.read(true)
.open(events_csv)?;

csv::Reader::from_reader(events_csv)
.deserialize()
.map(|t: Result<EventTime, _>| {
let EventTime { time } = t.unwrap();
//let data = [0.0; N - 1];
let data = vec![0.0; n - 1];
Event { time, data }
}).collect()
};

assert!(!events.is_empty());

events.sort_by_key(|k| k.time);

let mut cursor: usize = 0;
let mut truncate_events = None;

let buckets: Vec<i64> =
linspace(LOGSPACE[0] as f64, LOGSPACE[N - 1] as f64, n)
.map(|x| x as i64)
.collect();
if verbose { println!("{:>8.2}s calculating...", seconds(Instant::now() - start)); }

let mut n_incomplete_buckets = 0;
let mut n_skipped_buckets = 0;
let mut n_time_buckets = 0;

'a: for (i, event) in events.iter_mut().enumerate() {

let mut min_time: i64 = event.time + buckets[0];
let mut max_time: i64 = event.time + buckets[1];

'oops: while times[cursor] > min_time && cursor > 0 { cursor -= 1; }
n_incomplete_buckets += (times[cursor] > min_time) as usize;
n_skipped_buckets += (times[cursor] > max_time) as usize;

// find the beginning if there are gaps
'b: while times[cursor] < min_time {
if cursor >= times.len() - 1 {
truncate_events = Some(i);
break 'a
} else {
cursor += 1
}
}

let mut j: usize = cursor;

'c: for k in 0..(n - 2) {
let mut wsum: f64 = 0.0;
let mut w: f64 = 0.0;

'd: while j < times.len() - 1 && times[j] < max_time {
wsum += totals[j];
w += amounts[j];
j += 1;
}

event.data[k] = if w > 0.0 { wsum / w } else { NAN };

min_time = max_time;
max_time = event.time + buckets[k + 2];
n_time_buckets += 1;
}

if i % 512 == 0 {
assert!(max_time > min_time);
if verbose {
//let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
println!("{:>8.2}s No. {:>5} {:>12.2}, {:>12.2}, {:>12.2} ...", //, {:>12.2}, {:>12.2}, {:>12.2} ...",
//cursor={}, j={}, times[cursor]={}, n_nan={}, max_time-min_time={}",
seconds(Instant::now() - start), i,
event.data[0], event.data[20], event.data[40]); //, event.data[60], event.data[80], event.data[100]);
//min_time, max_time, cursor,
//j, times[cursor], n_nan, max_time-min_time);
}
}
}

assert!(truncate_events.is_none()); // for now

if verbose { println!("{:>8.2} writing... (n_time_buckets={}, n_incomplete_buckets={}, n_skipped_buckets={})", seconds(Instant::now() - start), n_time_buckets, n_incomplete_buckets, n_skipped_buckets); }

// we have to read this again because I could not figure out ownership problems
let events_csv =
fs::OpenOptions::new()
.read(true)
.open(events_csv)?;

let mut events_csv = csv::Reader::from_reader(events_csv);

let output_csv =
fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(output)?;

let mut wtr = csv::Writer::from_writer(output_csv);

let data_cols: Vec<i64> = {
let mut xs = vec![0; n - 1];
for i in 0..(n - 1) {
xs[i] = (buckets[i] + buckets[i + 1]) / 2;
}
xs
};

{
let headers = events_csv.byte_headers()?;
for col in headers.iter() {
wtr.write_field(col)?;
}
for col in data_cols.iter() {
wtr.write_field(&format!("{}", col))?;
}
wtr.write_record(None::<&[u8]>)?;
}

let mut record = csv::ByteRecord::new();

for event in events {
if !events_csv.read_byte_record(&mut record)? { panic!("failed to read from events csv") }
for meta in record.iter() {
wtr.write_field(meta)?;
}
for val in event.data.iter() {
wtr.write_field(&format!("{}", val))?;
}
wtr.write_record(None::<&[u8]>)?;
}


if verbose { println!("{:>8.2} finished.", seconds(Instant::now() - start)); }

Ok(())
}

/*
def to_tframe(version, df, trades, start):
d = {'bid': {}, 'ask': {}}
cursor = 0
n = 0
n_periods = 40
xs = np.concatenate([periods(n_periods)[:0:-1] * -1, periods(n_periods)]) * 1000000 # mult to convert to nanos
mask = df['version'] == version
#my_trades = sorted(list(zip(df.loc[mask].index, df.loc[mask, 'side'], df.loc[mask, 'gid'])))
my_trades = sorted(list(zip(df.loc[mask].index.values.astype(np.int64), df.loc[mask, 'side'], df.loc[mask, 'gid'])))
#idx = trades.index
idx = trades.index.values.astype(np.int64)
amts = trades['amount']
totals = trades['total']
assert len(idx) == len(amts)
assert len(idx) == len(totals)
for tm, side, gid in my_trades:
print '{} to_tfame {} {} (cursor = {})'.format(time.time() - start, version, n, cursor)
#min_time = tm + timedelta(milliseconds=xs[0])
#max_time = tm + timedelta(milliseconds=xs[1])
min_time = tm + xs[0]
max_time = tm + xs[1]
if idx[cursor] > min_time:
print 'warning: idx[cursor] ({}) > min_time ({})'.format(idx[cursor], min_time)
while idx[cursor] > min_time and cursor > 0:
cursor -= 1
else:
while idx[cursor] < min_time and cursor < len(idx) - 1:
cursor += 1
i = 1
j = cursor
d[side][gid] = {}
while i < len(xs) - 1:
wsum = 0.0
w = 0.0
while idx[j] < max_time:
wsum += totals[j]
w += amts[j]
j += 1
if w > 0.0:
d[side][gid][xs[i]] = wsum / w
else:
d[side][gid][xs[i]] = np.nan
i += 1
min_time = max_time
#max_time = tm + timedelta(milliseconds=xs[i])
max_time = tm + xs[i]
n += 1
d['bid'] = sort_cols(pd.DataFrame.from_dict(d['bid'], orient='index'))
d['ask'] = sort_cols(pd.DataFrame.from_dict(d['ask'], orient='index'))
#yield (version, d)
return d
*/


Loading…
Cancel
Save