You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

munge.rs 16KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. #![allow(unused)]
  2. #[macro_use]
  3. extern crate slog;
  4. #[macro_use]
  5. extern crate markets;
  6. use std::io::{self, prelude::*};
  7. use std::fs;
  8. use std::path::{Path, PathBuf};
  9. use std::time::*;
  10. use pretty_toa::ThousandsSep;
  11. use structopt::StructOpt;
  12. use serde::{Serialize, Deserialize};
  13. use slog::Drain;
  14. use chrono::{DateTime, Utc, NaiveDateTime};
  15. use markets::crypto::{Exchange, Ticker, Side, Currency};
  16. use pipelines::encoding;
  17. macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
  18. eprintln!($fmt, $($args)*);
  19. std::process::exit(1);
  20. }}}
  21. const PROGRESS_EVERY: usize = 1024 * 1024 * 4;
// Command-line interface for the trade-data munging tool; each variant is a
// subcommand. NOTE: the `///` doc comments below are rendered by structopt
// as CLI help text, so they are part of runtime behavior — do not edit them
// casually.
#[structopt(rename_all="kebab-case")]
#[derive(Debug, StructOpt)]
enum Opt {
    /// Filter trades-csv by start,end range and save subset to output-path
    ///
    /// Note: csv assumed to be pre-sorted by time (ascending)
    ///
    Range {
        /// Path to CSV file with trades data
        #[structopt(short = "f", long = "trades-csv")]
        #[structopt(parse(from_os_str))]
        trades_csv: PathBuf,
        /// Where to save the query results (CSV output)
        #[structopt(short = "o", long = "output-path")]
        #[structopt(parse(from_os_str))]
        output_path: PathBuf,
        /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
        #[structopt(short = "s", long = "start")]
        start: DateTime<Utc>,
        /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
        #[structopt(short = "e", long = "end")]
        end: DateTime<Utc>,
    },
    /// Convert the original csv info a format ready to be ingested via COPY
    ///
    /// 1. server_time of 0 -> NULL
    /// 2. side of "na" -> NULL
    PrepPostgres {
        /// Path to CSV file with trades data
        #[structopt(short = "f", long = "trades-csv")]
        #[structopt(parse(from_os_str))]
        trades_csv: PathBuf,
        /// Where to save the query results (CSV output)
        #[structopt(short = "o", long = "output-path")]
        #[structopt(parse(from_os_str))]
        output_path: PathBuf,
    },
    // Print the integer codes for sides/exchanges/currencies as SQL INSERTs
    // (for seeding lookup tables).
    ListCodes,
    // Convert the trades CSV into the fixed-size binary record format
    // defined by `pipelines::encoding`.
    Binarize {
        /// Path to CSV file with trades data
        #[structopt(short = "f", long = "trades-csv")]
        #[structopt(parse(from_os_str))]
        trades_csv: PathBuf,
        /// Where to save the binary-serialized data
        #[structopt(short = "o", long = "output-path")]
        #[structopt(parse(from_os_str))]
        output_path: PathBuf,
    },
    // Count the records in a binary trades file, validating that the file
    // length is an exact multiple of the record size.
    CountRows {
        /// Path to file with binary trades data
        #[structopt(short = "f", long = "input-file")]
        #[structopt(parse(from_os_str))]
        input_path: PathBuf,
    },
}
/// Row shape of the source trades CSV as deserialized by serde/csv.
/// Field names must match the CSV header names.
///
/// NOTE(review): this struct appears unused in this file — the Binarize
/// path deserializes into `encoding::CsvTrade` instead; confirm before
/// removing.
#[derive(Deserialize)]
struct Trade {
    /// Unix nanoseconds
    pub time: u64,
    pub exch: Exchange,
    pub ticker: Ticker,
    //pub side: Option<Side>,
    pub price: f64,
    pub amount: f64,
}
/// Intermediate row borrowed zero-copy from a CSV record in the
/// prep-postgres path. `side` stays a raw string slice ("bid"/"ask", or
/// anything else meaning missing) and `server_time` stays raw nanoseconds
/// (0 meaning missing) until conversion into a `PgRow`.
#[derive(Deserialize, Debug)]
struct PgBuilder<'a> {
    // Unix nanoseconds (converted to `DateTime<Utc>` in `PgRow`).
    pub time: u64,
    pub exch: Exchange,
    pub ticker: Ticker,
    pub side: Option<&'a str>,
    pub price: f64,
    pub amount: f64,
    // Unix nanoseconds; 0 is a sentinel for NULL.
    pub server_time: u64,
}
/// Output row serialized to CSV for ingestion via Postgres COPY.
/// Field declaration order defines the output column order — do not
/// reorder fields.
#[derive(Serialize, Debug)]
struct PgRow {
    pub time: DateTime<Utc>,
    // Integer codes as produced by `u8::from` on the markets types
    // (see the ListCodes subcommand for the code table).
    pub exch: u8,
    pub base: u8,
    pub quote: u8,
    pub amount: f64,
    pub price: f64,
    // 1 = bid, 2 = ask, None = unknown/"na".
    pub side: Option<u8>,
    // None when the source value was the 0 sentinel.
    pub server_time: Option<DateTime<Utc>>,
}
  108. fn nanos_to_utc(nanos: u64) -> DateTime<Utc> {
  109. const ONE_SECOND: u64 = 1_000_000_000;
  110. let sec: i64 = (nanos / ONE_SECOND) as i64;
  111. let nsec: u32 = (nanos % ONE_SECOND) as u32;
  112. let naive = NaiveDateTime::from_timestamp(sec, nsec);
  113. DateTime::from_utc(naive, Utc)
  114. }
  115. fn per_sec(n: usize, span: Duration) -> f64 {
  116. if n == 0 || span < Duration::from_micros(1) { return 0.0 }
  117. let s: f64 = span.as_nanos() as f64 / 1e9f64;
  118. n as f64 / s
  119. }
  120. fn nanos(utc: DateTime<Utc>) -> u64 {
  121. (utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64)
  122. }
  123. fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
  124. let opt = Opt::from_args();
  125. let mut n = 0;
  126. match opt {
  127. Opt::CountRows { input_path } => {
  128. let logger = logger.new(o!("cmd" => "count-rows"));
  129. info!(logger, "beginning to count";
  130. "input_path" => %input_path.display(),
  131. );
  132. if ! input_path.exists() { return Err(format!("--input-file path does not exist: {}", input_path.display())) }
  133. let input_file =
  134. fs::OpenOptions::new()
  135. .read(true)
  136. .open(input_path)
  137. .map_err(|e| e.to_string())?;
  138. let file_length = input_file.metadata().unwrap().len();
  139. if file_length % encoding::SERIALIZED_SIZE as u64 != 0 {
  140. return Err(format!("file length is not a multiple of record size: {}", file_length))
  141. }
  142. let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE;
  143. info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep());
  144. let data: memmap::Mmap = unsafe {
  145. memmap::Mmap::map(&input_file)
  146. .map_err(|e| {
  147. format!("creating Mmap failed: {}", e)
  148. })?
  149. };
  150. let mut n_gdax = 0;
  151. let mut n_bmex = 0;
  152. for i in 0..n_records {
  153. let j = i * encoding::SERIALIZED_SIZE;
  154. let k = j + encoding::SERIALIZED_SIZE;
  155. let packed = encoding::PackedTradeData::new(&data[j..k]);
  156. n_gdax += (packed.exch().unwrap() == e!(gdax)) as usize;
  157. n_bmex += (packed.exch().unwrap() == e!(bmex)) as usize;
  158. n += 1;
  159. }
  160. info!(logger, "finished reading flle";
  161. "n gdax" => n_gdax.thousands_sep(),
  162. "n bmex" => n_bmex.thousands_sep(),
  163. );
  164. }
  165. Opt::Binarize { trades_csv, output_path } => {
  166. let logger = logger.new(o!("cmd" => "binarize"));
  167. info!(logger, "beginning binarize";
  168. "trades_csv" => %trades_csv.display(),
  169. "output_path" => %output_path.display(),
  170. );
  171. if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) }
  172. info!(logger, "opening trades_csv file");
  173. let rdr = fs::File::open(&trades_csv)
  174. .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?;
  175. let rdr = io::BufReader::new(rdr);
  176. let mut rdr = csv::Reader::from_reader(rdr);
  177. info!(logger, "opening output file for writing");
  178. let wtr = fs::File::create(&output_path)
  179. .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
  180. let mut wtr = io::BufWriter::new(wtr);
  181. let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  182. let mut row = csv::ByteRecord::new();
  183. let mut buf = vec![0u8; encoding::SERIALIZED_SIZE];
  184. let mut n = 0;
  185. let mut n_written = 0;
  186. let mut n_bytes_written = 0;
  187. let mut n_bytes_read = headers.as_slice().len() + headers.len() + 1;
  188. while rdr.read_byte_record(&mut row)
  189. .map_err(|e| {
  190. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  191. })?
  192. {
  193. let trade: encoding::CsvTrade = row.deserialize(Some(&headers)).map_err(|e| e.to_string())?;
  194. n += 1;
  195. n_bytes_read += row.as_slice().len() + row.len() + 1;
  196. encoding::serialize(&mut buf[..], &trade);
  197. let bytes_written = wtr.write(&buf[..]).map_err(|e| e.to_string())?;
  198. assert_eq!(bytes_written, encoding::SERIALIZED_SIZE);
  199. n_written += 1;
  200. n_bytes_written += bytes_written;
  201. if n % PROGRESS_EVERY == 0 {
  202. info!(logger, "binarizing csv";
  203. "elapsed" => ?(Instant::now() - start),
  204. "n" => %n.thousands_sep(),
  205. "n_written" => %n_written.thousands_sep(),
  206. "mb read" => (n_bytes_read as f64 / 1024.0 / 1024.0),
  207. "mb written" => (n_bytes_written as f64 / 1024.0 / 1024.0),
  208. );
  209. }
  210. }
  211. info!(logger, "finished reading/converting csv");
  212. assert_eq!(n_bytes_written % encoding::SERIALIZED_SIZE, 0);
  213. }
  214. Opt::PrepPostgres { trades_csv, output_path } => {
  215. let logger = logger.new(o!("cmd" => "prep-postgres"));
  216. info!(logger, "beginning prep-postgres cmd";
  217. "trades_csv" => %trades_csv.display(),
  218. "output_path" => %output_path.display(),
  219. );
  220. if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) }
  221. info!(logger, "opening trades_csv file");
  222. let rdr = fs::File::open(&trades_csv)
  223. .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?;
  224. let rdr = io::BufReader::new(rdr);
  225. let mut rdr = csv::Reader::from_reader(rdr);
  226. info!(logger, "opening output file for writing");
  227. let wtr = fs::File::create(&output_path)
  228. .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
  229. let wtr = io::BufWriter::new(wtr);
  230. let mut wtr = csv::Writer::from_writer(wtr);
  231. let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  232. let mut row = csv::StringRecord::new();
  233. //wtr.write_record(&headers).map_err(|e| format!("writing headers row failed: {}", e))?;
  234. while rdr.read_record(&mut row)
  235. .map_err(|e| {
  236. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  237. })?
  238. {
  239. let bldr: PgBuilder = row.deserialize(Some(&headers)).map_err(|e| format!("deser failed: {}", e))?;
  240. let PgBuilder { time, exch, ticker, side, price, amount, server_time } = bldr;
  241. let time = nanos_to_utc(time);
  242. let exch = u8::from(exch);
  243. let base = u8::from(ticker.base);
  244. let quote = u8::from(ticker.quote);
  245. let side: Option<u8> = match side {
  246. Some("bid") => Some(1),
  247. Some("ask") => Some(2),
  248. _ => None,
  249. };
  250. let server_time = match server_time {
  251. 0 => None,
  252. x => Some(nanos_to_utc(x)),
  253. };
  254. let pg_row = PgRow { time, exch, base, quote, amount, price, side, server_time };
  255. wtr.serialize(&pg_row).map_err(|e| format!("serializing PgRow to csv failed: {}", e))?;
  256. n += 1;
  257. if n % PROGRESS_EVERY == 0 {
  258. info!(logger, "parsing/writing csv rows"; "n" => %n.thousands_sep());
  259. }
  260. }
  261. }
  262. Opt::ListCodes => {
  263. println!("side: {:?} {}", Side::Bid, u8::from(Side::Bid));
  264. println!("side: {:?} {}", Side::Ask, u8::from(Side::Ask));
  265. println!();
  266. for exch in Exchange::all() {
  267. println!("INSERT INTO exchanges (id, symbol) VALUES ({}, '{}');", u8::from(exch), exch.as_str());
  268. }
  269. for currency in Currency::all() {
  270. println!("INSERT INTO currencies (id, symbol) VALUES ({}, '{}');", u8::from(currency), currency.as_str());
  271. }
  272. }
  273. Opt::Range { trades_csv, output_path, start, end } => {
  274. let logger = logger.new(o!("cmd" => "range"));
  275. info!(logger, "beginning range cmd";
  276. "trades_csv" => %trades_csv.display(),
  277. "output_path" => %output_path.display(),
  278. "start" => %start,
  279. "end" => %end,
  280. );
  281. if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) }
  282. info!(logger, "opening trades_csv file");
  283. let rdr = fs::File::open(&trades_csv)
  284. .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?;
  285. let rdr = io::BufReader::new(rdr);
  286. let mut rdr = csv::Reader::from_reader(rdr);
  287. info!(logger, "opening output file for writing");
  288. let wtr = fs::File::create(&output_path)
  289. .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
  290. let wtr = io::BufWriter::new(wtr);
  291. let mut wtr = csv::Writer::from_writer(wtr);
  292. let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  293. let time_col: usize = headers.iter().position(|x| x == b"time").ok_or_else(|| {
  294. String::from("no column in headers named 'time'")
  295. })?;
  296. let mut row = csv::ByteRecord::new();
  297. let start_nanos = nanos(start);
  298. let end_nanos = nanos(end);
  299. let mut n_written = 0;
  300. let mut time: u64 = 0;
  301. info!(logger, "writing headers row to output file");
  302. wtr.write_byte_record(&headers).map_err(|e| format!("writing csv headers row failed: {}", e))?;
  303. info!(logger, "entering csv parsing loop");
  304. 'a: while rdr.read_byte_record(&mut row)
  305. .map_err(|e| {
  306. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  307. })?
  308. {
  309. let time_bytes = row.get(time_col).ok_or_else(|| "time column not present for row")?;
  310. time = atoi::atoi(time_bytes).ok_or_else(|| {
  311. format!("failed to parse 'time' col value '{}' as integer", std::str::from_utf8(time_bytes).unwrap_or("utf8err"))
  312. })?;
  313. n += 1;
  314. if n % PROGRESS_EVERY == 0 {
  315. info!(logger, "parsing csv rows"; "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep());
  316. }
  317. if time < start_nanos { continue 'a }
  318. if time > end_nanos { break 'a }
  319. wtr.write_byte_record(&row).map_err(|e| format!("writing parsed csv row to output file failed: {}", e))?;
  320. n_written += 1;
  321. }
  322. info!(logger, "broke out of read csv loop"; "time" => time, "end_nanos" => end_nanos, "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep());
  323. info!(logger, "dropping wtr");
  324. drop(wtr);
  325. }
  326. }
  327. Ok(n)
  328. }
  329. fn main() {
  330. let start = Instant::now();
  331. let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
  332. let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
  333. let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
  334. let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
  335. match run(start, &logger) {
  336. Ok(n) => {
  337. let took = Instant::now() - start;
  338. info!(logger, "finished in {:?}", took;
  339. "n rows" => %n.thousands_sep(),
  340. "rows/sec" => &((per_sec(n, took) * 100.0).round() / 100.0).thousands_sep(),
  341. );
  342. }
  343. Err(e) => {
  344. crit!(logger, "run failed: {:?}", e);
  345. eprintln!("\n\nError: {}", e);
  346. std::thread::sleep(Duration::from_millis(100));
  347. std::process::exit(1);
  348. }
  349. }
  350. }