You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

167 lines
5.2KB

  1. use chan;
  2. use clap::ArgMatches;
  3. use std::cmp;
  4. use std::convert::From;
  5. use std::fs::File;
  6. use std::io;
  7. use std::io::BufRead;
  8. use std::io::BufReader;
  9. use std::io::Read;
  10. use std::path::PathBuf;
  11. use std::thread;
  12. use tantivy;
  13. use tantivy::merge_policy::NoMergePolicy;
  14. use tantivy::Document;
  15. use tantivy::Index;
  16. use tantivy::IndexWriter;
  17. use time::PreciseTime;
  18. pub fn run_index_cli(argmatch: &ArgMatches) -> Result<(), String> {
  19. let index_directory = PathBuf::from(argmatch.value_of("index").unwrap());
  20. let document_source = argmatch
  21. .value_of("file")
  22. .map(|path| DocumentSource::FromFile(PathBuf::from(path)))
  23. .unwrap_or(DocumentSource::FromPipe);
  24. let no_merge = argmatch.is_present("nomerge");
  25. let mut num_threads = value_t!(argmatch, "num_threads", usize)
  26. .map_err(|_| format!("Failed to read num_threads argument as an integer."))?;
  27. if num_threads == 0 {
  28. num_threads = 1;
  29. }
  30. let buffer_size = value_t!(argmatch, "memory_size", usize)
  31. .map_err(|_| format!("Failed to read the buffer size argument as an integer."))?;
  32. let buffer_size_per_thread = buffer_size / num_threads;
  33. run_index(
  34. index_directory,
  35. document_source,
  36. buffer_size_per_thread,
  37. num_threads,
  38. no_merge,
  39. )
  40. .map_err(|e| format!("Indexing failed : {:?}", e))
  41. }
  42. fn run_index(
  43. directory: PathBuf,
  44. document_source: DocumentSource,
  45. buffer_size_per_thread: usize,
  46. num_threads: usize,
  47. no_merge: bool,
  48. ) -> tantivy::Result<()> {
  49. let index = Index::open_in_dir(&directory)?;
  50. let schema = index.schema();
  51. let (line_sender, line_receiver) = chan::sync(10_000);
  52. let (doc_sender, doc_receiver) = chan::sync(10_000);
  53. thread::spawn(move || {
  54. let articles = document_source.read().unwrap();
  55. for article_line_res in articles.lines() {
  56. let article_line = article_line_res.unwrap();
  57. line_sender.send(article_line);
  58. }
  59. });
  60. let num_threads_to_parse_json = cmp::max(1, num_threads / 4);
  61. info!("Using {} threads to parse json", num_threads_to_parse_json);
  62. for _ in 0..num_threads_to_parse_json {
  63. let schema_clone = schema.clone();
  64. let doc_sender_clone = doc_sender.clone();
  65. let line_receiver_clone = line_receiver.clone();
  66. thread::spawn(move || {
  67. for article_line in line_receiver_clone {
  68. match schema_clone.parse_document(&article_line) {
  69. Ok(doc) => {
  70. doc_sender_clone.send(doc);
  71. }
  72. Err(err) => {
  73. println!("Failed to add document doc {:?}", err);
  74. }
  75. }
  76. }
  77. });
  78. }
  79. drop(doc_sender);
  80. let mut index_writer = if num_threads > 0 {
  81. index.writer_with_num_threads(num_threads, buffer_size_per_thread)
  82. } else {
  83. index.writer(buffer_size_per_thread)
  84. }?;
  85. if no_merge {
  86. index_writer.set_merge_policy(Box::new(NoMergePolicy));
  87. }
  88. let start_overall = PreciseTime::now();
  89. let index_result = index_documents(&mut index_writer, doc_receiver);
  90. {
  91. let duration = start_overall.to(PreciseTime::now());
  92. info!("Indexing the documents took {} s", duration.num_seconds());
  93. }
  94. match index_result {
  95. Ok(docstamp) => {
  96. println!("Commit succeed, docstamp at {}", docstamp);
  97. println!("Waiting for merging threads");
  98. index_writer.wait_merging_threads()?;
  99. println!("Terminated successfully!");
  100. {
  101. let duration = start_overall.to(PreciseTime::now());
  102. info!(
  103. "Indexing the documents took {} s overall (indexing + merge)",
  104. duration.num_seconds()
  105. );
  106. }
  107. Ok(())
  108. }
  109. Err(e) => {
  110. println!("Error during indexing, rollbacking.");
  111. index_writer.rollback().unwrap();
  112. println!("Rollback succeeded");
  113. Err(e)
  114. }
  115. }
  116. }
  117. fn index_documents(
  118. index_writer: &mut IndexWriter,
  119. doc_receiver: chan::Receiver<Document>,
  120. ) -> tantivy::Result<u64> {
  121. let group_count = 100_000;
  122. let mut num_docs = 0;
  123. let mut cur = PreciseTime::now();
  124. for doc in doc_receiver {
  125. index_writer.add_document(doc);
  126. if num_docs > 0 && (num_docs % group_count == 0) {
  127. println!("{} Docs", num_docs);
  128. let new = PreciseTime::now();
  129. let elapsed = cur.to(new);
  130. println!(
  131. "{:?} docs / hour",
  132. group_count * 3600 * 1_000_000 as u64
  133. / (elapsed.num_microseconds().unwrap() as u64)
  134. );
  135. cur = new;
  136. }
  137. num_docs += 1;
  138. }
  139. index_writer.commit()
  140. }
  /// Where the newline-delimited documents to index are read from.
  enum DocumentSource {
      /// Read the documents from standard input.
      FromPipe,
      /// Read the documents from the file at this path.
      FromFile(PathBuf),
  }

  impl DocumentSource {
      /// Opens the source and wraps it in a buffered reader, so it can be consumed line by line
      /// (for instance with `BufRead::lines`).
      ///
      /// # Errors
      ///
      /// Returns the I/O error raised when the file of a `FromFile` source cannot be opened.
      fn read(&self) -> io::Result<BufReader<Box<dyn Read>>> {
          let reader: Box<dyn Read> = match self {
              DocumentSource::FromPipe => Box::new(io::stdin()),
              DocumentSource::FromFile(filepath) => Box::new(File::open(filepath)?),
          };
          Ok(BufReader::new(reader))
      }
  }