#![allow(unused)] #[macro_use] extern crate slog; #[macro_use] extern crate markets; use std::io::{self, prelude::*}; use std::fs; use std::path::{Path, PathBuf}; use std::time::*; use pretty_toa::ThousandsSep; use structopt::StructOpt; use serde::{Serialize, Deserialize}; use slog::Drain; use chrono::{DateTime, Utc, NaiveDateTime}; use markets::crypto::{Exchange, Ticker, Side, Currency}; use pipelines::encoding; macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{ eprintln!($fmt, $($args)*); std::process::exit(1); }}} const PROGRESS_EVERY: usize = 1024 * 1024 * 4; #[structopt(rename_all="kebab-case")] #[derive(Debug, StructOpt)] enum Opt { /// Filter trades-csv by start,end range and save subset to output-path /// /// Note: csv assumed to be pre-sorted by time (ascending) /// Range { /// Path to CSV file with trades data #[structopt(short = "f", long = "trades-csv")] #[structopt(parse(from_os_str))] trades_csv: PathBuf, /// Where to save the query results (CSV output) #[structopt(short = "o", long = "output-path")] #[structopt(parse(from_os_str))] output_path: PathBuf, /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ") #[structopt(short = "s", long = "start")] start: DateTime, /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ") #[structopt(short = "e", long = "end")] end: DateTime, }, /// Convert the original csv info a format ready to be ingested via COPY /// /// 1. server_time of 0 -> NULL /// 2. side of "na" -> NULL PrepPostgres { /// Path to CSV file with trades data #[structopt(short = "f", long = "trades-csv")] #[structopt(parse(from_os_str))] trades_csv: PathBuf, /// Where to save the query results (CSV output) #[structopt(short = "o", long = "output-path")] #[structopt(parse(from_os_str))] output_path: PathBuf, }, ListCodes, Binarize { /// Path to CSV file with trades data #[structopt(short = "f", long = "trades-csv")] #[structopt(parse(from_os_str))] trades_csv: PathBuf, /// Where to save the binary-serialized data #[structopt(short = "o", long = "output-path")] #[structopt(parse(from_os_str))] output_path: PathBuf, }, CountRows { /// Path to file with binary trades data #[structopt(short = "f", long = "input-file")] #[structopt(parse(from_os_str))] input_path: PathBuf, }, } #[derive(Deserialize)] struct Trade { /// Unix nanoseconds pub time: u64, pub exch: Exchange, pub ticker: Ticker, //pub side: Option, pub price: f64, pub amount: f64, } #[derive(Deserialize, Debug)] struct PgBuilder<'a> { pub time: u64, pub exch: Exchange, pub ticker: Ticker, pub side: Option<&'a str>, pub price: f64, pub amount: f64, pub server_time: u64, } #[derive(Serialize, Debug)] struct PgRow { pub time: DateTime, pub exch: u8, pub base: u8, pub quote: u8, pub amount: f64, pub price: f64, pub side: Option, pub server_time: Option>, } fn nanos_to_utc(nanos: u64) -> DateTime { const ONE_SECOND: u64 = 1_000_000_000; let sec: i64 = (nanos / ONE_SECOND) as i64; let nsec: u32 = (nanos % ONE_SECOND) as u32; let naive = NaiveDateTime::from_timestamp(sec, nsec); DateTime::from_utc(naive, Utc) } fn per_sec(n: usize, span: Duration) -> f64 { if n == 0 || span < Duration::from_micros(1) { return 0.0 } let s: f64 = span.as_nanos() as f64 / 1e9f64; n as f64 / s } fn nanos(utc: DateTime) -> u64 { (utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64) } fn run(start: Instant, logger: &slog::Logger) -> Result { let opt = Opt::from_args(); let mut n = 0; match opt { Opt::CountRows { input_path } => { let logger = logger.new(o!("cmd" => "count-rows")); info!(logger, "beginning to count"; "input_path" => %input_path.display(), ); if ! input_path.exists() { return Err(format!("--input-file path does not exist: {}", input_path.display())) } let input_file = fs::OpenOptions::new() .read(true) .open(input_path) .map_err(|e| e.to_string())?; let file_length = input_file.metadata().unwrap().len(); if file_length % encoding::SERIALIZED_SIZE as u64 != 0 { return Err(format!("file length is not a multiple of record size: {}", file_length)) } let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE; info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep()); let data: memmap::Mmap = unsafe { memmap::Mmap::map(&input_file) .map_err(|e| { format!("creating Mmap failed: {}", e) })? }; let mut n_gdax = 0; let mut n_bmex = 0; for i in 0..n_records { let j = i * encoding::SERIALIZED_SIZE; let k = j + encoding::SERIALIZED_SIZE; let packed = encoding::PackedTradeData::new(&data[j..k]); n_gdax += (packed.exch().unwrap() == e!(gdax)) as usize; n_bmex += (packed.exch().unwrap() == e!(bmex)) as usize; n += 1; } info!(logger, "finished reading flle"; "n gdax" => n_gdax.thousands_sep(), "n bmex" => n_bmex.thousands_sep(), ); } Opt::Binarize { trades_csv, output_path } => { let logger = logger.new(o!("cmd" => "binarize")); info!(logger, "beginning binarize"; "trades_csv" => %trades_csv.display(), "output_path" => %output_path.display(), ); if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) } info!(logger, "opening trades_csv file"); let rdr = fs::File::open(&trades_csv) .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?; let rdr = io::BufReader::new(rdr); let mut rdr = csv::Reader::from_reader(rdr); info!(logger, "opening output file for writing"); let wtr = fs::File::create(&output_path) .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?; let mut wtr = io::BufWriter::new(wtr); let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); let mut row = csv::ByteRecord::new(); let mut buf = vec![0u8; encoding::SERIALIZED_SIZE]; let mut n = 0; let mut n_written = 0; let mut n_bytes_written = 0; let mut n_bytes_read = headers.as_slice().len() + headers.len() + 1; while rdr.read_byte_record(&mut row) .map_err(|e| { format!("reading row {} failed: {}", (n+1).thousands_sep(), e) })? { let trade: encoding::CsvTrade = row.deserialize(Some(&headers)).map_err(|e| e.to_string())?; n += 1; n_bytes_read += row.as_slice().len() + row.len() + 1; encoding::serialize(&mut buf[..], &trade); let bytes_written = wtr.write(&buf[..]).map_err(|e| e.to_string())?; assert_eq!(bytes_written, encoding::SERIALIZED_SIZE); n_written += 1; n_bytes_written += bytes_written; if n % PROGRESS_EVERY == 0 { info!(logger, "binarizing csv"; "elapsed" => ?(Instant::now() - start), "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep(), "mb read" => (n_bytes_read as f64 / 1024.0 / 1024.0), "mb written" => (n_bytes_written as f64 / 1024.0 / 1024.0), ); } } info!(logger, "finished reading/converting csv"); assert_eq!(n_bytes_written % encoding::SERIALIZED_SIZE, 0); } Opt::PrepPostgres { trades_csv, output_path } => { let logger = logger.new(o!("cmd" => "prep-postgres")); info!(logger, "beginning prep-postgres cmd"; "trades_csv" => %trades_csv.display(), "output_path" => %output_path.display(), ); if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) } info!(logger, "opening trades_csv file"); let rdr = fs::File::open(&trades_csv) .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?; let rdr = io::BufReader::new(rdr); let mut rdr = csv::Reader::from_reader(rdr); info!(logger, "opening output file for writing"); let wtr = fs::File::create(&output_path) .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?; let wtr = io::BufWriter::new(wtr); let mut wtr = csv::Writer::from_writer(wtr); let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); let mut row = csv::StringRecord::new(); //wtr.write_record(&headers).map_err(|e| format!("writing headers row failed: {}", e))?; while rdr.read_record(&mut row) .map_err(|e| { format!("reading row {} failed: {}", (n+1).thousands_sep(), e) })? { let bldr: PgBuilder = row.deserialize(Some(&headers)).map_err(|e| format!("deser failed: {}", e))?; let PgBuilder { time, exch, ticker, side, price, amount, server_time } = bldr; let time = nanos_to_utc(time); let exch = u8::from(exch); let base = u8::from(ticker.base); let quote = u8::from(ticker.quote); let side: Option = match side { Some("bid") => Some(1), Some("ask") => Some(2), _ => None, }; let server_time = match server_time { 0 => None, x => Some(nanos_to_utc(x)), }; let pg_row = PgRow { time, exch, base, quote, amount, price, side, server_time }; wtr.serialize(&pg_row).map_err(|e| format!("serializing PgRow to csv failed: {}", e))?; n += 1; if n % PROGRESS_EVERY == 0 { info!(logger, "parsing/writing csv rows"; "n" => %n.thousands_sep()); } } } Opt::ListCodes => { println!("side: {:?} {}", Side::Bid, u8::from(Side::Bid)); println!("side: {:?} {}", Side::Ask, u8::from(Side::Ask)); println!(); for exch in Exchange::all() { println!("INSERT INTO exchanges (id, symbol) VALUES ({}, '{}');", u8::from(exch), exch.as_str()); } for currency in Currency::all() { println!("INSERT INTO currencies (id, symbol) VALUES ({}, '{}');", u8::from(currency), currency.as_str()); } } Opt::Range { trades_csv, output_path, start, end } => { let logger = logger.new(o!("cmd" => "range")); info!(logger, "beginning range cmd"; "trades_csv" => %trades_csv.display(), "output_path" => %output_path.display(), "start" => %start, "end" => %end, ); if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) } info!(logger, "opening trades_csv file"); let rdr = fs::File::open(&trades_csv) .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?; let rdr = io::BufReader::new(rdr); let mut rdr = csv::Reader::from_reader(rdr); info!(logger, "opening output file for writing"); let wtr = fs::File::create(&output_path) .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?; let wtr = io::BufWriter::new(wtr); let mut wtr = csv::Writer::from_writer(wtr); let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); let time_col: usize = headers.iter().position(|x| x == b"time").ok_or_else(|| { String::from("no column in headers named 'time'") })?; let mut row = csv::ByteRecord::new(); let start_nanos = nanos(start); let end_nanos = nanos(end); let mut n_written = 0; let mut time: u64 = 0; info!(logger, "writing headers row to output file"); wtr.write_byte_record(&headers).map_err(|e| format!("writing csv headers row failed: {}", e))?; info!(logger, "entering csv parsing loop"); 'a: while rdr.read_byte_record(&mut row) .map_err(|e| { format!("reading row {} failed: {}", (n+1).thousands_sep(), e) })? { let time_bytes = row.get(time_col).ok_or_else(|| "time column not present for row")?; time = atoi::atoi(time_bytes).ok_or_else(|| { format!("failed to parse 'time' col value '{}' as integer", std::str::from_utf8(time_bytes).unwrap_or("utf8err")) })?; n += 1; if n % PROGRESS_EVERY == 0 { info!(logger, "parsing csv rows"; "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep()); } if time < start_nanos { continue 'a } if time > end_nanos { break 'a } wtr.write_byte_record(&row).map_err(|e| format!("writing parsed csv row to output file failed: {}", e))?; n_written += 1; } info!(logger, "broke out of read csv loop"; "time" => time, "end_nanos" => end_nanos, "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep()); info!(logger, "dropping wtr"); drop(wtr); } } Ok(n) } fn main() { let start = Instant::now(); let decorator = slog_term::TermDecorator::new().stdout().force_color().build(); let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse(); let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse(); let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!())); match run(start, &logger) { Ok(n) => { let took = Instant::now() - start; info!(logger, "finished in {:?}", took; "n rows" => %n.thousands_sep(), "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(), ); } Err(e) => { crit!(logger, "run failed: {:?}", e); eprintln!("\n\nError: {}", e); std::thread::sleep(Duration::from_millis(100)); std::process::exit(1); } } }