diff --git a/.gitignore b/.gitignore index ba1e735..9a23fdf 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ Cargo.lock var/ csv-bytes csv-bytes-manual +**/.ipynb_checkpoints/ diff --git a/notebooks/python-example-of-query-for-1h.ipynb b/notebooks/python-example-of-query-for-1h.ipynb new file mode 100644 index 0000000..6875d50 --- /dev/null +++ b/notebooks/python-example-of-query-for-1h.ipynb @@ -0,0 +1,380 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
timeamountexchpriceserver_timesideticker
015310944017008525270.0801bits6706.60010nabtc_usd
115310944017802985190.0284bits6706.60990nabtc_usd
215310944023057084720.0050btfx6707.00000nabtc_usd
315310944034556577970.0050btfx6706.70020nabtc_usd
415310944035926638720.0658btfx6705.89990nabtc_usd
\n", + "
" + ], + "text/plain": [ + " time amount exch price server_time side ticker\n", + "0 1531094401700852527 0.0801 bits 6706.6001 0 na btc_usd\n", + "1 1531094401780298519 0.0284 bits 6706.6099 0 na btc_usd\n", + "2 1531094402305708472 0.0050 btfx 6707.0000 0 na btc_usd\n", + "3 1531094403455657797 0.0050 btfx 6706.7002 0 na btc_usd\n", + "4 1531094403592663872 0.0658 btfx 6705.8999 0 na btc_usd" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.read_csv('/xfs/trades.csv', nrows=1024 * 1024)\n", + "df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
timeamountexchpriceserver_timesideticker
time
2018-07-09 00:00:01.700852527+00:0015310944017008525270.0801bits6706.60010nabtc_usd
2018-07-09 00:00:01.780298519+00:0015310944017802985190.0284bits6706.60990nabtc_usd
2018-07-09 00:00:02.305708472+00:0015310944023057084720.0050btfx6707.00000nabtc_usd
2018-07-09 00:00:03.455657797+00:0015310944034556577970.0050btfx6706.70020nabtc_usd
2018-07-09 00:00:03.592663872+00:0015310944035926638720.0658btfx6705.89990nabtc_usd
\n", + "
" + ], + "text/plain": [ + " time amount exch \\\n", + "time \n", + "2018-07-09 00:00:01.700852527+00:00 1531094401700852527 0.0801 bits \n", + "2018-07-09 00:00:01.780298519+00:00 1531094401780298519 0.0284 bits \n", + "2018-07-09 00:00:02.305708472+00:00 1531094402305708472 0.0050 btfx \n", + "2018-07-09 00:00:03.455657797+00:00 1531094403455657797 0.0050 btfx \n", + "2018-07-09 00:00:03.592663872+00:00 1531094403592663872 0.0658 btfx \n", + "\n", + " price server_time side ticker \n", + "time \n", + "2018-07-09 00:00:01.700852527+00:00 6706.6001 0 na btc_usd \n", + "2018-07-09 00:00:01.780298519+00:00 6706.6099 0 na btc_usd \n", + "2018-07-09 00:00:02.305708472+00:00 6707.0000 0 na btc_usd \n", + "2018-07-09 00:00:03.455657797+00:00 6706.7002 0 na btc_usd \n", + "2018-07-09 00:00:03.592663872+00:00 6705.8999 0 na btc_usd " + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.index = pd.to_datetime(df['time'], utc=True)\n", + "df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(Timestamp('2018-07-09 00:00:01.700852527+0000', tz='UTC'),\n", + " Timestamp('2018-07-11 19:27:34.453569864+0000', tz='UTC'),\n", + " Timedelta('2 days 19:27:32.752717'))" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.index[0], df.index[-1], df.index[-1] - df.index[0]" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1.000578250137079" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "start = \"2018-07-10T04:00:00Z\" # <- randomly selected start/end \n", + "end = \"2018-07-10T05:00:00Z\" # important point is, 1h span\n", + "\n", + "target_hr = (df.index >= start) & (df.index < end)\n", + "assert target_hr.sum() > 0\n", + "\n", + "of_btcusd = df['ticker'] == 'btc_usd'\n", + "assert of_btcusd.sum() > 0\n", + "\n", + "of_bmex = df['exch'] == 'bmex'\n", + "of_gdax = df['exch'] == 'gdax'\n", + "\n", + "assert of_bmex.sum() > 0\n", + "assert of_gdax.sum() > 0\n", + "\n", + "bmex_hr = df.loc[target_hr & of_bmex & of_btcusd, ['price', 'amount']]\n", + "gdax_hr = df.loc[target_hr & of_gdax & of_btcusd, ['price', 'amount']]\n", + "\n", + "bmex_size_wt_price = (\n", + " (bmex_hr['price'] * bmex_hr['amount']).sum()\n", + " / bmex_hr['amount'].sum()\n", + ")\n", + "\n", + "gdax_size_wt_price = (\n", + " (gdax_hr['price'] * gdax_hr['amount']).sum()\n", + " / gdax_hr['amount'].sum()\n", + ")\n", + "\n", + "ratio = bmex_size_wt_price / gdax_size_wt_price # <- final answer (for start,end hr at least)\n", + "ratio" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "((2714, 2), (1305, 2))" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "bmex_hr.shape, gdax_hr.shape" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(6642.42451802828, 6638.585754905495)" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "bmex_size_wt_price, gdax_size_wt_price" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + 
"display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.5" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/postgres-creat-trades-table.sql b/postgres-creat-trades-table.sql new file mode 100644 index 0000000..284e66d --- /dev/null +++ b/postgres-creat-trades-table.sql @@ -0,0 +1,121 @@ +CREATE SEQUENCE public.trades_id_seq + INCREMENT 1 + MINVALUE 1 + MAXVALUE 9223372036854775807 + START 1 + CACHE 1; +ALTER TABLE public.trades_id_seq + OWNER TO jstrong; + +-- CREATE SEQUENCE public.exchangess_id_seq +-- INCREMENT 1 +-- MINVALUE 1 +-- MAXVALUE 255 +-- START 1 +-- CACHE 1; +-- ALTER TABLE public.exchangess_id_seq +-- OWNER TO jstrong; +-- +-- CREATE SEQUENCE public.currencies_id_seq +-- INCREMENT 1 +-- MINVALUE 1 +-- MAXVALUE 255 +-- START 1 +-- CACHE 1; +-- ALTER TABLE public.currencies_id_seq +-- OWNER TO jstrong; + + +CREATE TABLE public.exchanges +( + + id smallint NOT NULL, + symbol character varying(4) NOT NULL, + + CONSTRAINT exchanges_pkey PRIMARY KEY (id) + +) WITH ( OIDS=FALSE ); + +CREATE TABLE public.currencies +( + id smallint NOT NULL, + symbol character varying(6) NOT NULL, + + CONSTRAINT currencies_pkey PRIMARY KEY (id) + +) WITH ( OIDS=FALSE ); + +CREATE TABLE public.trades +( + id integer NOT NULL DEFAULT nextval('trades_id_seq'::regclass), + "time" timestamp with time zone NOT NULL, + exch smallint NOT NULL, + base smallint NOT NULL, + quote smallint NOT NULL, + amount double precision NOT NULL, + price double precision NOT NULL, + side smallint NULL, -- side has no fk ... 
bid=1, ask=2
+    server_time timestamp with time zone NULL,
+    CONSTRAINT trades_pkey PRIMARY KEY (id),
+
+    CONSTRAINT exch_fk FOREIGN KEY (exch)
+        REFERENCES public.exchanges (id) MATCH SIMPLE
+        ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED,
+
+    CONSTRAINT base_fk FOREIGN KEY (base)
+        REFERENCES public.currencies (id) MATCH SIMPLE
+        ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED,
+
+    CONSTRAINT quote_fk FOREIGN KEY (quote)
+        REFERENCES public.currencies (id) MATCH SIMPLE
+        ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED
+)
+WITH (
+    OIDS=FALSE
+);
+
+ALTER TABLE public.trades
+    OWNER TO jstrong;
+
+CREATE INDEX trades_time_abcdefg
+    ON public.trades
+    USING btree
+    ("time");
+
+CREATE INDEX trades_base_f6b2eeda
+    ON public.trades
+    USING btree
+    (base);
+
+CREATE INDEX trades_quote_0d5895fc
+    ON public.trades
+    USING btree
+    (quote);
+
+CREATE INDEX trades_exchange_5d5c6971
+    ON public.trades
+    USING btree
+    (exch);
+
+-- CREATE INDEX trades_side_23985593
+--     ON public.trades
+--     USING btree
+--     (side);
+
+-- fill in exchanges/currencies
+
+insert into public.exchanges (id, symbol) values(1, 'plnx');
+insert into public.exchanges (id, symbol) values(2, 'krkn');
+insert into public.exchanges (id, symbol) values(3, 'gdax');
+insert into public.exchanges (id, symbol) values(5, 'bits');
+insert into public.exchanges (id, symbol) values(6, 'bmex');
+insert into public.exchanges (id, symbol) values(7, 'btfx');
+insert into public.exchanges (id, symbol) values(8, 'bnce');
+insert into public.exchanges (id, symbol) values(9, 'okex');
+insert into public.currencies (id, symbol) values(1, 'btc');
+insert into public.currencies (id, symbol) values(2, 'eth');
+insert into public.currencies (id, symbol) values(3, 'xmr');
+insert into public.currencies (id, symbol) values(5, 'ltc');
+insert into public.currencies (id, symbol) values(15, 'etc');
+insert into public.currencies (id, symbol) values(4, 'usdt');
+insert into public.currencies (id, symbol) values(100, 'usd');
diff --git a/src/munge.rs b/src/munge.rs
index 77afccf..3f3c34f 100644
--- a/src/munge.rs
+++ b/src/munge.rs
@@ -13,7 +13,7 @@ use pretty_toa::ThousandsSep;
 use structopt::StructOpt;
 use serde::{Serialize, Deserialize};
 use slog::Drain;
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, Utc, NaiveDateTime};
 use markets::crypto::{Exchange, Ticker, Side};
 
 macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
@@ -21,6 +21,7 @@ macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
     std::process::exit(1);
 }}}
 
+const PROGRESS_EVERY: usize = 1024 * 1024 * 4;
 
 
 #[structopt(rename_all="kebab-case")]
@@ -28,6 +29,9 @@ macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
 enum Opt {
 
     /// Filter trades-csv by start,end range and save subset to output-path
+    ///
+    /// Note: csv assumed to be pre-sorted by time (ascending)
+    ///
     Range {
 
         /// Path to CSV file with trades data
@@ -41,12 +45,35 @@ enum Opt {
         output_path: PathBuf,
 
         /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
+        #[structopt(short = "s", long = "start")]
         start: DateTime<Utc>,
 
         /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
+        #[structopt(short = "e", long = "end")]
         end: DateTime<Utc>,
     },
+
+    /// Convert the original csv into a format ready to be ingested via COPY
+    ///
+    /// 1. server_time of 0 -> NULL
+    /// 2. side of "na" -> NULL
side of "na" -> NULL + PrepPostgres { + + /// Path to CSV file with trades data + #[structopt(short = "f", long = "trades-csv")] + #[structopt(parse(from_os_str))] + trades_csv: PathBuf, + + /// Where to save the query results (CSV output) + #[structopt(short = "o", long = "output-path")] + #[structopt(parse(from_os_str))] + output_path: PathBuf, + }, + + ListCodes, + + } #[derive(Deserialize)] @@ -60,6 +87,37 @@ struct Trade { pub amount: f64, } +#[derive(Deserialize, Debug)] +struct PgBuilder<'a> { + pub time: u64, + pub exch: Exchange, + pub ticker: Ticker, + pub side: Option<&'a str>, + pub price: f64, + pub amount: f64, + pub server_time: u64, +} + +#[derive(Serialize, Debug)] +struct PgRow { + pub time: DateTime, + pub exch: u8, + pub base: u8, + pub quote: u8, + pub amount: f64, + pub price: f64, + pub side: Option, + pub server_time: Option>, +} + +fn nanos_to_utc(nanos: u64) -> DateTime { + const ONE_SECOND: u64 = 1_000_000_000; + let sec: i64 = (nanos / ONE_SECOND) as i64; + let nsec: u32 = (nanos % ONE_SECOND) as u32; + let naive = NaiveDateTime::from_timestamp(sec, nsec); + DateTime::from_utc(naive, Utc) +} + fn per_sec(n: usize, span: Duration) -> f64 { if n == 0 || span < Duration::from_micros(1) { return 0.0 } let s: f64 = span.as_nanos() as f64 / 1e9f64; @@ -76,6 +134,90 @@ fn run(start: Instant, logger: &slog::Logger) -> Result { let mut n = 0; match opt { + Opt::PrepPostgres { trades_csv, output_path } => { + let logger = logger.new(o!("cmd" => "prep-postgres")); + + info!(logger, "beginning prep-postgres cmd"; + "trades_csv" => %trades_csv.display(), + "output_path" => %output_path.display(), + ); + + if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) } + + info!(logger, "opening trades_csv file"); + + let rdr = fs::File::open(&trades_csv) + .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?; + + let rdr = io::BufReader::new(rdr); + + let mut rdr = csv::Reader::from_reader(rdr); + + info!(logger, "opening output file for writing"); + + let wtr = fs::File::create(&output_path) + .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?; + + let wtr = io::BufWriter::new(wtr); + + let mut wtr = csv::Writer::from_writer(wtr); + + let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); + let mut row = csv::StringRecord::new(); + + //wtr.write_record(&headers).map_err(|e| format!("writing headers row failed: {}", e))?; + + while rdr.read_record(&mut row) + .map_err(|e| { + format!("reading row {} failed: {}", (n+1).thousands_sep(), e) + })? 
+            {
+                let bldr: PgBuilder = row.deserialize(Some(&headers)).map_err(|e| format!("deser failed: {}", e))?;
+                let PgBuilder { time, exch, ticker, side, price, amount, server_time } = bldr;
+                let time = nanos_to_utc(time);
+                let exch = u8::from(exch);
+                let base = u8::from(ticker.base);
+                let quote = u8::from(ticker.quote);
+                let side: Option<u8> = match side {
+                    Some("bid") => Some(1),
+                    Some("ask") => Some(2),
+                    _ => None,
+                };
+                let server_time = match server_time {
+                    0 => None,
+                    x => Some(nanos_to_utc(x)),
+                };
+                let pg_row = PgRow { time, exch, base, quote, amount, price, side, server_time };
+
+                wtr.serialize(&pg_row).map_err(|e| format!("serializing PgRow to csv failed: {}", e))?;
+
+                n += 1;
+                if n % PROGRESS_EVERY == 0 {
+                    info!(logger, "parsing/writing csv rows"; "n" => %n.thousands_sep());
+                }
+            }
+        }
+
+        Opt::ListCodes => {
+            println!("side: {:?} {}", Side::Bid, u8::from(Side::Bid));
+            println!("side: {:?} {}", Side::Ask, u8::from(Side::Ask));
+            println!();
+
+            for exch in
+                &[e!(plnx), e!(krkn), e!(gdax), e!(bits),
+                  e!(bmex), e!(btfx), e!(bnce), e!(okex), ]
+            {
+                println!("insert into exchanges (id, symbol) values({}, \"{}\");", u8::from(*exch), exch.as_str());
+            }
+
+            for currency in
+                &[c!(btc), c!(eth), c!(xmr), c!(ltc), c!(etc), c!(usdt), c!(usd)]
+            {
+                println!("insert into currencies (id, symbol) values({}, \"{}\");", u8::from(*currency), currency.as_str());
+            }
+
+        }
+
         Opt::Range { trades_csv, output_path, start, end } => {
 
             let logger = logger.new(o!("cmd" => "range"));
@@ -97,6 +239,8 @@ fn run(start: Instant, logger: &slog::Logger) -> Result {
 
             let mut rdr = csv::Reader::from_reader(rdr);
 
+            info!(logger, "opening output file for writing");
+
             let wtr = fs::File::create(&output_path)
                 .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
 
@@ -105,11 +249,55 @@ fn run(start: Instant, logger: &slog::Logger) -> Result {
             let mut wtr = csv::Writer::from_writer(wtr);
 
             let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
+
+            let time_col: usize = headers.iter().position(|x| x == b"time").ok_or_else(|| {
+                String::from("no column in headers named 'time'")
+            })?;
+
             let mut row = csv::ByteRecord::new();
 
             let start_nanos = nanos(start);
             let end_nanos = nanos(end);
+
+            let mut n_written = 0;
+            let mut time: u64 = 0;
+
+            info!(logger, "writing headers row to output file");
+            wtr.write_byte_record(&headers).map_err(|e| format!("writing csv headers row failed: {}", e))?;
+
+            info!(logger, "entering csv parsing loop");
+
+            'a: while rdr.read_byte_record(&mut row)
+                .map_err(|e| {
+                    format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
+                })?
+            {
+                let time_bytes = row.get(time_col).ok_or_else(|| "time column not present for row")?;
+
+                time = atoi::atoi(time_bytes).ok_or_else(|| {
+                    format!("failed to parse 'time' col value '{}' as integer", std::str::from_utf8(time_bytes).unwrap_or("utf8err"))
+                })?;
+
+                n += 1;
+
+                if n % PROGRESS_EVERY == 0 {
+                    info!(logger, "parsing csv rows"; "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep());
+                }
+
+                if time < start_nanos { continue 'a }
+
+                if time > end_nanos { break 'a }
+
+                wtr.write_byte_record(&row).map_err(|e| format!("writing parsed csv row to output file failed: {}", e))?;
+
+                n_written += 1;
+            }
+
+            info!(logger, "broke out of read csv loop"; "time" => time, "end_nanos" => end_nanos, "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep());
+
+            info!(logger, "dropping wtr");
+
+            drop(wtr);
         }
     }
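
For reference, a sketch (not part of this patch) of how the prep-postgres output might be loaded into the new schema and how the notebook's one-hour size-weighted price ratio could be reproduced in SQL. It is illustrative only: the path /xfs/trades-pg.csv is hypothetical, and it assumes the prep-postgres CSV carries a header row with columns in PgRow field order (time, exch, base, quote, amount, price, side, server_time), with empty fields for the NULLed side/server_time values.

-- illustrative ingest via psql; column list skips the serial id column
\copy public.trades ("time", exch, base, quote, amount, price, side, server_time) from '/xfs/trades-pg.csv' with (format csv, header true)

-- one-hour size-weighted price ratio, bmex vs gdax, btc/usd
-- (mirrors the pandas computation in the notebook)
select bmex_size_wt_price,
       gdax_size_wt_price,
       bmex_size_wt_price / gdax_size_wt_price as ratio
from (
    select sum(price * amount) filter (where exch = 'bmex')
             / sum(amount) filter (where exch = 'bmex') as bmex_size_wt_price,
           sum(price * amount) filter (where exch = 'gdax')
             / sum(amount) filter (where exch = 'gdax') as gdax_size_wt_price
    from (
        -- resolve the smallint codes back to symbols and restrict to the hour
        select t.price, t.amount, e.symbol as exch
        from public.trades t
        join public.exchanges e on e.id = t.exch
        join public.currencies b on b.id = t.base
        join public.currencies q on q.id = t.quote
        where t."time" >= '2018-07-10T04:00:00Z'
          and t."time" <  '2018-07-10T05:00:00Z'
          and b.symbol = 'btc'
          and q.symbol = 'usd'
          and e.symbol in ('bmex', 'gdax')
    ) hr
) wt;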