Browse Source

postgres baseline prep

tags/v0.2.0
Jonathan Strong 4 years ago
parent
commit
c8f1cf2b85
4 changed files with 691 additions and 1 deletions
  1. +1
    -0
      .gitignore
  2. +380
    -0
      notebooks/python-example-of-query-for-1h.ipynb
  3. +121
    -0
      postgres-creat-trades-table.sql
  4. +189
    -1
      src/munge.rs

+ 1
- 0
.gitignore View File

@@ -4,3 +4,4 @@ Cargo.lock
var/ var/
csv-bytes csv-bytes
csv-bytes-manual csv-bytes-manual
**/.ipynb_checkpoints/

+ 380
- 0
notebooks/python-example-of-query-for-1h.ipynb View File

@@ -0,0 +1,380 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>time</th>\n",
" <th>amount</th>\n",
" <th>exch</th>\n",
" <th>price</th>\n",
" <th>server_time</th>\n",
" <th>side</th>\n",
" <th>ticker</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <td>0</td>\n",
" <td>1531094401700852527</td>\n",
" <td>0.0801</td>\n",
" <td>bits</td>\n",
" <td>6706.6001</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" <tr>\n",
" <td>1</td>\n",
" <td>1531094401780298519</td>\n",
" <td>0.0284</td>\n",
" <td>bits</td>\n",
" <td>6706.6099</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" <tr>\n",
" <td>2</td>\n",
" <td>1531094402305708472</td>\n",
" <td>0.0050</td>\n",
" <td>btfx</td>\n",
" <td>6707.0000</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" <tr>\n",
" <td>3</td>\n",
" <td>1531094403455657797</td>\n",
" <td>0.0050</td>\n",
" <td>btfx</td>\n",
" <td>6706.7002</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" <tr>\n",
" <td>4</td>\n",
" <td>1531094403592663872</td>\n",
" <td>0.0658</td>\n",
" <td>btfx</td>\n",
" <td>6705.8999</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" time amount exch price server_time side ticker\n",
"0 1531094401700852527 0.0801 bits 6706.6001 0 na btc_usd\n",
"1 1531094401780298519 0.0284 bits 6706.6099 0 na btc_usd\n",
"2 1531094402305708472 0.0050 btfx 6707.0000 0 na btc_usd\n",
"3 1531094403455657797 0.0050 btfx 6706.7002 0 na btc_usd\n",
"4 1531094403592663872 0.0658 btfx 6705.8999 0 na btc_usd"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.read_csv('/xfs/trades.csv', nrows=1024 * 1024)\n",
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>time</th>\n",
" <th>amount</th>\n",
" <th>exch</th>\n",
" <th>price</th>\n",
" <th>server_time</th>\n",
" <th>side</th>\n",
" <th>ticker</th>\n",
" </tr>\n",
" <tr>\n",
" <th>time</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <td>2018-07-09 00:00:01.700852527+00:00</td>\n",
" <td>1531094401700852527</td>\n",
" <td>0.0801</td>\n",
" <td>bits</td>\n",
" <td>6706.6001</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" <tr>\n",
" <td>2018-07-09 00:00:01.780298519+00:00</td>\n",
" <td>1531094401780298519</td>\n",
" <td>0.0284</td>\n",
" <td>bits</td>\n",
" <td>6706.6099</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" <tr>\n",
" <td>2018-07-09 00:00:02.305708472+00:00</td>\n",
" <td>1531094402305708472</td>\n",
" <td>0.0050</td>\n",
" <td>btfx</td>\n",
" <td>6707.0000</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" <tr>\n",
" <td>2018-07-09 00:00:03.455657797+00:00</td>\n",
" <td>1531094403455657797</td>\n",
" <td>0.0050</td>\n",
" <td>btfx</td>\n",
" <td>6706.7002</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" <tr>\n",
" <td>2018-07-09 00:00:03.592663872+00:00</td>\n",
" <td>1531094403592663872</td>\n",
" <td>0.0658</td>\n",
" <td>btfx</td>\n",
" <td>6705.8999</td>\n",
" <td>0</td>\n",
" <td>na</td>\n",
" <td>btc_usd</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" time amount exch \\\n",
"time \n",
"2018-07-09 00:00:01.700852527+00:00 1531094401700852527 0.0801 bits \n",
"2018-07-09 00:00:01.780298519+00:00 1531094401780298519 0.0284 bits \n",
"2018-07-09 00:00:02.305708472+00:00 1531094402305708472 0.0050 btfx \n",
"2018-07-09 00:00:03.455657797+00:00 1531094403455657797 0.0050 btfx \n",
"2018-07-09 00:00:03.592663872+00:00 1531094403592663872 0.0658 btfx \n",
"\n",
" price server_time side ticker \n",
"time \n",
"2018-07-09 00:00:01.700852527+00:00 6706.6001 0 na btc_usd \n",
"2018-07-09 00:00:01.780298519+00:00 6706.6099 0 na btc_usd \n",
"2018-07-09 00:00:02.305708472+00:00 6707.0000 0 na btc_usd \n",
"2018-07-09 00:00:03.455657797+00:00 6706.7002 0 na btc_usd \n",
"2018-07-09 00:00:03.592663872+00:00 6705.8999 0 na btc_usd "
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.index = pd.to_datetime(df['time'], utc=True)\n",
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(Timestamp('2018-07-09 00:00:01.700852527+0000', tz='UTC'),\n",
" Timestamp('2018-07-11 19:27:34.453569864+0000', tz='UTC'),\n",
" Timedelta('2 days 19:27:32.752717'))"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.index[0], df.index[-1], df.index[-1] - df.index[0]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1.000578250137079"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"start = \"2018-07-10T04:00:00Z\" # <- randomly selected start/end \n",
"end = \"2018-07-10T05:00:00Z\" # important point is, 1h span\n",
"\n",
"target_hr = (df.index >= start) & (df.index < end)\n",
"assert target_hr.sum() > 0\n",
"\n",
"of_btcusd = df['ticker'] == 'btc_usd'\n",
"assert of_btcusd.sum() > 0\n",
"\n",
"of_bmex = df['exch'] == 'bmex'\n",
"of_gdax = df['exch'] == 'gdax'\n",
"\n",
"assert of_bmex.sum() > 0\n",
"assert of_gdax.sum() > 0\n",
"\n",
"bmex_hr = df.loc[target_hr & of_bmex & of_btcusd, ['price', 'amount']]\n",
"gdax_hr = df.loc[target_hr & of_gdax & of_btcusd, ['price', 'amount']]\n",
"\n",
"bmex_size_wt_price = (\n",
" (bmex_hr['price'] * bmex_hr['amount']).sum()\n",
" / bmex_hr['amount'].sum()\n",
")\n",
"\n",
"gdax_size_wt_price = (\n",
" (gdax_hr['price'] * gdax_hr['amount']).sum()\n",
" / gdax_hr['amount'].sum()\n",
")\n",
"\n",
"ratio = bmex_size_wt_price / gdax_size_wt_price # <- final answer (for start,end hr at least)\n",
"ratio"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"((2714, 2), (1305, 2))"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"bmex_hr.shape, gdax_hr.shape"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(6642.42451802828, 6638.585754905495)"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"bmex_size_wt_price, gdax_size_wt_price"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

+ 121
- 0
postgres-creat-trades-table.sql View File

@@ -0,0 +1,121 @@
CREATE SEQUENCE public.trades_id_seq
INCREMENT 1
MINVALUE 1
MAXVALUE 9223372036854775807
START 1
CACHE 1;
ALTER TABLE public.trades_id_seq
OWNER TO jstrong;

-- CREATE SEQUENCE public.exchangess_id_seq
-- INCREMENT 1
-- MINVALUE 1
-- MAXVALUE 255
-- START 1
-- CACHE 1;
-- ALTER TABLE public.exchangess_id_seq
-- OWNER TO jstrong;
--
-- CREATE SEQUENCE public.currencies_id_seq
-- INCREMENT 1
-- MINVALUE 1
-- MAXVALUE 255
-- START 1
-- CACHE 1;
-- ALTER TABLE public.currencies_id_seq
-- OWNER TO jstrong;


CREATE TABLE public.exchanges
(

id smallint NOT NULL,
symbol character varying(4) NOT NULL,

CONSTRAINT exchanges_pkey PRIMARY KEY (id)

) WITH ( OIDS=FALSE );

CREATE TABLE public.currencies
(
id smallint NOT NULL,
symbol character varying(6) NOT NULL,

CONSTRAINT currencies_pkey PRIMARY KEY (id)

) WITH ( OIDS=FALSE );

CREATE TABLE public.trades
(
id integer NOT NULL DEFAULT nextval('trades_id_seq'::regclass),
"time" timestamp with time zone NOT NULL,
exch smallint NOT NULL,
base smallint NOT NULL,
quote smallint NOT NULL,
amount double precision NOT NULL,
price double precision NOT NULL,
side smallint NULL, -- side has no fk ... bid=1, ask=2
server_time timestamp with time zone NULL,
CONSTRAINT trades_pkey PRIMARY KEY (id),

CONSTRAINT exch_fk FOREIGN KEY (exch)
REFERENCES public.exchanges (id) MATCH SIMPLE
ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED,

CONSTRAINT base_fk FOREIGN KEY (base)
REFERENCES public.currencies (id) MATCH SIMPLE
ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED,

CONSTRAINT quote_fk FOREIGN KEY (quote)
REFERENCES public.currencies (id) MATCH SIMPLE
ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED
)
WITH (
OIDS=FALSE
);

ALTER TABLE public.trades
OWNER TO jstrong;

CREATE INDEX trades_time_abcdefg
ON public.trades
USING btree
("time");

CREATE INDEX trades_base_f6b2eeda
ON public.trades
USING btree
(base);

CREATE INDEX trades_quote_0d5895fc
ON public.trades
USING btree
(quote);

CREATE INDEX trades_exchange_5d5c6971
ON public.trades
USING btree
(exch);

-- CREATE INDEX trades_side_23985593
-- ON public.trades
-- USING btree
-- (side);

-- fill in exchanges/currencies

insert into public.exchanges (id, symbol) values(1, 'plnx');
insert into public.exchanges (id, symbol) values(2, 'krkn');
insert into public.exchanges (id, symbol) values(3, 'gdax');
insert into public.exchanges (id, symbol) values(5, 'bits');
insert into public.exchanges (id, symbol) values(6, 'bmex');
insert into public.exchanges (id, symbol) values(7, 'btfx');
insert into public.exchanges (id, symbol) values(8, 'bnce');
insert into public.exchanges (id, symbol) values(9, 'okex');
insert into public.currencies (id, symbol) values(1, 'btc');
insert into public.currencies (id, symbol) values(2, 'eth');
insert into public.currencies (id, symbol) values(3, 'xmr');
insert into public.currencies (id, symbol) values(5, 'ltc');
insert into public.currencies (id, symbol) values(15, 'etc');
insert into public.currencies (id, symbol) values(4, 'usdt');
insert into public.currencies (id, symbol) values(100, 'usd');

+ 189
- 1
src/munge.rs View File

@@ -13,7 +13,7 @@ use pretty_toa::ThousandsSep;
use structopt::StructOpt; use structopt::StructOpt;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use slog::Drain; use slog::Drain;
use chrono::{DateTime, Utc};
use chrono::{DateTime, Utc, NaiveDateTime};
use markets::crypto::{Exchange, Ticker, Side}; use markets::crypto::{Exchange, Ticker, Side};


macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{ macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
@@ -21,6 +21,7 @@ macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
std::process::exit(1); std::process::exit(1);
}}} }}}


const PROGRESS_EVERY: usize = 1024 * 1024 * 4;




#[structopt(rename_all="kebab-case")] #[structopt(rename_all="kebab-case")]
@@ -28,6 +29,9 @@ macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
enum Opt { enum Opt {


/// Filter trades-csv by start,end range and save subset to output-path /// Filter trades-csv by start,end range and save subset to output-path
///
/// Note: csv assumed to be pre-sorted by time (ascending)
///
Range { Range {


/// Path to CSV file with trades data /// Path to CSV file with trades data
@@ -41,12 +45,35 @@ enum Opt {
output_path: PathBuf, output_path: PathBuf,


/// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ") /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
#[structopt(short = "s", long = "start")]
start: DateTime<Utc>, start: DateTime<Utc>,


/// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ") /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
#[structopt(short = "e", long = "end")]
end: DateTime<Utc>, end: DateTime<Utc>,


}, },

/// Convert the original csv info a format ready to be ingested via COPY
///
/// 1. server_time of 0 -> NULL
/// 2. side of "na" -> NULL
PrepPostgres {

/// Path to CSV file with trades data
#[structopt(short = "f", long = "trades-csv")]
#[structopt(parse(from_os_str))]
trades_csv: PathBuf,

/// Where to save the query results (CSV output)
#[structopt(short = "o", long = "output-path")]
#[structopt(parse(from_os_str))]
output_path: PathBuf,
},

ListCodes,


} }


#[derive(Deserialize)] #[derive(Deserialize)]
@@ -60,6 +87,37 @@ struct Trade {
pub amount: f64, pub amount: f64,
} }


#[derive(Deserialize, Debug)]
struct PgBuilder<'a> {
pub time: u64,
pub exch: Exchange,
pub ticker: Ticker,
pub side: Option<&'a str>,
pub price: f64,
pub amount: f64,
pub server_time: u64,
}

#[derive(Serialize, Debug)]
struct PgRow {
pub time: DateTime<Utc>,
pub exch: u8,
pub base: u8,
pub quote: u8,
pub amount: f64,
pub price: f64,
pub side: Option<u8>,
pub server_time: Option<DateTime<Utc>>,
}

fn nanos_to_utc(nanos: u64) -> DateTime<Utc> {
const ONE_SECOND: u64 = 1_000_000_000;
let sec: i64 = (nanos / ONE_SECOND) as i64;
let nsec: u32 = (nanos % ONE_SECOND) as u32;
let naive = NaiveDateTime::from_timestamp(sec, nsec);
DateTime::from_utc(naive, Utc)
}

fn per_sec(n: usize, span: Duration) -> f64 { fn per_sec(n: usize, span: Duration) -> f64 {
if n == 0 || span < Duration::from_micros(1) { return 0.0 } if n == 0 || span < Duration::from_micros(1) { return 0.0 }
let s: f64 = span.as_nanos() as f64 / 1e9f64; let s: f64 = span.as_nanos() as f64 / 1e9f64;
@@ -76,6 +134,90 @@ fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
let mut n = 0; let mut n = 0;


match opt { match opt {
Opt::PrepPostgres { trades_csv, output_path } => {
let logger = logger.new(o!("cmd" => "prep-postgres"));

info!(logger, "beginning prep-postgres cmd";
"trades_csv" => %trades_csv.display(),
"output_path" => %output_path.display(),
);

if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) }

info!(logger, "opening trades_csv file");

let rdr = fs::File::open(&trades_csv)
.map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?;

let rdr = io::BufReader::new(rdr);

let mut rdr = csv::Reader::from_reader(rdr);

info!(logger, "opening output file for writing");

let wtr = fs::File::create(&output_path)
.map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;

let wtr = io::BufWriter::new(wtr);

let mut wtr = csv::Writer::from_writer(wtr);

let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
let mut row = csv::StringRecord::new();

//wtr.write_record(&headers).map_err(|e| format!("writing headers row failed: {}", e))?;

while rdr.read_record(&mut row)
.map_err(|e| {
format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
})?
{
let bldr: PgBuilder = row.deserialize(Some(&headers)).map_err(|e| format!("deser failed: {}", e))?;
let PgBuilder { time, exch, ticker, side, price, amount, server_time } = bldr;
let time = nanos_to_utc(time);
let exch = u8::from(exch);
let base = u8::from(ticker.base);
let quote = u8::from(ticker.quote);
let side: Option<u8> = match side {
Some("bid") => Some(1),
Some("ask") => Some(2),
_ => None,
};
let server_time = match server_time {
0 => None,
x => Some(nanos_to_utc(x)),
};
let pg_row = PgRow { time, exch, base, quote, amount, price, side, server_time };

wtr.serialize(&pg_row).map_err(|e| format!("serializing PgRow to csv failed: {}", e))?;

n += 1;
if n % PROGRESS_EVERY == 0 {
info!(logger, "parsing/writing csv rows"; "n" => %n.thousands_sep());
}
}
}

Opt::ListCodes => {
println!("side: {:?} {}", Side::Bid, u8::from(Side::Bid));
println!("side: {:?} {}", Side::Ask, u8::from(Side::Ask));
println!();

for exch in
&[e!(plnx), e!(krkn), e!(gdax), e!(bits),
e!(bmex), e!(btfx), e!(bnce), e!(okex), ]
{
println!("insert into exchanges (id, symbol) values({}, \"{}\");", u8::from(*exch), exch.as_str());
}

for currency in
&[c!(btc), c!(eth), c!(xmr), c!(ltc), c!(etc), c!(usdt), c!(usd)]
{
println!("insert into currencies (id, symbol) values({}, \"{}\");", u8::from(*currency), currency.as_str());
}

}

Opt::Range { trades_csv, output_path, start, end } => { Opt::Range { trades_csv, output_path, start, end } => {
let logger = logger.new(o!("cmd" => "range")); let logger = logger.new(o!("cmd" => "range"));


@@ -97,6 +239,8 @@ fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {


let mut rdr = csv::Reader::from_reader(rdr); let mut rdr = csv::Reader::from_reader(rdr);


info!(logger, "opening output file for writing");

let wtr = fs::File::create(&output_path) let wtr = fs::File::create(&output_path)
.map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?; .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;


@@ -105,11 +249,55 @@ fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
let mut wtr = csv::Writer::from_writer(wtr); let mut wtr = csv::Writer::from_writer(wtr);


let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone(); let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();

let time_col: usize = headers.iter().position(|x| x == b"time").ok_or_else(|| {
String::from("no column in headers named 'time'")
})?;

let mut row = csv::ByteRecord::new(); let mut row = csv::ByteRecord::new();


let start_nanos = nanos(start); let start_nanos = nanos(start);
let end_nanos = nanos(end); let end_nanos = nanos(end);


let mut n_written = 0;
let mut time: u64 = 0;

info!(logger, "writing headers row to output file");
wtr.write_byte_record(&headers).map_err(|e| format!("writing csv headers row failed: {}", e))?;

info!(logger, "entering csv parsing loop");

'a: while rdr.read_byte_record(&mut row)
.map_err(|e| {
format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
})?
{
let time_bytes = row.get(time_col).ok_or_else(|| "time column not present for row")?;

time = atoi::atoi(time_bytes).ok_or_else(|| {
format!("failed to parse 'time' col value '{}' as integer", std::str::from_utf8(time_bytes).unwrap_or("utf8err"))
})?;

n += 1;

if n % PROGRESS_EVERY == 0 {
info!(logger, "parsing csv rows"; "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep());
}

if time < start_nanos { continue 'a }

if time > end_nanos { break 'a }

wtr.write_byte_record(&row).map_err(|e| format!("writing parsed csv row to output file failed: {}", e))?;

n_written += 1;
}

info!(logger, "broke out of read csv loop"; "time" => time, "end_nanos" => end_nanos, "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep());

info!(logger, "dropping wtr");

drop(wtr);
} }
} }




Loading…
Cancel
Save