You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

327 lines
11KB

  1. #![allow(unused)]
  2. #[macro_use]
  3. extern crate slog;
  4. #[macro_use]
  5. extern crate markets;
  6. use std::io::{self, prelude::*};
  7. use std::fs;
  8. use std::path::{Path, PathBuf};
  9. use std::time::*;
  10. use pretty_toa::ThousandsSep;
  11. use structopt::StructOpt;
  12. use serde::{Serialize, Deserialize};
  13. use slog::Drain;
  14. use chrono::{DateTime, Utc, NaiveDateTime};
  15. use markets::crypto::{Exchange, Ticker, Side, Currency};
  16. macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
  17. eprintln!($fmt, $($args)*);
  18. std::process::exit(1);
  19. }}}
  20. const PROGRESS_EVERY: usize = 1024 * 1024 * 4;
  21. #[structopt(rename_all="kebab-case")]
  22. #[derive(Debug, StructOpt)]
  23. enum Opt {
  24. /// Filter trades-csv by start,end range and save subset to output-path
  25. ///
  26. /// Note: csv assumed to be pre-sorted by time (ascending)
  27. ///
  28. Range {
  29. /// Path to CSV file with trades data
  30. #[structopt(short = "f", long = "trades-csv")]
  31. #[structopt(parse(from_os_str))]
  32. trades_csv: PathBuf,
  33. /// Where to save the query results (CSV output)
  34. #[structopt(short = "o", long = "output-path")]
  35. #[structopt(parse(from_os_str))]
  36. output_path: PathBuf,
  37. /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
  38. #[structopt(short = "s", long = "start")]
  39. start: DateTime<Utc>,
  40. /// rfc3339 format ("YYYY-MM-DDTHH:MM:SSZ")
  41. #[structopt(short = "e", long = "end")]
  42. end: DateTime<Utc>,
  43. },
  44. /// Convert the original csv info a format ready to be ingested via COPY
  45. ///
  46. /// 1. server_time of 0 -> NULL
  47. /// 2. side of "na" -> NULL
  48. PrepPostgres {
  49. /// Path to CSV file with trades data
  50. #[structopt(short = "f", long = "trades-csv")]
  51. #[structopt(parse(from_os_str))]
  52. trades_csv: PathBuf,
  53. /// Where to save the query results (CSV output)
  54. #[structopt(short = "o", long = "output-path")]
  55. #[structopt(parse(from_os_str))]
  56. output_path: PathBuf,
  57. },
  58. ListCodes,
  59. }
  60. #[derive(Deserialize)]
  61. struct Trade {
  62. /// Unix nanoseconds
  63. pub time: u64,
  64. pub exch: Exchange,
  65. pub ticker: Ticker,
  66. //pub side: Option<Side>,
  67. pub price: f64,
  68. pub amount: f64,
  69. }
/// One input row of the trades CSV, as deserialized by the `prep-postgres` command.
///
/// Columns are matched to fields through the CSV header row
/// (`row.deserialize(Some(&headers))`). `side` borrows its text from the
/// `csv::StringRecord` it was deserialized from, hence the lifetime parameter.
#[derive(Deserialize, Debug)]
struct PgBuilder<'a> {
    /// Trade time in Unix nanoseconds (converted with `nanos_to_utc`).
    pub time: u64,
    pub exch: Exchange,
    pub ticker: Ticker,
    /// Raw side text; "bid" and "ask" are mapped to numeric codes, anything
    /// else (e.g. "na") becomes NULL in the output.
    pub side: Option<&'a str>,
    pub price: f64,
    pub amount: f64,
    /// Server-reported time in Unix nanoseconds; 0 is treated as "absent" and
    /// becomes NULL in the output.
    pub server_time: u64,
}
/// One output row written by the `prep-postgres` command via `csv::Writer::serialize`.
///
/// Field declaration order is the column order of the output CSV, so it
/// presumably has to match the column list of the target table / COPY
/// statement. Verify before reordering. `None` values are written as empty fields.
#[derive(Serialize, Debug)]
struct PgRow {
    /// Trade time, converted from the input's Unix-nanosecond `time`.
    pub time: DateTime<Utc>,
    /// Exchange code, `u8::from(Exchange)`.
    pub exch: u8,
    /// Base currency code, `u8::from(ticker.base)`.
    pub base: u8,
    /// Quote currency code, `u8::from(ticker.quote)`.
    pub quote: u8,
    pub amount: f64,
    pub price: f64,
    /// Side code for "bid"/"ask"; `None` for any other input side.
    pub side: Option<u8>,
    /// Server time, or `None` when the input `server_time` was 0.
    pub server_time: Option<DateTime<Utc>>,
}
  91. fn nanos_to_utc(nanos: u64) -> DateTime<Utc> {
  92. const ONE_SECOND: u64 = 1_000_000_000;
  93. let sec: i64 = (nanos / ONE_SECOND) as i64;
  94. let nsec: u32 = (nanos % ONE_SECOND) as u32;
  95. let naive = NaiveDateTime::from_timestamp(sec, nsec);
  96. DateTime::from_utc(naive, Utc)
  97. }
  98. fn per_sec(n: usize, span: Duration) -> f64 {
  99. if n == 0 || span < Duration::from_micros(1) { return 0.0 }
  100. let s: f64 = span.as_nanos() as f64 / 1e9f64;
  101. n as f64 / s
  102. }
  103. fn nanos(utc: DateTime<Utc>) -> u64 {
  104. (utc.timestamp() as u64) * 1_000_000_000_u64 + (utc.timestamp_subsec_nanos() as u64)
  105. }
  106. fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
  107. let opt = Opt::from_args();
  108. let mut n = 0;
  109. match opt {
  110. Opt::PrepPostgres { trades_csv, output_path } => {
  111. let logger = logger.new(o!("cmd" => "prep-postgres"));
  112. info!(logger, "beginning prep-postgres cmd";
  113. "trades_csv" => %trades_csv.display(),
  114. "output_path" => %output_path.display(),
  115. );
  116. if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) }
  117. info!(logger, "opening trades_csv file");
  118. let rdr = fs::File::open(&trades_csv)
  119. .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?;
  120. let rdr = io::BufReader::new(rdr);
  121. let mut rdr = csv::Reader::from_reader(rdr);
  122. info!(logger, "opening output file for writing");
  123. let wtr = fs::File::create(&output_path)
  124. .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
  125. let wtr = io::BufWriter::new(wtr);
  126. let mut wtr = csv::Writer::from_writer(wtr);
  127. let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  128. let mut row = csv::StringRecord::new();
  129. //wtr.write_record(&headers).map_err(|e| format!("writing headers row failed: {}", e))?;
  130. while rdr.read_record(&mut row)
  131. .map_err(|e| {
  132. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  133. })?
  134. {
  135. let bldr: PgBuilder = row.deserialize(Some(&headers)).map_err(|e| format!("deser failed: {}", e))?;
  136. let PgBuilder { time, exch, ticker, side, price, amount, server_time } = bldr;
  137. let time = nanos_to_utc(time);
  138. let exch = u8::from(exch);
  139. let base = u8::from(ticker.base);
  140. let quote = u8::from(ticker.quote);
  141. let side: Option<u8> = match side {
  142. Some("bid") => Some(1),
  143. Some("ask") => Some(2),
  144. _ => None,
  145. };
  146. let server_time = match server_time {
  147. 0 => None,
  148. x => Some(nanos_to_utc(x)),
  149. };
  150. let pg_row = PgRow { time, exch, base, quote, amount, price, side, server_time };
  151. wtr.serialize(&pg_row).map_err(|e| format!("serializing PgRow to csv failed: {}", e))?;
  152. n += 1;
  153. if n % PROGRESS_EVERY == 0 {
  154. info!(logger, "parsing/writing csv rows"; "n" => %n.thousands_sep());
  155. }
  156. }
  157. }
  158. Opt::ListCodes => {
  159. println!("side: {:?} {}", Side::Bid, u8::from(Side::Bid));
  160. println!("side: {:?} {}", Side::Ask, u8::from(Side::Ask));
  161. println!();
  162. for exch in Exchange::all() {
  163. println!("INSERT INTO exchanges (id, symbol) VALUES ({}, '{}');", u8::from(exch), exch.as_str());
  164. }
  165. for currency in Currency::all() {
  166. println!("INSERT INTO currencies (id, symbol) VALUES ({}, '{}');", u8::from(currency), currency.as_str());
  167. }
  168. }
  169. Opt::Range { trades_csv, output_path, start, end } => {
  170. let logger = logger.new(o!("cmd" => "range"));
  171. info!(logger, "beginning range cmd";
  172. "trades_csv" => %trades_csv.display(),
  173. "output_path" => %output_path.display(),
  174. "start" => %start,
  175. "end" => %end,
  176. );
  177. if ! trades_csv.exists() { return Err(format!("--trades-csv path does not exist: {}", trades_csv.display())) }
  178. info!(logger, "opening trades_csv file");
  179. let rdr = fs::File::open(&trades_csv)
  180. .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, trades_csv.display()))?;
  181. let rdr = io::BufReader::new(rdr);
  182. let mut rdr = csv::Reader::from_reader(rdr);
  183. info!(logger, "opening output file for writing");
  184. let wtr = fs::File::create(&output_path)
  185. .map_err(|e| format!("opening output file failed: {} (tried to open {} for writing)", e, output_path.display()))?;
  186. let wtr = io::BufWriter::new(wtr);
  187. let mut wtr = csv::Writer::from_writer(wtr);
  188. let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  189. let time_col: usize = headers.iter().position(|x| x == b"time").ok_or_else(|| {
  190. String::from("no column in headers named 'time'")
  191. })?;
  192. let mut row = csv::ByteRecord::new();
  193. let start_nanos = nanos(start);
  194. let end_nanos = nanos(end);
  195. let mut n_written = 0;
  196. let mut time: u64 = 0;
  197. info!(logger, "writing headers row to output file");
  198. wtr.write_byte_record(&headers).map_err(|e| format!("writing csv headers row failed: {}", e))?;
  199. info!(logger, "entering csv parsing loop");
  200. 'a: while rdr.read_byte_record(&mut row)
  201. .map_err(|e| {
  202. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  203. })?
  204. {
  205. let time_bytes = row.get(time_col).ok_or_else(|| "time column not present for row")?;
  206. time = atoi::atoi(time_bytes).ok_or_else(|| {
  207. format!("failed to parse 'time' col value '{}' as integer", std::str::from_utf8(time_bytes).unwrap_or("utf8err"))
  208. })?;
  209. n += 1;
  210. if n % PROGRESS_EVERY == 0 {
  211. info!(logger, "parsing csv rows"; "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep());
  212. }
  213. if time < start_nanos { continue 'a }
  214. if time > end_nanos { break 'a }
  215. wtr.write_byte_record(&row).map_err(|e| format!("writing parsed csv row to output file failed: {}", e))?;
  216. n_written += 1;
  217. }
  218. info!(logger, "broke out of read csv loop"; "time" => time, "end_nanos" => end_nanos, "n" => %n.thousands_sep(), "n_written" => %n_written.thousands_sep());
  219. info!(logger, "dropping wtr");
  220. drop(wtr);
  221. }
  222. }
  223. Ok(n)
  224. }
  225. fn main() {
  226. let start = Instant::now();
  227. let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
  228. let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
  229. let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
  230. let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
  231. match run(start, &logger) {
  232. Ok(n) => {
  233. let took = Instant::now() - start;
  234. info!(logger, "finished in {:?}", took;
  235. "n rows" => %n.thousands_sep(),
  236. "rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(),
  237. );
  238. }
  239. Err(e) => {
  240. crit!(logger, "run failed: {:?}", e);
  241. eprintln!("\n\nError: {}", e);
  242. std::thread::sleep(Duration::from_millis(100));
  243. std::process::exit(1);
  244. }
  245. }
  246. }