#![allow(unused_imports)] #[macro_use] extern crate serde_derive; //extern crate decimal; extern crate clap; extern crate serde; extern crate csv; extern crate itertools_num; extern crate map; use std::str::FromStr; use std::time::{Instant, Duration}; use std::{fs, io}; use std::str::from_utf8; use std::error::Error; use itertools_num::linspace; use map::Map; const N: usize = 128; const LOGSPACE: [i64; 128] = [-2134300000000, -1854700000000, -1611800000000, -1400600000000, -1217200000000, -1057700000000, -919200000000, -798800000000, -694100000000, -603200000000, -524200000000, -455500000000, -395800000000, -344000000000, -298900000000, -259700000000, -225700000000, -196100000000, -170400000000, -148100000000, -128700000000, -111800000000, -97200000000, -84400000000, -73400000000, -63800000000, -55400000000, -48100000000, -41800000000, -36300000000, -31600000000, -27400000000, -23800000000, -20700000000, -18000000000, -15600000000, -13600000000, -11800000000, -10200000000, -8900000000, -7700000000, -6700000000, -5800000000, -5000000000, -4400000000, -3800000000, -3300000000, -2900000000, -2500000000, -2100000000, -1900000000, -1600000000, -1400000000, -1200000000, -1000000000, -900000000, -800000000, -700000000, -600000000, -500000000, -400000000, -300000000, -200000000, -100000000, 100000000, 200000000, 300000000, 400000000, 500000000, 600000000, 700000000, 800000000, 900000000, 1000000000, 1200000000, 1400000000, 1600000000, 1900000000, 2100000000, 2500000000, 2900000000, 3300000000, 3800000000, 4400000000, 5000000000, 5800000000, 6700000000, 7700000000, 8900000000, 10200000000, 11800000000, 13600000000, 15600000000, 18000000000, 20700000000, 23800000000, 27400000000, 31600000000, 36300000000, 41800000000, 48100000000, 55400000000, 63800000000, 73400000000, 84400000000, 97200000000, 111800000000, 128700000000, 148100000000, 170400000000, 196100000000, 225700000000, 259700000000, 298900000000, 344000000000, 395800000000, 455500000000, 524200000000, 603200000000, 694100000000, 798800000000, 919200000000, 1057700000000, 1217200000000, 1400600000000, 1611800000000, 1854700000000, 2134300000000]; #[derive(Deserialize)] struct Trade { //<'a> { pub time: i64, //pub exch: &'a str, pub price: f64, pub amount: f64, } /// Use this to deserialize just the time column on the first pass through /// the events file. #[derive(Deserialize)] struct EventTime { pub time: i64, } struct Event { pub time: i64, //pub meta: Vec<&'a str>, //pub meta: &'a csv::StringRecord, pub data: Vec, } pub fn seconds(d: Duration) -> f64 { d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64) } fn main() -> Result<(), Box> { let start = Instant::now(); let args: clap::ArgMatches = clap::App::new("time-explorer") .version("0.1") .arg(clap::Arg::with_name("trades") .long("trades-csv") .short("t") .help("Path of csv with time (integer nanoseconds timestamp), \ price (f64), and amount (f64) columns.") .takes_value(true) .required(true)) .arg(clap::Arg::with_name("events") .long("events-csv") .short("e") .help("Path of csv file with a time (integer nanoseconds timestamp) as column 0, \ along with any other metadata columns that will be included in results") .takes_value(true) .required(true)) .arg(clap::Arg::with_name("output") .long("output-file") .short("o") .help("Path to save results csv to") .takes_value(true) .required(true)) .arg(clap::Arg::with_name("verbose") .long("verbose") .short("v")) .arg(clap::Arg::with_name("n-periods") .long("n-periods") .short("n") .help("Controls how many time buckets are evaluated") .takes_value(true) .default_value("50")) .get_matches(); //assert_eq!(N, LOGSPACE.len()); let verbose = args.is_present("verbose"); if verbose { println!("{:>8.2}s reading...", seconds(Instant::now() - start)); } let trades_csv = args.value_of("trades").unwrap(); let events_csv = args.value_of("events").unwrap(); let output = args.value_of("output").unwrap(); let n: &str = args.value_of("n-periods").unwrap(); let n: usize = usize::from_str(n)?; //println!("{:>8.2s} opening files...", seconds(Instant::now() - start)); let trades_csv = fs::OpenOptions::new() .read(true) .open(trades_csv)?; let mut trades: Vec = csv::Reader::from_reader(trades_csv) .deserialize() .map(|x| x.unwrap()) .collect(); //println!("{:>8.2} sorting trades...", seconds(Instant::now() - start)); trades.sort_by_key(|k| k.time); // for i in 1..(trades.len() - 1) { // assert!(trades[i].time >= trades[i-1].time); // } //println!("{:>8.2}s arranging in columnar format...", seconds(Instant::now() - start)); let mut times: Vec = Vec::with_capacity(trades.len()); //let mut prices: Vec = Vec::with_capacity(trades.len()); let mut amounts: Vec = Vec::with_capacity(trades.len()); let mut totals: Vec = Vec::with_capacity(trades.len()); for Trade { time, price, amount } in trades { times.push(time); totals.push(price * amount); amounts.push(amount); } // let mut headers = csv::ByteRecord::new(); // rdr.read_byte_record(&mut headers)?; // assert!(&headers.as_slice()[headers.range(0).unwrap()] == b"time"); // let mut record = csv::ByteRecord::new(); //let headers = rdr.headers()?; //assert_eq!(headers.get(0), Some("time")); let mut events: Vec = { let events_csv = fs::OpenOptions::new() .read(true) .open(events_csv)?; csv::Reader::from_reader(events_csv) .deserialize() .map(|t: Result| { let EventTime { time } = t.unwrap(); //let data = [0.0; N - 1]; let data = vec![0.0; n - 1]; Event { time, data } }).collect() }; //while rdr.read_byte_record(&mut record)? { /* for record in rdr.records() { let meta = record?; //let time = i64::from_str(from_utf8(&record[0])?)?; let time = i64::from_str(&meta[0])?; // let meta = // record.iter() // .skip(1) // //.map(|x| x.deserialize().unwrap()) // .collect(); let data = [0.0; N - 1]; events.push(Event { time, data }); } */ assert!(!events.is_empty()); // for i in 1..(events.len() - 1) { // assert!(events[i].time >= events[i-1].time); // } events.sort_by_key(|k| k.time); //println!("events[0].time={}, LOGSPACE[0]={}, events[0].time + LOGSPACE[0]={}", // events[0].time, LOGSPACE[0], events[0].time + LOGSPACE[0]); let mut cursor: usize = 0; let mut truncate_events = None; let buckets: Vec = linspace(LOGSPACE[0] as f64, LOGSPACE[N - 1] as f64, n) .map(|x| x as i64) .collect(); if verbose { println!("{:>8.2}s calculating...", seconds(Instant::now() - start)); } 'a: for (i, event) in events.iter_mut().enumerate() { //let mut min_time: i64 = event.time + LOGSPACE[0]; //let mut max_time: i64 = event.time + LOGSPACE[1]; let mut min_time: i64 = event.time + buckets[0]; let mut max_time: i64 = event.time + buckets[1]; /* let n_nan = event.data.iter().filter(|x| !x.is_finite()).count(); println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \ cursor={}, j={}, times[cursor]={}, times[j]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i, event.data[0], event.data[10], event.data[100], min_time, max_time, cursor, "_", times[cursor], "_", n_nan, max_time-min_time); println!(); */ // find the beginning if there are gaps 'b: while times[cursor] < min_time { if cursor >= times.len() - 1 { truncate_events = Some(i); break 'a } else { cursor += 1 } } let mut j: usize = cursor; 'c: for k in 0..(n - 2) { let mut wsum: f64 = 0.0; let mut w: f64 = 0.0; 'd: while j < times.len() - 1 && times[j] < max_time { wsum += totals[j]; w += amounts[j]; j += 1; } if w > 0.0 { event.data[k] = wsum / w; } else { event.data[k] = ::std::f64::NAN; } /* let n_nan = event.data.iter().filter(|x| !x.is_finite()).count(); println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \ cursor={}, j={}, times[cursor]={}, times[j]={}, n_nan={}, max_time-min_time={} \ wsum={:.3}, w={:.3}", seconds(Instant::now() - start), i, event.data[0], event.data[10], event.data[100], min_time, max_time, cursor, j, times[cursor], times[j], n_nan, max_time-min_time, wsum, w); */ min_time = max_time; max_time = event.time + buckets[k + 2]; //println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \ // cursor={}, j={}, times[cursor]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i, // event.data[0], event.data[10], event.data[100], min_time, max_time, cursor, // j, times[cursor], n_nan, max_time-min_time); } /* let n_nan = event.data.iter().filter(|x| !x.is_finite()).count(); println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \ cursor={}, j={}, times[cursor]={}, times[j]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i, event.data[0], event.data[10], event.data[100], min_time, max_time, cursor, j, times[cursor], times[j], n_nan, max_time-min_time); println!(); */ if i % 100 == 0 { assert!(max_time > min_time); if verbose { //let n_nan = event.data.iter().filter(|x| !x.is_finite()).count(); println!("{:>8.2}s No. {:>5} {:>12.2}, {:>12.2}, {:>12.2}, {:>12.2}, {:>12.2}, {:>12.2} ...", //cursor={}, j={}, times[cursor]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i, event.data[0], event.data[20], event.data[40], event.data[60], event.data[80], event.data[100]); //min_time, max_time, cursor, //j, times[cursor], n_nan, max_time-min_time); } } } assert!(truncate_events.is_none()); // for now if verbose { println!("{:>8.2} writing...", seconds(Instant::now() - start)); } // we have to read this again because I could not figure out ownership problems let events_csv = fs::OpenOptions::new() .read(true) .open(events_csv)?; let mut events_csv = csv::Reader::from_reader(events_csv); let output_csv = fs::OpenOptions::new() .write(true) .create(true) .truncate(true) .open(output)?; let mut wtr = csv::Writer::from_writer(output_csv); let data_cols: Vec = { let mut xs = vec![0; n - 1]; for i in 0..(n - 1) { //xs[i] = ((LOGSPACE[i] + LOGSPACE[i + 1]) as f64 / 2.0) as i64; xs[i] = (buckets[i] + buckets[i + 1]) / 2; } xs }; { let headers = events_csv.byte_headers()?; //println!("{:?}", headers); for col in headers.iter() { wtr.write_field(col)?; } for col in data_cols.iter() { wtr.write_field(&format!("{}", col))?; } wtr.write_record(None::<&[u8]>)?; } let mut record = csv::ByteRecord::new(); for event in events { if !events_csv.read_byte_record(&mut record)? { panic!("failed to read from events csv") } for meta in record.iter() { wtr.write_field(meta)?; } for val in event.data.iter() { wtr.write_field(&format!("{}", val))?; } wtr.write_record(None::<&[u8]>)?; } if verbose { println!("{:>8.2} finished.", seconds(Instant::now() - start)); } Ok(()) } /* def to_tframe(version, df, trades, start): d = {'bid': {}, 'ask': {}} cursor = 0 n = 0 n_periods = 40 xs = np.concatenate([periods(n_periods)[:0:-1] * -1, periods(n_periods)]) * 1000000 # mult to convert to nanos mask = df['version'] == version #my_trades = sorted(list(zip(df.loc[mask].index, df.loc[mask, 'side'], df.loc[mask, 'gid']))) my_trades = sorted(list(zip(df.loc[mask].index.values.astype(np.int64), df.loc[mask, 'side'], df.loc[mask, 'gid']))) #idx = trades.index idx = trades.index.values.astype(np.int64) amts = trades['amount'] totals = trades['total'] assert len(idx) == len(amts) assert len(idx) == len(totals) for tm, side, gid in my_trades: print '{} to_tfame {} {} (cursor = {})'.format(time.time() - start, version, n, cursor) #min_time = tm + timedelta(milliseconds=xs[0]) #max_time = tm + timedelta(milliseconds=xs[1]) min_time = tm + xs[0] max_time = tm + xs[1] if idx[cursor] > min_time: print 'warning: idx[cursor] ({}) > min_time ({})'.format(idx[cursor], min_time) while idx[cursor] > min_time and cursor > 0: cursor -= 1 else: while idx[cursor] < min_time and cursor < len(idx) - 1: cursor += 1 i = 1 j = cursor d[side][gid] = {} while i < len(xs) - 1: wsum = 0.0 w = 0.0 while idx[j] < max_time: wsum += totals[j] w += amts[j] j += 1 if w > 0.0: d[side][gid][xs[i]] = wsum / w else: d[side][gid][xs[i]] = np.nan i += 1 min_time = max_time #max_time = tm + timedelta(milliseconds=xs[i]) max_time = tm + xs[i] n += 1 d['bid'] = sort_cols(pd.DataFrame.from_dict(d['bid'], orient='index')) d['ask'] = sort_cols(pd.DataFrame.from_dict(d['ask'], orient='index')) #yield (version, d) return d */