You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

138 lines
4.4KB

  1. use std::convert::From;
  2. use std::fs::File;
  3. use std::io;
  4. use std::cmp;
  5. use std::io::BufRead;
  6. use std::io::BufReader;
  7. use std::io::Read;
  8. use std::path::PathBuf;
  9. use tantivy;
  10. use tantivy::Index;
  11. use tantivy::IndexWriter;
  12. use tantivy::Document;
  13. use time::PreciseTime;
  14. use clap::ArgMatches;
  15. use chan;
  16. use std::thread;
  17. pub fn run_index_cli(argmatch: &ArgMatches) -> Result<(), String> {
  18. let index_directory = PathBuf::from(argmatch.value_of("index").unwrap());
  19. let document_source = {
  20. match argmatch.value_of("file") {
  21. Some(path) => {
  22. DocumentSource::FromFile(PathBuf::from(path))
  23. }
  24. None => DocumentSource::FromPipe,
  25. }
  26. };
  27. let mut num_threads = try!(value_t!(argmatch, "num_threads", usize).map_err(|_|format!("Failed to read num_threads argument as an integer.")));
  28. if num_threads == 0 {
  29. num_threads = 1;
  30. }
  31. let buffer_size = try!(value_t!(argmatch, "memory_size", usize).map_err(|_|format!("Failed to read the buffer size argument as an integer.")));
  32. let buffer_size_per_thread = buffer_size / num_threads;
  33. run_index(index_directory, document_source, buffer_size_per_thread, num_threads).map_err(|e| format!("Indexing failed : {:?}", e))
  34. }
  35. fn run_index(directory: PathBuf, document_source: DocumentSource, buffer_size_per_thread: usize, num_threads: usize) -> tantivy::Result<()> {
  36. let index = try!(Index::open(&directory));
  37. let schema = index.schema();
  38. let (line_sender, line_receiver) = chan::sync(10_000);
  39. let (doc_sender, doc_receiver) = chan::sync(10_000);
  40. thread::spawn(move || {
  41. let articles = document_source.read().unwrap();
  42. for article_line_res in articles.lines() {
  43. let article_line = article_line_res.unwrap();
  44. line_sender.send(article_line);
  45. }
  46. });
  47. let num_threads_to_parse_json = cmp::max(1, num_threads / 2);
  48. info!("Using {} threads to parse json", num_threads_to_parse_json);
  49. for _ in 0..num_threads_to_parse_json {
  50. let schema_clone = schema.clone();
  51. let doc_sender_clone = doc_sender.clone();
  52. let line_receiver_clone = line_receiver.clone();
  53. thread::spawn(move || {
  54. for article_line in line_receiver_clone {
  55. match schema_clone.parse_document(&article_line) {
  56. Ok(doc) => {
  57. doc_sender_clone.send(doc);
  58. }
  59. Err(err) => {
  60. println!("Failed to add document doc {:?}", err);
  61. }
  62. }
  63. }
  64. });
  65. }
  66. drop(doc_sender);
  67. let mut index_writer = try!(
  68. if num_threads > 0 {
  69. index.writer_with_num_threads(num_threads, buffer_size_per_thread)
  70. }
  71. else {
  72. index.writer(buffer_size_per_thread)
  73. }
  74. );
  75. let index_result = index_documents(&mut index_writer, doc_receiver);
  76. try!(match index_result {
  77. Ok(docstamp) => {
  78. println!("Commit succeed, docstamp at {}", docstamp);
  79. Ok(())
  80. }
  81. Err(e) => {
  82. println!("Error during indexing, rollbacking.");
  83. index_writer.rollback().unwrap();
  84. println!("Rollback succeeded");
  85. Err(e)
  86. }
  87. });
  88. index_writer.wait_merging_threads()
  89. }
  90. fn index_documents(index_writer: &mut IndexWriter, doc_receiver: chan::Receiver<Document>) -> tantivy::Result<u64> {
  91. let group_count = 100_000;
  92. let mut num_docs = 0;
  93. let mut cur = PreciseTime::now();
  94. for doc in doc_receiver {
  95. try!(index_writer.add_document(doc));
  96. if num_docs > 0 && (num_docs % group_count == 0) {
  97. println!("{} Docs", num_docs);
  98. let new = PreciseTime::now();
  99. let elapsed = cur.to(new);
  100. println!("{:?} docs / hour", group_count * 3600 * 1_000_000 as u64 / (elapsed.num_microseconds().unwrap() as u64));
  101. cur = new;
  102. }
  103. num_docs += 1;
  104. }
  105. index_writer.commit()
  106. }
  107. enum DocumentSource {
  108. FromPipe,
  109. FromFile(PathBuf),
  110. }
  111. impl DocumentSource {
  112. fn read(&self,) -> io::Result<BufReader<Box<Read>>> {
  113. Ok(match self {
  114. &DocumentSource::FromPipe => {
  115. BufReader::new(Box::new(io::stdin()))
  116. }
  117. &DocumentSource::FromFile(ref filepath) => {
  118. let read_file = try!(File::open(&filepath));
  119. BufReader::new(Box::new(read_file))
  120. }
  121. })
  122. }
  123. }