You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

154 lines
5.1KB

  1. use std::convert::From;
  2. use std::fs::File;
  3. use std::io;
  4. use std::cmp;
  5. use std::io::BufRead;
  6. use std::io::BufReader;
  7. use std::io::Read;
  8. use std::path::PathBuf;
  9. use tantivy;
  10. use tantivy::Index;
  11. use tantivy::IndexWriter;
  12. use tantivy::Document;
  13. use time::PreciseTime;
  14. use clap::ArgMatches;
  15. use chan;
  16. use tantivy::merge_policy::NoMergePolicy;
  17. use std::thread;
  18. pub fn run_index_cli(argmatch: &ArgMatches) -> Result<(), String> {
  19. let index_directory = PathBuf::from(argmatch.value_of("index").unwrap());
  20. let document_source = argmatch.value_of("file")
  21. .map(|path| DocumentSource::FromFile(PathBuf::from(path)))
  22. .unwrap_or(DocumentSource::FromPipe);
  23. let no_merge = argmatch.is_present("nomerge");
  24. let mut num_threads = try!(value_t!(argmatch, "num_threads", usize).map_err(|_|format!("Failed to read num_threads argument as an integer.")));
  25. if num_threads == 0 {
  26. num_threads = 1;
  27. }
  28. let buffer_size = try!(value_t!(argmatch, "memory_size", usize).map_err(|_|format!("Failed to read the buffer size argument as an integer.")));
  29. let buffer_size_per_thread = buffer_size / num_threads;
  30. run_index(index_directory, document_source, buffer_size_per_thread, num_threads, no_merge).map_err(|e| format!("Indexing failed : {:?}", e))
  31. }
  32. fn run_index(directory: PathBuf,
  33. document_source: DocumentSource,
  34. buffer_size_per_thread: usize,
  35. num_threads: usize,
  36. no_merge: bool) -> tantivy::Result<()> {
  37. let index = try!(Index::open(&directory));
  38. let schema = index.schema();
  39. let (line_sender, line_receiver) = chan::sync(10_000);
  40. let (doc_sender, doc_receiver) = chan::sync(10_000);
  41. thread::spawn(move || {
  42. let articles = document_source.read().unwrap();
  43. for article_line_res in articles.lines() {
  44. let article_line = article_line_res.unwrap();
  45. line_sender.send(article_line);
  46. }
  47. });
  48. let num_threads_to_parse_json = cmp::max(1, num_threads / 4);
  49. info!("Using {} threads to parse json", num_threads_to_parse_json);
  50. for _ in 0..num_threads_to_parse_json {
  51. let schema_clone = schema.clone();
  52. let doc_sender_clone = doc_sender.clone();
  53. let line_receiver_clone = line_receiver.clone();
  54. thread::spawn(move || {
  55. for article_line in line_receiver_clone {
  56. match schema_clone.parse_document(&article_line) {
  57. Ok(doc) => {
  58. doc_sender_clone.send(doc);
  59. }
  60. Err(err) => {
  61. println!("Failed to add document doc {:?}", err);
  62. }
  63. }
  64. }
  65. });
  66. }
  67. drop(doc_sender);
  68. let mut index_writer = try!(
  69. if num_threads > 0 {
  70. index.writer_with_num_threads(num_threads, buffer_size_per_thread)
  71. }
  72. else {
  73. index.writer(buffer_size_per_thread)
  74. }
  75. );
  76. if no_merge {
  77. index_writer.set_merge_policy(Box::new(NoMergePolicy));
  78. }
  79. let start_overall = PreciseTime::now();
  80. let index_result = index_documents(&mut index_writer, doc_receiver);
  81. {
  82. let duration = start_overall.to(PreciseTime::now());
  83. info!("Indexing the documents took {} s", duration.num_seconds());
  84. }
  85. match index_result {
  86. Ok(docstamp) => {
  87. println!("Commit succeed, docstamp at {}", docstamp);
  88. println!("Waiting for merging threads");
  89. index_writer.wait_merging_threads()?;
  90. println!("Terminated successfully!");
  91. {
  92. let duration = start_overall.to(PreciseTime::now());
  93. info!("Indexing the documents took {} s overall (indexing + merge)", duration.num_seconds());
  94. }
  95. Ok(())
  96. }
  97. Err(e) => {
  98. println!("Error during indexing, rollbacking.");
  99. index_writer.rollback().unwrap();
  100. println!("Rollback succeeded");
  101. Err(e)
  102. }
  103. }
  104. }
  105. fn index_documents(index_writer: &mut IndexWriter, doc_receiver: chan::Receiver<Document>) -> tantivy::Result<u64> {
  106. let group_count = 100_000;
  107. let mut num_docs = 0;
  108. let mut cur = PreciseTime::now();
  109. for doc in doc_receiver {
  110. index_writer.add_document(doc);
  111. if num_docs > 0 && (num_docs % group_count == 0) {
  112. println!("{} Docs", num_docs);
  113. let new = PreciseTime::now();
  114. let elapsed = cur.to(new);
  115. println!("{:?} docs / hour", group_count * 3600 * 1_000_000 as u64 / (elapsed.num_microseconds().unwrap() as u64));
  116. cur = new;
  117. }
  118. num_docs += 1;
  119. }
  120. index_writer.commit()
  121. }
  122. enum DocumentSource {
  123. FromPipe,
  124. FromFile(PathBuf),
  125. }
  126. impl DocumentSource {
  127. fn read(&self,) -> io::Result<BufReader<Box<Read>>> {
  128. Ok(match self {
  129. &DocumentSource::FromPipe => {
  130. BufReader::new(Box::new(io::stdin()))
  131. }
  132. &DocumentSource::FromFile(ref filepath) => {
  133. let read_file = try!(File::open(&filepath));
  134. BufReader::new(Box::new(read_file))
  135. }
  136. })
  137. }
  138. }