You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

377 lines
13KB

  1. #![allow(unused_imports)]
  2. #![allow(unused_labels)]
  3. use std::str::FromStr;
  4. use std::time::{Instant, Duration};
  5. use std::{fs, io};
  6. use std::io::prelude::*;
  7. use std::str::from_utf8;
  8. use std::error::Error;
  9. use std::f64::NAN;
  10. use serde::{Serialize, Deserialize};
  11. use itertools_num::linspace;
  12. use std::collections::HashMap as Map;
  13. const N: usize = 128;
  14. const LOGSPACE: [i64; 128] =
  15. [-2134300000000, -1854700000000, -1611800000000, -1400600000000,
  16. -1217200000000, -1057700000000, -919200000000, -798800000000,
  17. -694100000000, -603200000000, -524200000000, -455500000000,
  18. -395800000000, -344000000000, -298900000000, -259700000000,
  19. -225700000000, -196100000000, -170400000000, -148100000000,
  20. -128700000000, -111800000000, -97200000000, -84400000000,
  21. -73400000000, -63800000000, -55400000000, -48100000000,
  22. -41800000000, -36300000000, -31600000000, -27400000000,
  23. -23800000000, -20700000000, -18000000000, -15600000000,
  24. -13600000000, -11800000000, -10200000000, -8900000000,
  25. -7700000000, -6700000000, -5800000000, -5000000000,
  26. -4400000000, -3800000000, -3300000000, -2900000000,
  27. -2500000000, -2100000000, -1900000000, -1600000000,
  28. -1400000000, -1200000000, -1000000000, -900000000,
  29. -800000000, -700000000, -600000000, -500000000,
  30. -400000000, -300000000, -200000000, -100000000,
  31. 100000000, 200000000, 300000000, 400000000,
  32. 500000000, 600000000, 700000000, 800000000,
  33. 900000000, 1000000000, 1200000000, 1400000000,
  34. 1600000000, 1900000000, 2100000000, 2500000000,
  35. 2900000000, 3300000000, 3800000000, 4400000000,
  36. 5000000000, 5800000000, 6700000000, 7700000000,
  37. 8900000000, 10200000000, 11800000000, 13600000000,
  38. 15600000000, 18000000000, 20700000000, 23800000000,
  39. 27400000000, 31600000000, 36300000000, 41800000000,
  40. 48100000000, 55400000000, 63800000000, 73400000000,
  41. 84400000000, 97200000000, 111800000000, 128700000000,
  42. 148100000000, 170400000000, 196100000000, 225700000000,
  43. 259700000000, 298900000000, 344000000000, 395800000000,
  44. 455500000000, 524200000000, 603200000000, 694100000000,
  45. 798800000000, 919200000000, 1057700000000, 1217200000000,
  46. 1400600000000, 1611800000000, 1854700000000, 2134300000000];
  47. #[derive(Deserialize)]
  48. struct Trade {
  49. pub time: i64,
  50. pub price: f64,
  51. pub amount: f64,
  52. }
  53. /// Use this to deserialize just the time column on the first pass through
  54. /// the events file.
  55. #[derive(Deserialize)]
  56. struct EventTime {
  57. pub time: i64,
  58. }
  59. struct Event {
  60. pub time: i64,
  61. pub data: Vec<f64>,
  62. }
  63. pub fn seconds(d: Duration) -> f64 {
  64. d.as_secs() as f64 + (d.subsec_nanos() as f64 / 1_000_000_000_f64)
  65. }
  66. fn main() -> Result<(), Box<dyn Error>> {
  67. let start = Instant::now();
  68. let args: clap::ArgMatches = clap::App::new("time-explorer")
  69. .version("0.1")
  70. .arg(clap::Arg::with_name("trades")
  71. .long("trades-csv")
  72. .short("t")
  73. .help("Path of csv with time (integer nanoseconds timestamp), \
  74. price (f64), and amount (f64) columns.")
  75. .takes_value(true)
  76. .required(true))
  77. .arg(clap::Arg::with_name("events")
  78. .long("events-csv")
  79. .short("e")
  80. .help("Path of csv file with a time (integer nanoseconds timestamp) as column 0, \
  81. along with any other metadata columns that will be included in results")
  82. .takes_value(true)
  83. .required(true))
  84. .arg(clap::Arg::with_name("output")
  85. .long("output-file")
  86. .short("o")
  87. .help("Path to save results csv to")
  88. .takes_value(true)
  89. .required(true))
  90. .arg(clap::Arg::with_name("verbose")
  91. .long("verbose")
  92. .short("v"))
  93. .arg(clap::Arg::with_name("n-periods")
  94. .long("n-periods")
  95. .short("n")
  96. .help("Controls how many time buckets are evaluated")
  97. .takes_value(true)
  98. .default_value("50"))
  99. .get_matches();
  100. let verbose = args.is_present("verbose");
  101. if verbose { println!("{:>8.2}s reading...", seconds(Instant::now() - start)); }
  102. let trades_csv = args.value_of("trades").unwrap();
  103. let events_csv = args.value_of("events").unwrap();
  104. let output = args.value_of("output").unwrap();
  105. let n: &str = args.value_of("n-periods").unwrap();
  106. let n: usize = usize::from_str(n)?;
  107. let trades_csv =
  108. fs::OpenOptions::new()
  109. .read(true)
  110. .open(trades_csv)?;
  111. let mut times: Vec<i64> = Vec::with_capacity(8192);
  112. let mut amounts: Vec<f64> = Vec::with_capacity(8192);
  113. let mut totals: Vec<f64> = Vec::with_capacity(8192);
  114. #[cfg(feature = "super-fast-csv-parsing")]
  115. {
  116. // lookout below! MANY unwraps in here
  117. // note: this code NOT part of original time-explorer. this code is what
  118. // I was referring to in the "fine print" note where it says "With 10
  119. // minutes work (knowing what I know today), I was able to get CSV parsing
  120. // down to 3.46sec"
  121. let mut rdr = csv::Reader::from_reader(io::BufReader::new(rdr));
  122. let headers = rdr.byte_headers().unwrap().clone();
  123. let mut row = csv::ByteRecord::new();
  124. let mut col_index: [usize; 3] = [
  125. headers.iter().position(|x| x == b"time").unwrap(),
  126. headers.iter().position(|x| x == b"amount").unwrap(),
  127. headers.iter().position(|x| x == b"price").unwrap(),
  128. ];
  129. while rdr.read_byte_record(&mut row).unwrap() {
  130. times.push(atoi::atoi(row.get(col_index[0]).unwrap()).unwrap());
  131. let amount: f64 = lexical::parse(row.get(col_index[1]).unwrap()).unwrap();
  132. let price: f64 = lexical::parse(row.get(col_index[2]).unwrap()).unwrap();
  133. totals.push(price * amount);
  134. amounts.push(amount);
  135. }
  136. }
  137. #[cfg(not(feature = "super-fast-csv-parsing"))]
  138. {
  139. // this is what was originally in time-explorer
  140. let mut trades: Vec<Trade> =
  141. csv::Reader::from_reader(trades_csv)
  142. .deserialize()
  143. .map(|x| x.unwrap())
  144. .collect();
  145. trades.sort_by_key(|k| k.time);
  146. for Trade { time, price, amount } in trades {
  147. times.push(time);
  148. totals.push(price * amount);
  149. amounts.push(amount);
  150. }
  151. }
  152. if verbose { println!("{:>8.2}s finished parsing trades csv (times.len() = {}) ...", seconds(Instant::now() - start), times.len()); }
  153. let mut events: Vec<Event> = {
  154. let events_csv =
  155. fs::OpenOptions::new()
  156. .read(true)
  157. .open(events_csv)?;
  158. csv::Reader::from_reader(events_csv)
  159. .deserialize()
  160. .map(|t: Result<EventTime, _>| {
  161. let EventTime { time } = t.unwrap();
  162. //let data = [0.0; N - 1];
  163. let data = vec![0.0; n - 1];
  164. Event { time, data }
  165. }).collect()
  166. };
  167. assert!(!events.is_empty());
  168. events.sort_by_key(|k| k.time);
  169. let mut cursor: usize = 0;
  170. let mut truncate_events = None;
  171. let buckets: Vec<i64> =
  172. linspace(LOGSPACE[0] as f64, LOGSPACE[N - 1] as f64, n)
  173. .map(|x| x as i64)
  174. .collect();
  175. if verbose { println!("{:>8.2}s calculating...", seconds(Instant::now() - start)); }
  176. let mut n_incomplete_buckets = 0;
  177. let mut n_skipped_buckets = 0;
  178. let mut n_time_buckets = 0;
  179. 'a: for (i, event) in events.iter_mut().enumerate() {
  180. let mut min_time: i64 = event.time + buckets[0];
  181. let mut max_time: i64 = event.time + buckets[1];
  182. 'oops: while times[cursor] > min_time && cursor > 0 { cursor -= 1; }
  183. n_incomplete_buckets += (times[cursor] > min_time) as usize;
  184. n_skipped_buckets += (times[cursor] > max_time) as usize;
  185. // find the beginning if there are gaps
  186. 'b: while times[cursor] < min_time {
  187. if cursor >= times.len() - 1 {
  188. truncate_events = Some(i);
  189. break 'a
  190. } else {
  191. cursor += 1
  192. }
  193. }
  194. let mut j: usize = cursor;
  195. 'c: for k in 0..(n - 2) {
  196. let mut wsum: f64 = 0.0;
  197. let mut w: f64 = 0.0;
  198. 'd: while j < times.len() - 1 && times[j] < max_time {
  199. wsum += totals[j];
  200. w += amounts[j];
  201. j += 1;
  202. }
  203. event.data[k] = if w > 0.0 { wsum / w } else { NAN };
  204. min_time = max_time;
  205. max_time = event.time + buckets[k + 2];
  206. n_time_buckets += 1;
  207. }
  208. if i % 512 == 0 {
  209. assert!(max_time > min_time);
  210. if verbose {
  211. //let n_nan = event.data.iter().filter(|x| !x.is_finite()).count();
  212. println!("{:>8.2}s No. {:>5} {:>12.2}, {:>12.2}, {:>12.2} ...", //, {:>12.2}, {:>12.2}, {:>12.2} ...",
  213. //cursor={}, j={}, times[cursor]={}, n_nan={}, max_time-min_time={}",
  214. seconds(Instant::now() - start), i,
  215. event.data[0], event.data[20], event.data[40]); //, event.data[60], event.data[80], event.data[100]);
  216. //min_time, max_time, cursor,
  217. //j, times[cursor], n_nan, max_time-min_time);
  218. }
  219. }
  220. }
  221. assert!(truncate_events.is_none()); // for now
  222. if verbose { println!("{:>8.2} writing... (n_time_buckets={}, n_incomplete_buckets={}, n_skipped_buckets={})", seconds(Instant::now() - start), n_time_buckets, n_incomplete_buckets, n_skipped_buckets); }
  223. // we have to read this again because I could not figure out ownership problems
  224. let events_csv =
  225. fs::OpenOptions::new()
  226. .read(true)
  227. .open(events_csv)?;
  228. let mut events_csv = csv::Reader::from_reader(events_csv);
  229. let output_csv =
  230. fs::OpenOptions::new()
  231. .write(true)
  232. .create(true)
  233. .truncate(true)
  234. .open(output)?;
  235. let mut wtr = csv::Writer::from_writer(output_csv);
  236. let data_cols: Vec<i64> = {
  237. let mut xs = vec![0; n - 1];
  238. for i in 0..(n - 1) {
  239. xs[i] = (buckets[i] + buckets[i + 1]) / 2;
  240. }
  241. xs
  242. };
  243. {
  244. let headers = events_csv.byte_headers()?;
  245. for col in headers.iter() {
  246. wtr.write_field(col)?;
  247. }
  248. for col in data_cols.iter() {
  249. wtr.write_field(&format!("{}", col))?;
  250. }
  251. wtr.write_record(None::<&[u8]>)?;
  252. }
  253. let mut record = csv::ByteRecord::new();
  254. for event in events {
  255. if !events_csv.read_byte_record(&mut record)? { panic!("failed to read from events csv") }
  256. for meta in record.iter() {
  257. wtr.write_field(meta)?;
  258. }
  259. for val in event.data.iter() {
  260. wtr.write_field(&format!("{}", val))?;
  261. }
  262. wtr.write_record(None::<&[u8]>)?;
  263. }
  264. if verbose { println!("{:>8.2} finished.", seconds(Instant::now() - start)); }
  265. Ok(())
  266. }
  267. /*
  268. def to_tframe(version, df, trades, start):
  269. d = {'bid': {}, 'ask': {}}
  270. cursor = 0
  271. n = 0
  272. n_periods = 40
  273. xs = np.concatenate([periods(n_periods)[:0:-1] * -1, periods(n_periods)]) * 1000000 # mult to convert to nanos
  274. mask = df['version'] == version
  275. #my_trades = sorted(list(zip(df.loc[mask].index, df.loc[mask, 'side'], df.loc[mask, 'gid'])))
  276. my_trades = sorted(list(zip(df.loc[mask].index.values.astype(np.int64), df.loc[mask, 'side'], df.loc[mask, 'gid'])))
  277. #idx = trades.index
  278. idx = trades.index.values.astype(np.int64)
  279. amts = trades['amount']
  280. totals = trades['total']
  281. assert len(idx) == len(amts)
  282. assert len(idx) == len(totals)
  283. for tm, side, gid in my_trades:
  284. print '{} to_tfame {} {} (cursor = {})'.format(time.time() - start, version, n, cursor)
  285. #min_time = tm + timedelta(milliseconds=xs[0])
  286. #max_time = tm + timedelta(milliseconds=xs[1])
  287. min_time = tm + xs[0]
  288. max_time = tm + xs[1]
  289. if idx[cursor] > min_time:
  290. print 'warning: idx[cursor] ({}) > min_time ({})'.format(idx[cursor], min_time)
  291. while idx[cursor] > min_time and cursor > 0:
  292. cursor -= 1
  293. else:
  294. while idx[cursor] < min_time and cursor < len(idx) - 1:
  295. cursor += 1
  296. i = 1
  297. j = cursor
  298. d[side][gid] = {}
  299. while i < len(xs) - 1:
  300. wsum = 0.0
  301. w = 0.0
  302. while idx[j] < max_time:
  303. wsum += totals[j]
  304. w += amts[j]
  305. j += 1
  306. if w > 0.0:
  307. d[side][gid][xs[i]] = wsum / w
  308. else:
  309. d[side][gid][xs[i]] = np.nan
  310. i += 1
  311. min_time = max_time
  312. #max_time = tm + timedelta(milliseconds=xs[i])
  313. max_time = tm + xs[i]
  314. n += 1
  315. d['bid'] = sort_cols(pd.DataFrame.from_dict(d['bid'], orient='index'))
  316. d['ask'] = sort_cols(pd.DataFrame.from_dict(d['ask'], orient='index'))
  317. #yield (version, d)
  318. return d
  319. */