|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422 |
- #![allow(unused_imports)]
-
- #[macro_use] extern crate serde_derive;
-
- //extern crate decimal;
- extern crate clap;
- extern crate serde;
- extern crate csv;
- extern crate itertools_num;
-
- extern crate map;
-
- use std::str::FromStr;
- use std::time::{Instant, Duration};
- use std::{fs, io};
- use std::str::from_utf8;
- use std::error::Error;
- use itertools_num::linspace;
- use map::Map;
-
/// Number of entries in [`LOGSPACE`].
const N: usize = 128;

/// Time offsets that are added to an event's timestamp, in the same units as
/// the `time` columns (the command-line help describes those as integer
/// nanoseconds).
///
/// The table is symmetric about zero and strictly increasing, with magnitudes
/// running from 100_000_000 up to 2_134_300_000_000. `main` only reads the
/// first and last entries, using them as the end points of its evenly spaced
/// bucket edges.
///
/// The length is spelled `N` rather than a literal so the compiler rejects any
/// edit that lets the two drift apart.
const LOGSPACE: [i64; N] =
    [-2134300000000, -1854700000000, -1611800000000, -1400600000000,
     -1217200000000, -1057700000000,  -919200000000,  -798800000000,
      -694100000000,  -603200000000,  -524200000000,  -455500000000,
      -395800000000,  -344000000000,  -298900000000,  -259700000000,
      -225700000000,  -196100000000,  -170400000000,  -148100000000,
      -128700000000,  -111800000000,   -97200000000,   -84400000000,
       -73400000000,   -63800000000,   -55400000000,   -48100000000,
       -41800000000,   -36300000000,   -31600000000,   -27400000000,
       -23800000000,   -20700000000,   -18000000000,   -15600000000,
       -13600000000,   -11800000000,   -10200000000,    -8900000000,
        -7700000000,    -6700000000,    -5800000000,    -5000000000,
        -4400000000,    -3800000000,    -3300000000,    -2900000000,
        -2500000000,    -2100000000,    -1900000000,    -1600000000,
        -1400000000,    -1200000000,    -1000000000,     -900000000,
         -800000000,     -700000000,     -600000000,     -500000000,
         -400000000,     -300000000,     -200000000,     -100000000,
          100000000,      200000000,      300000000,      400000000,
          500000000,      600000000,      700000000,      800000000,
          900000000,     1000000000,     1200000000,     1400000000,
         1600000000,     1900000000,     2100000000,     2500000000,
         2900000000,     3300000000,     3800000000,     4400000000,
         5000000000,     5800000000,     6700000000,     7700000000,
         8900000000,    10200000000,    11800000000,    13600000000,
        15600000000,    18000000000,    20700000000,    23800000000,
        27400000000,    31600000000,    36300000000,    41800000000,
        48100000000,    55400000000,    63800000000,    73400000000,
        84400000000,    97200000000,   111800000000,   128700000000,
       148100000000,   170400000000,   196100000000,   225700000000,
       259700000000,   298900000000,   344000000000,   395800000000,
       455500000000,   524200000000,   603200000000,   694100000000,
       798800000000,   919200000000,  1057700000000,  1217200000000,
      1400600000000,  1611800000000,  1854700000000,  2134300000000];
-
- #[derive(Deserialize)]
- struct Trade { //<'a> {
- pub time: i64,
- //pub exch: &'a str,
- pub price: f64,
- pub amount: f64,
- }
-
- /// Use this to deserialize just the time column on the first pass through
- /// the events file.
- #[derive(Deserialize)]
- struct EventTime {
- pub time: i64,
- }
-
/// An event together with the per-bucket results computed for it.
#[derive(Debug)]
struct Event {
    /// Timestamp of the event, in the same units as the trade timestamps.
    pub time: i64,
    /// One amount-weighted average price per time bucket around the event,
    /// filled in by `main`; a bucket with no traded amount holds NaN.
    pub data: Vec<f64>,
}
-
/// Converts a [`Duration`] into fractional seconds.
///
/// The whole-second and sub-second parts are converted to `f64` separately
/// and then added together.
pub fn seconds(d: Duration) -> f64 {
    const NANOS_PER_SECOND: f64 = 1_000_000_000_f64;

    let whole = d.as_secs() as f64;
    let fraction = f64::from(d.subsec_nanos()) / NANOS_PER_SECOND;
    whole + fraction
}
-
- fn main() -> Result<(), Box<Error>> {
- let start = Instant::now();
- let args: clap::ArgMatches = clap::App::new("time-explorer")
- .version("0.1")
- .arg(clap::Arg::with_name("trades")
- .long("trades-csv")
- .short("t")
- .help("Path of csv with time (integer nanoseconds timestamp), \
- price (f64), and amount (f64) columns.")
- .takes_value(true)
- .required(true))
- .arg(clap::Arg::with_name("events")
- .long("events-csv")
- .short("e")
- .help("Path of csv file with a time (integer nanoseconds timestamp) as column 0, \
- along with any other metadata columns that will be included in results")
- .takes_value(true)
- .required(true))
- .arg(clap::Arg::with_name("output")
- .long("output-file")
- .short("o")
- .help("Path to save results csv to")
- .takes_value(true)
- .required(true))
- .arg(clap::Arg::with_name("verbose")
- .long("verbose")
- .short("v"))
- .arg(clap::Arg::with_name("n-periods")
- .long("n-periods")
- .short("n")
- .help("Controls how many time buckets are evaluated")
- .takes_value(true)
- .default_value("50"))
- .get_matches();
-
- //assert_eq!(N, LOGSPACE.len());
-
- let verbose = args.is_present("verbose");
-
- if verbose { println!("{:>8.2}s reading...", seconds(Instant::now() - start)); }
-
- let trades_csv = args.value_of("trades").unwrap();
- let events_csv = args.value_of("events").unwrap();
- let output = args.value_of("output").unwrap();
- let n: &str = args.value_of("n-periods").unwrap();
- let n: usize = usize::from_str(n)?;
- //println!("{:>8.2s} opening files...", seconds(Instant::now() - start));
- let trades_csv =
- fs::OpenOptions::new()
- .read(true)
- .open(trades_csv)?;
-
- let mut trades: Vec<Trade> =
- csv::Reader::from_reader(trades_csv)
- .deserialize()
- .map(|x| x.unwrap())
- .collect();
-
- //println!("{:>8.2} sorting trades...", seconds(Instant::now() - start));
- trades.sort_by_key(|k| k.time);
-
- // for i in 1..(trades.len() - 1) {
- // assert!(trades[i].time >= trades[i-1].time);
- // }
-
- //println!("{:>8.2}s arranging in columnar format...", seconds(Instant::now() - start));
-
- let mut times: Vec<i64> = Vec::with_capacity(trades.len());
- //let mut prices: Vec<f64> = Vec::with_capacity(trades.len());
- let mut amounts: Vec<f64> = Vec::with_capacity(trades.len());
- let mut totals: Vec<f64> = Vec::with_capacity(trades.len());
-
- for Trade { time, price, amount } in trades {
- times.push(time);
- totals.push(price * amount);
- amounts.push(amount);
- }
-
- // let mut headers = csv::ByteRecord::new();
- // rdr.read_byte_record(&mut headers)?;
- // assert!(&headers.as_slice()[headers.range(0).unwrap()] == b"time");
- // let mut record = csv::ByteRecord::new();
-
- //let headers = rdr.headers()?;
- //assert_eq!(headers.get(0), Some("time"));
-
- let mut events: Vec<Event> = {
- let events_csv =
- fs::OpenOptions::new()
- .read(true)
- .open(events_csv)?;
-
- csv::Reader::from_reader(events_csv)
- .deserialize()
- .map(|t: Result<EventTime, _>| {
- let EventTime { time } = t.unwrap();
- //let data = [0.0; N - 1];
- let data = vec![0.0; n - 1];
- Event { time, data }
- }).collect()
- };
- //while rdr.read_byte_record(&mut record)? {
-
- /*
- for record in rdr.records() {
- let meta = record?;
- //let time = i64::from_str(from_utf8(&record[0])?)?;
- let time = i64::from_str(&meta[0])?;
- // let meta =
- // record.iter()
- // .skip(1)
- // //.map(|x| x.deserialize().unwrap())
- // .collect();
- let data = [0.0; N - 1];
- events.push(Event { time, data });
- }
- */
-
- assert!(!events.is_empty());
-
- // for i in 1..(events.len() - 1) {
- // assert!(events[i].time >= events[i-1].time);
- // }
-
- events.sort_by_key(|k| k.time);
- //println!("events[0].time={}, LOGSPACE[0]={}, events[0].time + LOGSPACE[0]={}",
- // events[0].time, LOGSPACE[0], events[0].time + LOGSPACE[0]);
- let mut cursor: usize = 0;
- let mut truncate_events = None;
-
- let buckets: Vec<i64> =
- linspace(LOGSPACE[0] as f64, LOGSPACE[N - 1] as f64, n)
- .map(|x| x as i64)
- .collect();
-
- if verbose { println!("{:>8.2}s calculating...", seconds(Instant::now() - start)); }
-
- 'a: for (i, event) in events.iter_mut().enumerate() {
- //let mut min_time: i64 = event.time + LOGSPACE[0];
- //let mut max_time: i64 = event.time + LOGSPACE[1];
-
- let mut min_time: i64 = event.time + buckets[0];
- let mut max_time: i64 = event.time + buckets[1];
-
-
- /*
- let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
- println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \
- cursor={}, j={}, times[cursor]={}, times[j]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i,
- event.data[0], event.data[10], event.data[100], min_time, max_time, cursor,
- "_", times[cursor], "_", n_nan, max_time-min_time);
-
- println!();
- */
-
- // find the beginning if there are gaps
- 'b: while times[cursor] < min_time {
- if cursor >= times.len() - 1 {
- truncate_events = Some(i);
- break 'a
- } else {
- cursor += 1
- }
- }
-
- let mut j: usize = cursor;
-
- 'c: for k in 0..(n - 2) {
- let mut wsum: f64 = 0.0;
- let mut w: f64 = 0.0;
-
- 'd: while j < times.len() - 1 && times[j] < max_time {
- wsum += totals[j];
- w += amounts[j];
- j += 1;
- }
-
- if w > 0.0 {
- event.data[k] = wsum / w;
- } else {
- event.data[k] = ::std::f64::NAN;
- }
-
- /*
- let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
- println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \
- cursor={}, j={}, times[cursor]={}, times[j]={}, n_nan={}, max_time-min_time={} \
- wsum={:.3}, w={:.3}", seconds(Instant::now() - start), i,
- event.data[0], event.data[10], event.data[100], min_time, max_time, cursor,
- j, times[cursor], times[j], n_nan, max_time-min_time, wsum, w);
- */
-
- min_time = max_time;
- max_time = event.time + buckets[k + 2];
-
- //println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \
- // cursor={}, j={}, times[cursor]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i,
- // event.data[0], event.data[10], event.data[100], min_time, max_time, cursor,
- // j, times[cursor], n_nan, max_time-min_time);
-
- }
-
- /*
- let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
- println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \
- cursor={}, j={}, times[cursor]={}, times[j]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i,
- event.data[0], event.data[10], event.data[100], min_time, max_time, cursor,
- j, times[cursor], times[j], n_nan, max_time-min_time);
- println!();
- */
-
-
- if i % 100 == 0 {
- assert!(max_time > min_time);
- if verbose {
- //let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
- println!("{:>8.2}s No. {:>5} {:>12.2}, {:>12.2}, {:>12.2}, {:>12.2}, {:>12.2}, {:>12.2} ...",
- //cursor={}, j={}, times[cursor]={}, n_nan={}, max_time-min_time={}",
- seconds(Instant::now() - start), i,
- event.data[0], event.data[20], event.data[40], event.data[60], event.data[80], event.data[100]);
- //min_time, max_time, cursor,
- //j, times[cursor], n_nan, max_time-min_time);
- }
- }
- }
-
- assert!(truncate_events.is_none()); // for now
-
- if verbose { println!("{:>8.2} writing...", seconds(Instant::now() - start)); }
-
- // we have to read this again because I could not figure out ownership problems
- let events_csv =
- fs::OpenOptions::new()
- .read(true)
- .open(events_csv)?;
-
- let mut events_csv = csv::Reader::from_reader(events_csv);
-
- let output_csv =
- fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .open(output)?;
-
- let mut wtr = csv::Writer::from_writer(output_csv);
-
- let data_cols: Vec<i64> = {
- let mut xs = vec![0; n - 1];
- for i in 0..(n - 1) {
- //xs[i] = ((LOGSPACE[i] + LOGSPACE[i + 1]) as f64 / 2.0) as i64;
- xs[i] = (buckets[i] + buckets[i + 1]) / 2;
- }
- xs
- };
-
- {
- let headers = events_csv.byte_headers()?;
- //println!("{:?}", headers);
- for col in headers.iter() {
- wtr.write_field(col)?;
- }
- for col in data_cols.iter() {
- wtr.write_field(&format!("{}", col))?;
- }
- wtr.write_record(None::<&[u8]>)?;
- }
-
- let mut record = csv::ByteRecord::new();
-
- for event in events {
- if !events_csv.read_byte_record(&mut record)? { panic!("failed to read from events csv") }
- for meta in record.iter() {
- wtr.write_field(meta)?;
- }
- for val in event.data.iter() {
- wtr.write_field(&format!("{}", val))?;
- }
- wtr.write_record(None::<&[u8]>)?;
- }
-
-
- if verbose { println!("{:>8.2} finished.", seconds(Instant::now() - start)); }
-
- Ok(())
- }
-
- /*
- def to_tframe(version, df, trades, start):
- d = {'bid': {}, 'ask': {}}
- cursor = 0
- n = 0
- n_periods = 40
- xs = np.concatenate([periods(n_periods)[:0:-1] * -1, periods(n_periods)]) * 1000000 # mult to convert to nanos
- mask = df['version'] == version
- #my_trades = sorted(list(zip(df.loc[mask].index, df.loc[mask, 'side'], df.loc[mask, 'gid'])))
- my_trades = sorted(list(zip(df.loc[mask].index.values.astype(np.int64), df.loc[mask, 'side'], df.loc[mask, 'gid'])))
- #idx = trades.index
- idx = trades.index.values.astype(np.int64)
- amts = trades['amount']
- totals = trades['total']
- assert len(idx) == len(amts)
- assert len(idx) == len(totals)
- for tm, side, gid in my_trades:
- print '{} to_tfame {} {} (cursor = {})'.format(time.time() - start, version, n, cursor)
- #min_time = tm + timedelta(milliseconds=xs[0])
- #max_time = tm + timedelta(milliseconds=xs[1])
- min_time = tm + xs[0]
- max_time = tm + xs[1]
- if idx[cursor] > min_time:
- print 'warning: idx[cursor] ({}) > min_time ({})'.format(idx[cursor], min_time)
- while idx[cursor] > min_time and cursor > 0:
- cursor -= 1
- else:
- while idx[cursor] < min_time and cursor < len(idx) - 1:
- cursor += 1
- i = 1
- j = cursor
- d[side][gid] = {}
- while i < len(xs) - 1:
- wsum = 0.0
- w = 0.0
- while idx[j] < max_time:
- wsum += totals[j]
- w += amts[j]
- j += 1
- if w > 0.0:
- d[side][gid][xs[i]] = wsum / w
- else:
- d[side][gid][xs[i]] = np.nan
- i += 1
- min_time = max_time
- #max_time = tm + timedelta(milliseconds=xs[i])
- max_time = tm + xs[i]
- n += 1
- d['bid'] = sort_cols(pd.DataFrame.from_dict(d['bid'], orient='index'))
- d['ask'] = sort_cols(pd.DataFrame.from_dict(d['ask'], orient='index'))
- #yield (version, d)
- return d
- */
-
|