diff --git a/Cargo.toml b/Cargo.toml index dbbf9f4..67a6ed7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,8 +13,8 @@ keywords = ["search", "information", "retrieval"] license = "MIT" [dependencies] -#tantivy = { path = "../tantivy" } -tantivy = "0.1.1" +tantivy = { path = "../tantivy" } +#tantivy = "0.1.1" time = "0.1.34" iron = "0.4" staticfile = "0.3.0" @@ -24,7 +24,7 @@ clap = "2" ansi_term = "0.8.0" urlencoded = "0.4" mount = "0.2.1" - +chan = "*" # [dependencies.clap] @@ -34,4 +34,4 @@ mount = "0.2.1" [[bin]] name = "tantivy" -path = "src/main.rs" \ No newline at end of file +path = "src/main.rs" diff --git a/src/commands/bench.rs b/src/commands/bench.rs index 6fee3c6..4e83367 100644 --- a/src/commands/bench.rs +++ b/src/commands/bench.rs @@ -57,7 +57,7 @@ fn run_bench(index_path: &Path, println!("-------------------------------\n\n\n"); let index = try!(Index::open(index_path).map_err(|e| format!("Failed to open index.\n{:?}", e))); - let searcher = try!(index.searcher().map_err(|e| format!("Failed to acquire searcher.\n{:?}", e))); + let searcher = index.searcher(); let default_search_fields: Vec = extract_search_fields(&index.schema()); let queries = try!(read_query_file(query_filepath).map_err(|e| format!("Failed reading the query file: {}", e))); let query_parser = QueryParser::new(index.schema(), default_search_fields); diff --git a/src/commands/index.rs b/src/commands/index.rs index 35895a7..7814d65 100644 --- a/src/commands/index.rs +++ b/src/commands/index.rs @@ -5,11 +5,14 @@ use std::io::BufRead; use std::io::BufReader; use std::io::Read; use std::path::PathBuf; -use tantivy; +use tantivy; use tantivy::Index; +use tantivy::IndexWriter; +use tantivy::Document; use time::PreciseTime; use clap::ArgMatches; - +use chan; +use std::thread; pub fn run_index_cli(argmatch: &ArgMatches) -> Result<(), String> { let index_directory = PathBuf::from(argmatch.value_of("index").unwrap()); @@ -22,7 +25,7 @@ pub fn run_index_cli(argmatch: &ArgMatches) -> Result<(), String> { } }; let num_threads = try!(value_t!(argmatch, "num_threads", usize).map_err(|_|format!("Failed to read num_threads argument as an integer."))); - run_index(index_directory, document_source, num_threads).map_err(|e| format!("Indexing failed : {:?}", e)) + run_index(index_directory, document_source, num_threads).map_err(|e| format!("Indexing failed : {:?}", e)) } enum DocumentSource { @@ -31,12 +34,44 @@ enum DocumentSource { } fn run_index(directory: PathBuf, document_source: DocumentSource, num_threads: usize) -> tantivy::Result<()> { - + let index = try!(Index::open(&directory)); - let schema = index.schema(); - - let mut index_writer = try!( + let (line_sender, line_receiver) = chan::sync(10_000); + let (doc_sender, doc_receiver) = chan::sync(10_000); + + thread::spawn(move || { + let articles = document_source.read().unwrap(); + for article_line_res in articles.lines() { + let article_line = article_line_res.unwrap(); + line_sender.send(article_line); + } + }); + + // using 3 threads to parse the json documents + for _ in 0..3 { + + let schema_clone = schema.clone(); + let doc_sender_clone = doc_sender.clone(); + let line_receiver_clone = line_receiver.clone(); + + thread::spawn(move || { + for article_line in line_receiver_clone { + match schema_clone.parse_document(&article_line) { + Ok(doc) => { + doc_sender_clone.send(doc); + } + Err(err) => { + println!("Failed to add document doc {:?}", err); + } + } + } + }); + } + drop(doc_sender); + + + let mut index_writer = try!( if num_threads > 0 { index.writer_with_num_threads(num_threads) } @@ -44,23 +79,29 @@ fn run_index(directory: PathBuf, document_source: DocumentSource, num_threads: u index.writer() } ); - - let articles = try!(document_source.read()); - + + + let index_result = index_documents(&mut index_writer, doc_receiver); + match index_result { + Ok(docstamp) => { + println!("Commit succeed, docstamp at {}", docstamp); + Ok(()) + } + Err(e) => { + println!("Error during indexing, rollbacking."); + index_writer.rollback().unwrap(); + println!("Rollback succeeded"); + Err(e) + } + } +} + +fn index_documents(index_writer: &mut IndexWriter, doc_receiver: chan::Receiver) -> tantivy::Result { + let group_count = 100_000; let mut num_docs = 0; let mut cur = PreciseTime::now(); - let group_count = 100000; - - for article_line_res in articles.lines() { - let article_line = article_line_res.unwrap(); // TODO - match schema.parse_document(&article_line) { - Ok(doc) => { - index_writer.add_document(doc).unwrap(); - } - Err(err) => { - println!("Failed to add document doc {:?}", err); - } - } + for doc in doc_receiver { + try!(index_writer.add_document(doc)); if num_docs > 0 && (num_docs % group_count == 0) { println!("{} Docs", num_docs); let new = PreciseTime::now(); @@ -68,12 +109,9 @@ fn run_index(directory: PathBuf, document_source: DocumentSource, num_threads: u println!("{:?} docs / hour", group_count * 3600 * 1_000_000 as u64 / (elapsed.num_microseconds().unwrap() as u64)); cur = new; } - num_docs += 1; - } - index_writer.wait().unwrap(); // TODO - Ok(()) + index_writer.commit() } @@ -90,7 +128,7 @@ impl DocumentSource { Ok(match self { &DocumentSource::FromPipe => { BufReader::new(Box::new(io::stdin())) - } + } &DocumentSource::FromFile(ref filepath) => { let read_file = try!(File::open(&filepath)); BufReader::new(Box::new(read_file)) @@ -98,4 +136,3 @@ impl DocumentSource { }) } } - diff --git a/src/commands/merge.rs b/src/commands/merge.rs index db61e4a..4ab0770 100644 --- a/src/commands/merge.rs +++ b/src/commands/merge.rs @@ -12,7 +12,7 @@ pub fn run_merge_cli(argmatch: &ArgMatches) -> Result<(), String> { fn run_merge(path: PathBuf) -> tantivy::Result<()> { let index = try!(Index::open(&path)); - let segments = index.segments(); + let segments = try!(index.segments()); let mut index_writer = try!(index.writer()); index_writer.merge(&segments) } diff --git a/src/commands/new.rs b/src/commands/new.rs index 64ba920..51f6be9 100644 --- a/src/commands/new.rs +++ b/src/commands/new.rs @@ -72,7 +72,7 @@ fn prompt_yn(msg: &str) -> bool { } -fn ask_add_field_text(field_name: &str, schema: &mut Schema) { +fn ask_add_field_text(field_name: &str, schema_builder: &mut SchemaBuilder) { let mut text_options = TextOptions::new(); if prompt_yn("Should the field be stored") { text_options = text_options.set_stored(); @@ -100,11 +100,11 @@ fn ask_add_field_text(field_name: &str, schema: &mut Schema) { TextIndexingOptions::Unindexed }; text_options = text_options.set_indexing_options(indexing_options); - schema.add_text_field(field_name, text_options); + schema_builder.add_text_field(field_name, text_options); } -fn ask_add_field_u32(field_name: &str, schema: &mut Schema) { +fn ask_add_field_u32(field_name: &str, schema_builder: &mut SchemaBuilder) { let mut u32_options = U32Options::new(); if prompt_yn("Should the field be stored") { u32_options = u32_options.set_stored(); @@ -115,31 +115,32 @@ fn ask_add_field_u32(field_name: &str, schema: &mut Schema) { if prompt_yn("Should the field be indexed") { u32_options = u32_options.set_indexed(); } - schema.add_u32_field(field_name, u32_options); + schema_builder.add_u32_field(field_name, u32_options); } -fn ask_add_field(schema: &mut Schema) { +fn ask_add_field(schema_builder: &mut SchemaBuilder) { println!("\n\n"); let field_name = prompt_input("New field name ", field_name_validate); let text_or_integer = prompt_options("Text or unsigned 32-bit Integer", vec!('T', 'I')); if text_or_integer =='T' { - ask_add_field_text(&field_name, schema); + ask_add_field_text(&field_name, schema_builder); } else { - ask_add_field_u32(&field_name, schema); + ask_add_field_u32(&field_name, schema_builder); } } fn run_new(directory: PathBuf) -> tantivy::Result<()> { println!("\n{} ", Style::new().bold().fg(Green).paint("Creating new index")); println!("{} ", Style::new().bold().fg(Green).paint("Let's define it's schema!")); - let mut schema = Schema::new(); + let mut schema_builder = SchemaBuilder::new(); loop { - ask_add_field(&mut schema); + ask_add_field(&mut schema_builder); if !prompt_yn("Add another field") { break; } } + let schema = schema_builder.build(); let schema_json = format!("{}", json::as_pretty_json(&schema)); println!("\n{}\n", Style::new().fg(Green).paint(schema_json)); let mut index = try!(Index::create(&directory, schema)); diff --git a/src/commands/serve.rs b/src/commands/serve.rs index 921a3e0..07f4eca 100644 --- a/src/commands/serve.rs +++ b/src/commands/serve.rs @@ -111,7 +111,7 @@ impl IndexServer { fn search(&self, q: String, num_hits: usize, explain: bool) -> Result { let query = self.query_parser.parse_query(&q).unwrap(); - let searcher = self.index.searcher().unwrap(); + let searcher = self.index.searcher(); let mut count_collector = CountCollector::new(); let mut top_collector = TopCollector::with_limit(num_hits); let mut timer_tree = TimerTree::new(); diff --git a/src/main.rs b/src/main.rs index 56e65e9..0ce42d9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ extern crate time; extern crate persistent; extern crate urlencoded; extern crate iron; +extern crate chan; extern crate staticfile; extern crate ansi_term; extern crate mount; @@ -23,7 +24,7 @@ fn main() { .value_name("directory") .help("Tantivy index directory filepath") .required(true); - + let cli_options = App::new("Tantivy") .setting(AppSettings::SubcommandRequiredElseHelp) .version("0.1") @@ -33,7 +34,7 @@ fn main() { SubCommand::with_name("new") .about("Create a new index. The schema will be populated with a simple example schema") .arg(index_arg.clone()) - ) + ) .subcommand( SubCommand::with_name("serve") .about("Start a server") @@ -50,7 +51,7 @@ fn main() { .help("Port") .default_value("localhost") ) - ) + ) .subcommand( SubCommand::with_name("index") .about("Index files") @@ -90,11 +91,11 @@ fn main() { .arg(index_arg.clone()) ) .get_matches(); - + let (subcommand, some_options) = cli_options.subcommand(); - + let options = some_options.unwrap(); - + match subcommand { "new" => run_new_cli(options).unwrap(), "index" => run_index_cli(options).unwrap(), @@ -109,4 +110,4 @@ fn main() { }, _ => {} } -} \ No newline at end of file +}