Browse Source

examples from fast data pipelines preface

master
Jonathan Strong 8 months ago
parent
commit
08ae4312c5
3 changed files with 6486 additions and 0 deletions
  1. +6009
    -0
      python-68min-analysis-stdout.txt
  2. +55
    -0
      python-68min-analysis-the-function.py
  3. +422
    -0
      time_explorer_first_pass.rs

+ 6009
- 0
python-68min-analysis-stdout.txt
File diff suppressed because it is too large
View File


+ 55
- 0
python-68min-analysis-the-function.py View File

@@ -0,0 +1,55 @@

def to_tframe(version, df, trades, start):
d = {'bid': {}, 'ask': {}}
cursor = 0
n = 0
n_periods = 40
xs = np.concatenate([periods(n_periods)[:0:-1] * -1, periods(n_periods)]) * 1000000 # mult to convert to nanos
mask = df['version'] == version
my_trades = sorted(list(zip(df.loc[mask].index.values.astype(np.int64), df.loc[mask, 'side'], df.loc[mask, 'gid'])))
idx = trades.index.values.astype(np.int64)
amts = trades['amount']
totals = trades['total']
assert len(idx) == len(amts)
assert len(idx) == len(totals)
for tm, side, gid in my_trades:
print '{} to_tfame {} {} (cursor = {})'.format(time.time() - start, version, n, cursor)
min_time = tm + xs[0]
max_time = tm + xs[1]
if idx[cursor] > min_time:
print 'warning: idx[cursor] ({}) > min_time ({})'.format(idx[cursor], min_time)
while idx[cursor] > min_time and cursor > 0:
cursor -= 1
else:
while idx[cursor] < min_time and cursor < len(idx) - 1:
cursor += 1
i = 1
j = cursor
d[side][gid] = {}
while i < len(xs) - 1:
wsum = 0.0
w = 0.0
while idx[j] < max_time:
wsum += totals[j]
w += amts[j]
j += 1
if w > 0.0:
d[side][gid][xs[i]] = wsum / w
else:
d[side][gid][xs[i]] = np.nan
i += 1
min_time = max_time
max_time = tm + xs[i]
n += 1
d['bid'] = sort_cols(pd.DataFrame.from_dict(d['bid'], orient='index'))
d['ask'] = sort_cols(pd.DataFrame.from_dict(d['ask'], orient='index'))
return d


# code used to perform `to_tframe` on all versions:

start = time.time()
tresults = {}
for version in version_counts.index:
tresults[version] = to_tframe(version, df, trades, start)
print '{} finished'.format(time.time() - start)

+ 422
- 0
time_explorer_first_pass.rs View File

@@ -0,0 +1,422 @@
#![allow(unused_imports)]

#[macro_use] extern crate serde_derive;

//extern crate decimal;
extern crate clap;
extern crate serde;
extern crate csv;
extern crate itertools_num;

extern crate map;

use std::str::FromStr;
use std::time::{Instant, Duration};
use std::{fs, io};
use std::str::from_utf8;
use std::error::Error;
use itertools_num::linspace;
use map::Map;

const N: usize = 128;
const LOGSPACE: [i64; 128] =
[-2134300000000, -1854700000000, -1611800000000, -1400600000000,
-1217200000000, -1057700000000, -919200000000, -798800000000,
-694100000000, -603200000000, -524200000000, -455500000000,
-395800000000, -344000000000, -298900000000, -259700000000,
-225700000000, -196100000000, -170400000000, -148100000000,
-128700000000, -111800000000, -97200000000, -84400000000,
-73400000000, -63800000000, -55400000000, -48100000000,
-41800000000, -36300000000, -31600000000, -27400000000,
-23800000000, -20700000000, -18000000000, -15600000000,
-13600000000, -11800000000, -10200000000, -8900000000,
-7700000000, -6700000000, -5800000000, -5000000000,
-4400000000, -3800000000, -3300000000, -2900000000,
-2500000000, -2100000000, -1900000000, -1600000000,
-1400000000, -1200000000, -1000000000, -900000000,
-800000000, -700000000, -600000000, -500000000,
-400000000, -300000000, -200000000, -100000000,
100000000, 200000000, 300000000, 400000000,
500000000, 600000000, 700000000, 800000000,
900000000, 1000000000, 1200000000, 1400000000,
1600000000, 1900000000, 2100000000, 2500000000,
2900000000, 3300000000, 3800000000, 4400000000,
5000000000, 5800000000, 6700000000, 7700000000,
8900000000, 10200000000, 11800000000, 13600000000,
15600000000, 18000000000, 20700000000, 23800000000,
27400000000, 31600000000, 36300000000, 41800000000,
48100000000, 55400000000, 63800000000, 73400000000,
84400000000, 97200000000, 111800000000, 128700000000,
148100000000, 170400000000, 196100000000, 225700000000,
259700000000, 298900000000, 344000000000, 395800000000,
455500000000, 524200000000, 603200000000, 694100000000,
798800000000, 919200000000, 1057700000000, 1217200000000,
1400600000000, 1611800000000, 1854700000000, 2134300000000];

#[derive(Deserialize)]
struct Trade { //<'a> {
pub time: i64,
//pub exch: &'a str,
pub price: f64,
pub amount: f64,
}

/// Use this to deserialize just the time column on the first pass through
/// the events file.
#[derive(Deserialize)]
struct EventTime {
pub time: i64,
}

struct Event {
pub time: i64,
//pub meta: Vec<&'a str>,
//pub meta: &'a csv::StringRecord,
pub data: Vec<f64>,
}

pub fn seconds(d: Duration) -> f64 {
d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64)
}

fn main() -> Result<(), Box<Error>> {
let start = Instant::now();
let args: clap::ArgMatches = clap::App::new("time-explorer")
.version("0.1")
.arg(clap::Arg::with_name("trades")
.long("trades-csv")
.short("t")
.help("Path of csv with time (integer nanoseconds timestamp), \
price (f64), and amount (f64) columns.")
.takes_value(true)
.required(true))
.arg(clap::Arg::with_name("events")
.long("events-csv")
.short("e")
.help("Path of csv file with a time (integer nanoseconds timestamp) as column 0, \
along with any other metadata columns that will be included in results")
.takes_value(true)
.required(true))
.arg(clap::Arg::with_name("output")
.long("output-file")
.short("o")
.help("Path to save results csv to")
.takes_value(true)
.required(true))
.arg(clap::Arg::with_name("verbose")
.long("verbose")
.short("v"))
.arg(clap::Arg::with_name("n-periods")
.long("n-periods")
.short("n")
.help("Controls how many time buckets are evaluated")
.takes_value(true)
.default_value("50"))
.get_matches();

//assert_eq!(N, LOGSPACE.len());

let verbose = args.is_present("verbose");

if verbose { println!("{:>8.2}s reading...", seconds(Instant::now() - start)); }

let trades_csv = args.value_of("trades").unwrap();
let events_csv = args.value_of("events").unwrap();
let output = args.value_of("output").unwrap();
let n: &str = args.value_of("n-periods").unwrap();
let n: usize = usize::from_str(n)?;
//println!("{:>8.2s} opening files...", seconds(Instant::now() - start));
let trades_csv =
fs::OpenOptions::new()
.read(true)
.open(trades_csv)?;

let mut trades: Vec<Trade> =
csv::Reader::from_reader(trades_csv)
.deserialize()
.map(|x| x.unwrap())
.collect();

//println!("{:>8.2} sorting trades...", seconds(Instant::now() - start));
trades.sort_by_key(|k| k.time);

// for i in 1..(trades.len() - 1) {
// assert!(trades[i].time >= trades[i-1].time);
// }

//println!("{:>8.2}s arranging in columnar format...", seconds(Instant::now() - start));

let mut times: Vec<i64> = Vec::with_capacity(trades.len());
//let mut prices: Vec<f64> = Vec::with_capacity(trades.len());
let mut amounts: Vec<f64> = Vec::with_capacity(trades.len());
let mut totals: Vec<f64> = Vec::with_capacity(trades.len());

for Trade { time, price, amount } in trades {
times.push(time);
totals.push(price * amount);
amounts.push(amount);
}

// let mut headers = csv::ByteRecord::new();
// rdr.read_byte_record(&mut headers)?;
// assert!(&headers.as_slice()[headers.range(0).unwrap()] == b"time");
// let mut record = csv::ByteRecord::new();

//let headers = rdr.headers()?;
//assert_eq!(headers.get(0), Some("time"));

let mut events: Vec<Event> = {
let events_csv =
fs::OpenOptions::new()
.read(true)
.open(events_csv)?;

csv::Reader::from_reader(events_csv)
.deserialize()
.map(|t: Result<EventTime, _>| {
let EventTime { time } = t.unwrap();
//let data = [0.0; N - 1];
let data = vec![0.0; n - 1];
Event { time, data }
}).collect()
};
//while rdr.read_byte_record(&mut record)? {

/*
for record in rdr.records() {
let meta = record?;
//let time = i64::from_str(from_utf8(&record[0])?)?;
let time = i64::from_str(&meta[0])?;
// let meta =
// record.iter()
// .skip(1)
// //.map(|x| x.deserialize().unwrap())
// .collect();
let data = [0.0; N - 1];
events.push(Event { time, data });
}
*/

assert!(!events.is_empty());

// for i in 1..(events.len() - 1) {
// assert!(events[i].time >= events[i-1].time);
// }

events.sort_by_key(|k| k.time);
//println!("events[0].time={}, LOGSPACE[0]={}, events[0].time + LOGSPACE[0]={}",
// events[0].time, LOGSPACE[0], events[0].time + LOGSPACE[0]);
let mut cursor: usize = 0;
let mut truncate_events = None;

let buckets: Vec<i64> =
linspace(LOGSPACE[0] as f64, LOGSPACE[N - 1] as f64, n)
.map(|x| x as i64)
.collect();
if verbose { println!("{:>8.2}s calculating...", seconds(Instant::now() - start)); }

'a: for (i, event) in events.iter_mut().enumerate() {
//let mut min_time: i64 = event.time + LOGSPACE[0];
//let mut max_time: i64 = event.time + LOGSPACE[1];

let mut min_time: i64 = event.time + buckets[0];
let mut max_time: i64 = event.time + buckets[1];


/*
let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \
cursor={}, j={}, times[cursor]={}, times[j]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i,
event.data[0], event.data[10], event.data[100], min_time, max_time, cursor,
"_", times[cursor], "_", n_nan, max_time-min_time);

println!();
*/

// find the beginning if there are gaps
'b: while times[cursor] < min_time {
if cursor >= times.len() - 1 {
truncate_events = Some(i);
break 'a
} else {
cursor += 1
}
}

let mut j: usize = cursor;

'c: for k in 0..(n - 2) {
let mut wsum: f64 = 0.0;
let mut w: f64 = 0.0;

'd: while j < times.len() - 1 && times[j] < max_time {
wsum += totals[j];
w += amounts[j];
j += 1;
}

if w > 0.0 {
event.data[k] = wsum / w;
} else {
event.data[k] = ::std::f64::NAN;
}

/*
let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \
cursor={}, j={}, times[cursor]={}, times[j]={}, n_nan={}, max_time-min_time={} \
wsum={:.3}, w={:.3}", seconds(Instant::now() - start), i,
event.data[0], event.data[10], event.data[100], min_time, max_time, cursor,
j, times[cursor], times[j], n_nan, max_time-min_time, wsum, w);
*/

min_time = max_time;
max_time = event.time + buckets[k + 2];

//println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \
// cursor={}, j={}, times[cursor]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i,
// event.data[0], event.data[10], event.data[100], min_time, max_time, cursor,
// j, times[cursor], n_nan, max_time-min_time);

}

/*
let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
println!("{:>8.2} event {:>5} {:>8.2}, {:>8.2}, {:>8.2}, ... min_time={}, max_time={}, \
cursor={}, j={}, times[cursor]={}, times[j]={}, n_nan={}, max_time-min_time={}", seconds(Instant::now() - start), i,
event.data[0], event.data[10], event.data[100], min_time, max_time, cursor,
j, times[cursor], times[j], n_nan, max_time-min_time);
println!();
*/


if i % 100 == 0 {
assert!(max_time > min_time);
if verbose {
//let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
println!("{:>8.2}s No. {:>5} {:>12.2}, {:>12.2}, {:>12.2}, {:>12.2}, {:>12.2}, {:>12.2} ...",
//cursor={}, j={}, times[cursor]={}, n_nan={}, max_time-min_time={}",
seconds(Instant::now() - start), i,
event.data[0], event.data[20], event.data[40], event.data[60], event.data[80], event.data[100]);
//min_time, max_time, cursor,
//j, times[cursor], n_nan, max_time-min_time);
}
}
}

assert!(truncate_events.is_none()); // for now

if verbose { println!("{:>8.2} writing...", seconds(Instant::now() - start)); }

// we have to read this again because I could not figure out ownership problems
let events_csv =
fs::OpenOptions::new()
.read(true)
.open(events_csv)?;

let mut events_csv = csv::Reader::from_reader(events_csv);

let output_csv =
fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(output)?;

let mut wtr = csv::Writer::from_writer(output_csv);

let data_cols: Vec<i64> = {
let mut xs = vec![0; n - 1];
for i in 0..(n - 1) {
//xs[i] = ((LOGSPACE[i] + LOGSPACE[i + 1]) as f64 / 2.0) as i64;
xs[i] = (buckets[i] + buckets[i + 1]) / 2;
}
xs
};

{
let headers = events_csv.byte_headers()?;
//println!("{:?}", headers);
for col in headers.iter() {
wtr.write_field(col)?;
}
for col in data_cols.iter() {
wtr.write_field(&format!("{}", col))?;
}
wtr.write_record(None::<&[u8]>)?;
}

let mut record = csv::ByteRecord::new();

for event in events {
if !events_csv.read_byte_record(&mut record)? { panic!("failed to read from events csv") }
for meta in record.iter() {
wtr.write_field(meta)?;
}
for val in event.data.iter() {
wtr.write_field(&format!("{}", val))?;
}
wtr.write_record(None::<&[u8]>)?;
}


if verbose { println!("{:>8.2} finished.", seconds(Instant::now() - start)); }

Ok(())
}

/*
def to_tframe(version, df, trades, start):
d = {'bid': {}, 'ask': {}}
cursor = 0
n = 0
n_periods = 40
xs = np.concatenate([periods(n_periods)[:0:-1] * -1, periods(n_periods)]) * 1000000 # mult to convert to nanos
mask = df['version'] == version
#my_trades = sorted(list(zip(df.loc[mask].index, df.loc[mask, 'side'], df.loc[mask, 'gid'])))
my_trades = sorted(list(zip(df.loc[mask].index.values.astype(np.int64), df.loc[mask, 'side'], df.loc[mask, 'gid'])))
#idx = trades.index
idx = trades.index.values.astype(np.int64)
amts = trades['amount']
totals = trades['total']
assert len(idx) == len(amts)
assert len(idx) == len(totals)
for tm, side, gid in my_trades:
print '{} to_tfame {} {} (cursor = {})'.format(time.time() - start, version, n, cursor)
#min_time = tm + timedelta(milliseconds=xs[0])
#max_time = tm + timedelta(milliseconds=xs[1])
min_time = tm + xs[0]
max_time = tm + xs[1]
if idx[cursor] > min_time:
print 'warning: idx[cursor] ({}) > min_time ({})'.format(idx[cursor], min_time)
while idx[cursor] > min_time and cursor > 0:
cursor -= 1
else:
while idx[cursor] < min_time and cursor < len(idx) - 1:
cursor += 1
i = 1
j = cursor
d[side][gid] = {}
while i < len(xs) - 1:
wsum = 0.0
w = 0.0
while idx[j] < max_time:
wsum += totals[j]
w += amts[j]
j += 1
if w > 0.0:
d[side][gid][xs[i]] = wsum / w
else:
d[side][gid][xs[i]] = np.nan
i += 1
min_time = max_time
#max_time = tm + timedelta(milliseconds=xs[i])
max_time = tm + xs[i]
n += 1
d['bid'] = sort_cols(pd.DataFrame.from_dict(d['bid'], orient='index'))
d['ask'] = sort_cols(pd.DataFrame.from_dict(d['ask'], orient='index'))
#yield (version, d)
return d
*/


Loading…
Cancel
Save