You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

csv.rs 12KB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. #![allow(unused_imports)]
  2. #[macro_use]
  3. extern crate slog;
  4. #[macro_use]
  5. extern crate markets;
  6. use std::path::PathBuf;
  7. use std::time::*;
  8. use std::io::{self, prelude::*};
  9. use std::fs;
  10. use structopt::StructOpt;
  11. use serde::{Serialize, Deserialize};
  12. use slog::Drain;
  13. use pretty_toa::ThousandsSep;
  14. use markets::crypto::{Exchange, Ticker, Side};
  15. // equivalent to panic! but without the ugly 'thread main panicked' yada yada
  16. macro_rules! fatal { ($fmt:expr, $($args:tt)*) => {{
  17. eprintln!($fmt, $($args)*);
  18. std::process::exit(1);
  19. }}}
/// Emit a progress log line every this many parsed input rows.
const PROGRESS_EVERY: usize = 1024 * 1024;
/// One second expressed in nanoseconds (trade times are unix nanos).
const ONE_SECOND: u64 = 1_000_000_000;
/// One hour expressed in nanoseconds — the bucketing window for the query.
const ONE_HOUR: u64 = ONE_SECOND * 60 * 60;
// Command-line options, parsed via structopt/clap.
// (Field doc comments double as the --help text for each flag.)
#[derive(Debug, StructOpt)]
struct Opt {
    /// Path to CSV file with trades data
    #[structopt(short = "f", long = "trades-csv")]
    #[structopt(parse(from_os_str))]
    trades_csv: PathBuf,
    /// Where to save the query results (CSV output)
    #[structopt(short = "o", long = "output-path")]
    #[structopt(parse(from_os_str))]
    output_path: PathBuf,
}
/// A single executed trade, as deserialized (by column name) from one
/// row of the input CSV.
#[derive(Deserialize)]
struct Trade {
    /// Time of trade in unix nanoseconds
    pub time: u64,
    /// Exchange where trade executed
    pub exch: Exchange,
    /// Currency rate of trade (base/quote)
    pub ticker: Ticker,
    /// Price of trade, in quote denomination
    pub price: f64,
    /// Size/Volume of trade, in base denomination
    pub amount: f64,
}
  47. fn per_sec(n: usize, span: Duration) -> f64 {
  48. if n == 0 || span < Duration::from_micros(1) { return 0.0 }
  49. let s: f64 = span.as_nanos() as f64 / 1e9f64;
  50. n as f64 / s
  51. }
  52. #[allow(dead_code)]
  53. #[inline(always)]
  54. fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result<Trade, &'static str> {
  55. let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?)
  56. .ok_or("parsing time failed")?;
  57. let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
  58. .map_err(|_| "parsing amount failed")?;
  59. let exch = match row.get(2).ok_or("no exch")? {
  60. b"bmex" => e!(bmex),
  61. b"bnce" => e!(bnce),
  62. b"btfx" => e!(btfx),
  63. b"gdax" => e!(gdax),
  64. b"okex" => e!(okex),
  65. b"bits" => e!(bits),
  66. b"plnx" => e!(plnx),
  67. b"krkn" => e!(krkn),
  68. _ => return Err("illegal exch"),
  69. };
  70. let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
  71. .map_err(|_| "parsing price failed")?;
  72. let ticker = match row.get(6).ok_or("no ticker")? {
  73. b"btc_usd" => t!(btc-usd),
  74. b"eth_usd" => t!(eth-usd),
  75. b"ltc_usd" => t!(ltc-usd),
  76. b"etc_usd" => t!(etc-usd),
  77. b"bch_usd" => t!(bch-usd),
  78. b"xmr_usd" => t!(xmr-usd),
  79. b"usdt_usd" => t!(usdt-usd),
  80. _ => return Err("illegal ticker"),
  81. };
  82. Ok(Trade { time, amount, exch, price, ticker })
  83. }
  84. #[allow(dead_code)]
  85. #[inline(always)]
  86. fn manual_deserialize_str(row: &csv::StringRecord) -> Result<Trade, &'static str> {
  87. let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?.as_bytes())
  88. .ok_or("parsing time failed")?;
  89. let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
  90. .map_err(|_| "parsing amount failed")?;
  91. let exch = match row.get(2).ok_or("no exch")? {
  92. "bmex" => e!(bmex),
  93. "bnce" => e!(bnce),
  94. "btfx" => e!(btfx),
  95. "gdax" => e!(gdax),
  96. "okex" => e!(okex),
  97. "bits" => e!(bits),
  98. "plnx" => e!(plnx),
  99. "krkn" => e!(krkn),
  100. _ => return Err("illegal exch"),
  101. };
  102. let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
  103. .map_err(|_| "parsing price failed")?;
  104. let ticker = match row.get(6).ok_or("no ticker")? {
  105. "btc_usd" => t!(btc-usd),
  106. "eth_usd" => t!(eth-usd),
  107. "ltc_usd" => t!(ltc-usd),
  108. "etc_usd" => t!(etc-usd),
  109. "bch_usd" => t!(bch-usd),
  110. "xmr_usd" => t!(xmr-usd),
  111. "usdt_usd" => t!(usdt-usd),
  112. _ => return Err("illegal ticker"),
  113. };
  114. Ok(Trade { time, amount, exch, price, ticker })
  115. }
  116. /// Example of code used in discussion of increasing CSV parsing performance
  117. #[allow(dead_code)]
  118. fn fast_parse_bytes<R: Read>(mut rdr: csv::Reader<R>) -> Result<usize, String> {
  119. // our data is ascii, so parsing with the slightly faster ByteRecord is fine
  120. let headers: csv::ByteRecord = rdr.byte_headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
  121. let mut row = csv::ByteRecord::new();
  122. // manual_deserialize_bytes assumes the column order of the CSV,
  123. // so here we verify that it actually matches that assumption
  124. assert_eq!(headers.get(0), Some(&b"time"[..]));
  125. assert_eq!(headers.get(1), Some(&b"amount"[..]));
  126. assert_eq!(headers.get(2), Some(&b"exch"[..]));
  127. assert_eq!(headers.get(3), Some(&b"price"[..]));
  128. assert_eq!(headers.get(6), Some(&b"ticker"[..]));
  129. let mut n = 0;
  130. let mut last_time = 0;
  131. while rdr.read_byte_record(&mut row)
  132. .map_err(|e| {
  133. format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
  134. })?
  135. {
  136. let trade: Trade = manual_deserialize_bytes(&row)
  137. .map_err(|e| {
  138. format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
  139. })?;
  140. assert!(trade.time >= last_time);
  141. last_time = trade.time;
  142. n += 1;
  143. }
  144. Ok(n)
  145. }
/// Core query: stream the `--trades-csv` file, bucket btc_usd trades into
/// hour-long windows, and for each completed hour write one row to the
/// `--output-path` CSV containing the ratio of bmex's volume-weighted
/// average price to gdax's (plus per-exchange counts and volumes).
///
/// Returns the total number of data rows read from the input (all tickers),
/// not the number of output rows written. Errors are human-readable strings.
fn run(start: Instant, logger: &slog::Logger) -> Result<usize, String> {
    let opt = Opt::from_args();
    info!(logger, "initializing...";
        "trades-csv" => %opt.trades_csv.display(),
        "output-path" => %opt.output_path.display()
    );
    if ! opt.trades_csv.exists() {
        error!(logger, "path does not exist: {}", opt.trades_csv.display());
        fatal!("Error: path does not exist: {}", opt.trades_csv.display());
    }
    debug!(logger, "verified csv path exists"; "trades_csv" => %opt.trades_csv.display());
    // buffered reader around the input file, wrapped in a csv parser
    let rdr = fs::File::open(&opt.trades_csv)
        .map_err(|e| format!("opening trades csv file failed: {} (tried to open {})", e, opt.trades_csv.display()))?;
    let rdr = io::BufReader::new(rdr);
    let mut rdr = csv::Reader::from_reader(rdr);
    // initializing --output-path CSV
    let wtr = fs::File::create(&opt.output_path)
        .map_err(|e| format!("creating output csv file failed: {} (tried to create {})", e, opt.output_path.display()))?;
    let wtr = io::BufWriter::new(wtr);
    let mut wtr = csv::Writer::from_writer(wtr);
    wtr.write_record(&["time","ratio","bmex","gdax","n_bmex","n_gdax","bmex_amt","gdax_amt"]).map_err(|e| format!("writing CSV headers to output file failed: {}", e))?;
    // serde-based row deserialization maps columns by name via the header row
    let headers: csv::StringRecord = rdr.headers().map_err(|e| format!("failed to parse CSV headers: {}", e))?.clone();
    let mut row = csv::StringRecord::new();
    // pull out first row to initialize query calculations
    rdr.read_record(&mut row).map_err(|e| format!("reading first row failed: {}", e))?;
    let trade: Trade = row.deserialize(Some(&headers))
        .map_err(|e| {
            format!("deserializing first row failed: {}\n\nFailing row:\n{:?}", e, row)
        })?;
    // cur_hour = start of the hour bucket the first trade falls in (floor to hour)
    let mut cur_hour = trade.time - trade.time % ONE_HOUR;
    let mut next_hour = cur_hour + ONE_HOUR;
    // per-hour accumulators: sum(price * amount) and sum(amount) per exchange.
    // NOTE(review): the first trade seeds the totals here, but n_bmex/n_gdax
    // start at 0 and the first row's ticker is never checked against btc_usd,
    // unlike every row handled in the loop below — confirm this is intentional.
    let mut bmex_total = if trade.exch == e!(bmex) { trade.price * trade.amount } else { 0.0 };
    let mut bmex_amt = if trade.exch == e!(bmex) { trade.amount } else { 0.0 };
    let mut n_bmex = 0;
    let mut gdax_total = if trade.exch == e!(gdax) { trade.price * trade.amount } else { 0.0 };
    let mut gdax_amt = if trade.exch == e!(gdax) { trade.amount } else { 0.0 };
    let mut n_gdax = 0;
    let mut n = 0;          // input rows read
    let mut n_written = 0;  // output rows written
    let mut last_time = 0;  // for verifying the input is sorted by time
    while rdr.read_record(&mut row)
        .map_err(|e| {
            format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
        })?
    {
        let trade: Trade = row.deserialize(Some(&headers))
            .map_err(|e| {
                format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
            })?;
        n += 1;
        // the query only concerns btc_usd trades; everything else is skipped
        // (but still counted in n above)
        if trade.ticker != t!(btc-usd) { continue }
        // verify data is sorted by time
        assert!(trade.time >= last_time);
        last_time = trade.time;
        if trade.time >= next_hour { // finalize last hour, and prepare for this hour
            if n_bmex == 0 || n_gdax == 0 {
                // one (or both) exchanges had no trades this hour, so the
                // weighted averages and their ratio are undefined
                wtr.write_record(&[
                    &format!("{}", cur_hour),
                    "NaN",
                    "NaN",
                    "NaN",
                    &format!("{}", n_bmex),
                    &format!("{}", n_gdax),
                    &format!("{}", bmex_amt),
                    &format!("{}", gdax_amt),
                ]).map_err(|e| format!("writing output row failed: {}", e))?;
            } else {
                // volume-weighted average price per exchange, and their ratio
                let bmex_wt_avg = bmex_total / bmex_amt;
                let gdax_wt_avg = gdax_total / gdax_amt;
                let ratio = bmex_wt_avg / gdax_wt_avg;
                wtr.write_record(&[
                    &format!("{}", cur_hour),
                    &format!("{}", ratio),
                    &format!("{}", bmex_wt_avg),
                    &format!("{}", gdax_wt_avg),
                    &format!("{}", n_bmex),
                    &format!("{}", n_gdax),
                    &format!("{}", bmex_amt),
                    &format!("{}", gdax_amt),
                ]).map_err(|e| format!("writing output row failed: {}", e))?;
            }
            n_written += 1;
            // reset state
            bmex_total = 0.0;
            bmex_amt = 0.0;
            gdax_total = 0.0;
            gdax_amt = 0.0;
            n_bmex = 0;
            n_gdax = 0;
            cur_hour = next_hour;
            next_hour += ONE_HOUR;
            // if we are skipping hours in between the last and current row, we
            // need to write a NaN row for the hours that had no data
            while next_hour <= trade.time {
                wtr.write_record(&[
                    &format!("{}", cur_hour),
                    "NaN",
                    "NaN",
                    "NaN",
                    "0",
                    "0",
                    "0.0",
                    "0.0",
                ]).map_err(|e| format!("writing output row failed: {}", e))?;
                n_written += 1;
                cur_hour = next_hour;
                next_hour += ONE_HOUR;
            }
        }
        // accumulate the current trade into this hour's running totals;
        // exchanges other than bmex/gdax don't participate in the ratio
        match trade.exch {
            e!(bmex) => {
                bmex_total += trade.price * trade.amount;
                bmex_amt += trade.amount;
                n_bmex += 1;
            }
            e!(gdax) => {
                gdax_total += trade.price * trade.amount;
                gdax_amt += trade.amount;
                n_gdax += 1;
            }
            _ => {}
        }
        // periodic progress logging (more frequent in debug builds)
        if n % PROGRESS_EVERY == 0 || (cfg!(debug_assertions) && n % (1024 * 96) == 0) {
            info!(logger, "parsing csv file";
                "n rows" => %n.thousands_sep(),
                "n written" => %n_written.thousands_sep(),
                "elapsed" => ?(Instant::now() - start),
            );
        }
        // debug builds bail out early so iterating on the code stays fast
        if cfg!(debug_assertions) && n > PROGRESS_EVERY {
            warn!(logger, "debug mode: exiting early";
                "n rows" => %n.thousands_sep(),
                "n written" => %n_written.thousands_sep(),
                "elapsed" => ?(Instant::now() - start),
            );
            break
        }
    }
    // intentionally skipping the partial hour here
    info!(logger, "finished parsing CSV/calculating query. closing output file");
    // dropping the csv::Writer flushes the BufWriter and closes the file
    drop(wtr);
    Ok(n)
}
  289. fn main() {
  290. let start = Instant::now();
  291. let decorator = slog_term::TermDecorator::new().stdout().force_color().build();
  292. let drain = slog_term::FullFormat::new(decorator).use_utc_timestamp().build().fuse();
  293. let drain = slog_async::Async::new(drain).chan_size(1024 * 64).thread_name("recv".into()).build().fuse();
  294. let logger = slog::Logger::root(drain, o!("version" => structopt::clap::crate_version!()));
  295. match run(start, &logger) {
  296. Ok(n) => {
  297. let took = Instant::now() - start;
  298. let took_secs = took.as_millis() as f64 / 1000.0;
  299. let took_str = format!("{}min, {:.1}sec", took.as_secs() / 60, took_secs % 60.0);
  300. info!(logger, "finished in {}", took_str;
  301. "n rows" => %n.thousands_sep(),
  302. "rows/sec" => &((per_sec(n, took) * 100.0).round() / 10.0).thousands_sep(),
  303. );
  304. }
  305. Err(e) => {
  306. crit!(logger, "run failed: {:?}", e);
  307. eprintln!("\n\nError: {}", e);
  308. std::thread::sleep(Duration::from_millis(100));
  309. std::process::exit(1);
  310. }
  311. }
  312. }