@@ -13,8 +13,8 @@ keywords = ["search", "information", "retrieval"] | |||||
license = "MIT" | license = "MIT" | ||||
[dependencies] | [dependencies] | ||||
#tantivy = { path = "../tantivy" } | |||||
tantivy = "0.1.1" | |||||
tantivy = { path = "../tantivy" } | |||||
#tantivy = "0.1.1" | |||||
time = "0.1.34" | time = "0.1.34" | ||||
iron = "0.4" | iron = "0.4" | ||||
staticfile = "0.3.0" | staticfile = "0.3.0" | ||||
@@ -24,7 +24,7 @@ clap = "2" | |||||
ansi_term = "0.8.0" | ansi_term = "0.8.0" | ||||
urlencoded = "0.4" | urlencoded = "0.4" | ||||
mount = "0.2.1" | mount = "0.2.1" | ||||
chan = "*" | |||||
# [dependencies.clap] | # [dependencies.clap] | ||||
@@ -34,4 +34,4 @@ mount = "0.2.1" | |||||
[[bin]] | [[bin]] | ||||
name = "tantivy" | name = "tantivy" | ||||
path = "src/main.rs" | |||||
path = "src/main.rs" |
@@ -57,7 +57,7 @@ fn run_bench(index_path: &Path, | |||||
println!("-------------------------------\n\n\n"); | println!("-------------------------------\n\n\n"); | ||||
let index = try!(Index::open(index_path).map_err(|e| format!("Failed to open index.\n{:?}", e))); | let index = try!(Index::open(index_path).map_err(|e| format!("Failed to open index.\n{:?}", e))); | ||||
let searcher = try!(index.searcher().map_err(|e| format!("Failed to acquire searcher.\n{:?}", e))); | |||||
let searcher = index.searcher(); | |||||
let default_search_fields: Vec<Field> = extract_search_fields(&index.schema()); | let default_search_fields: Vec<Field> = extract_search_fields(&index.schema()); | ||||
let queries = try!(read_query_file(query_filepath).map_err(|e| format!("Failed reading the query file: {}", e))); | let queries = try!(read_query_file(query_filepath).map_err(|e| format!("Failed reading the query file: {}", e))); | ||||
let query_parser = QueryParser::new(index.schema(), default_search_fields); | let query_parser = QueryParser::new(index.schema(), default_search_fields); | ||||
@@ -5,11 +5,14 @@ use std::io::BufRead; | |||||
use std::io::BufReader; | use std::io::BufReader; | ||||
use std::io::Read; | use std::io::Read; | ||||
use std::path::PathBuf; | use std::path::PathBuf; | ||||
use tantivy; | |||||
use tantivy; | |||||
use tantivy::Index; | use tantivy::Index; | ||||
use tantivy::IndexWriter; | |||||
use tantivy::Document; | |||||
use time::PreciseTime; | use time::PreciseTime; | ||||
use clap::ArgMatches; | use clap::ArgMatches; | ||||
use chan; | |||||
use std::thread; | |||||
pub fn run_index_cli(argmatch: &ArgMatches) -> Result<(), String> { | pub fn run_index_cli(argmatch: &ArgMatches) -> Result<(), String> { | ||||
let index_directory = PathBuf::from(argmatch.value_of("index").unwrap()); | let index_directory = PathBuf::from(argmatch.value_of("index").unwrap()); | ||||
@@ -22,7 +25,7 @@ pub fn run_index_cli(argmatch: &ArgMatches) -> Result<(), String> { | |||||
} | } | ||||
}; | }; | ||||
let num_threads = try!(value_t!(argmatch, "num_threads", usize).map_err(|_|format!("Failed to read num_threads argument as an integer."))); | let num_threads = try!(value_t!(argmatch, "num_threads", usize).map_err(|_|format!("Failed to read num_threads argument as an integer."))); | ||||
run_index(index_directory, document_source, num_threads).map_err(|e| format!("Indexing failed : {:?}", e)) | |||||
run_index(index_directory, document_source, num_threads).map_err(|e| format!("Indexing failed : {:?}", e)) | |||||
} | } | ||||
enum DocumentSource { | enum DocumentSource { | ||||
@@ -31,12 +34,44 @@ enum DocumentSource { | |||||
} | } | ||||
fn run_index(directory: PathBuf, document_source: DocumentSource, num_threads: usize) -> tantivy::Result<()> { | fn run_index(directory: PathBuf, document_source: DocumentSource, num_threads: usize) -> tantivy::Result<()> { | ||||
let index = try!(Index::open(&directory)); | let index = try!(Index::open(&directory)); | ||||
let schema = index.schema(); | let schema = index.schema(); | ||||
let mut index_writer = try!( | |||||
let (line_sender, line_receiver) = chan::sync(10_000); | |||||
let (doc_sender, doc_receiver) = chan::sync(10_000); | |||||
thread::spawn(move || { | |||||
let articles = document_source.read().unwrap(); | |||||
for article_line_res in articles.lines() { | |||||
let article_line = article_line_res.unwrap(); | |||||
line_sender.send(article_line); | |||||
} | |||||
}); | |||||
// using 3 threads to parse the json documents | |||||
for _ in 0..3 { | |||||
let schema_clone = schema.clone(); | |||||
let doc_sender_clone = doc_sender.clone(); | |||||
let line_receiver_clone = line_receiver.clone(); | |||||
thread::spawn(move || { | |||||
for article_line in line_receiver_clone { | |||||
match schema_clone.parse_document(&article_line) { | |||||
Ok(doc) => { | |||||
doc_sender_clone.send(doc); | |||||
} | |||||
Err(err) => { | |||||
println!("Failed to add document doc {:?}", err); | |||||
} | |||||
} | |||||
} | |||||
}); | |||||
} | |||||
drop(doc_sender); | |||||
let mut index_writer = try!( | |||||
if num_threads > 0 { | if num_threads > 0 { | ||||
index.writer_with_num_threads(num_threads) | index.writer_with_num_threads(num_threads) | ||||
} | } | ||||
@@ -44,23 +79,29 @@ fn run_index(directory: PathBuf, document_source: DocumentSource, num_threads: u | |||||
index.writer() | index.writer() | ||||
} | } | ||||
); | ); | ||||
let articles = try!(document_source.read()); | |||||
let index_result = index_documents(&mut index_writer, doc_receiver); | |||||
match index_result { | |||||
Ok(docstamp) => { | |||||
println!("Commit succeed, docstamp at {}", docstamp); | |||||
Ok(()) | |||||
} | |||||
Err(e) => { | |||||
println!("Error during indexing, rollbacking."); | |||||
index_writer.rollback().unwrap(); | |||||
println!("Rollback succeeded"); | |||||
Err(e) | |||||
} | |||||
} | |||||
} | |||||
fn index_documents(index_writer: &mut IndexWriter, doc_receiver: chan::Receiver<Document>) -> tantivy::Result<u64> { | |||||
let group_count = 100_000; | |||||
let mut num_docs = 0; | let mut num_docs = 0; | ||||
let mut cur = PreciseTime::now(); | let mut cur = PreciseTime::now(); | ||||
let group_count = 100000; | |||||
for article_line_res in articles.lines() { | |||||
let article_line = article_line_res.unwrap(); // TODO | |||||
match schema.parse_document(&article_line) { | |||||
Ok(doc) => { | |||||
index_writer.add_document(doc).unwrap(); | |||||
} | |||||
Err(err) => { | |||||
println!("Failed to add document doc {:?}", err); | |||||
} | |||||
} | |||||
for doc in doc_receiver { | |||||
try!(index_writer.add_document(doc)); | |||||
if num_docs > 0 && (num_docs % group_count == 0) { | if num_docs > 0 && (num_docs % group_count == 0) { | ||||
println!("{} Docs", num_docs); | println!("{} Docs", num_docs); | ||||
let new = PreciseTime::now(); | let new = PreciseTime::now(); | ||||
@@ -68,12 +109,9 @@ fn run_index(directory: PathBuf, document_source: DocumentSource, num_threads: u | |||||
println!("{:?} docs / hour", group_count * 3600 * 1_000_000 as u64 / (elapsed.num_microseconds().unwrap() as u64)); | println!("{:?} docs / hour", group_count * 3600 * 1_000_000 as u64 / (elapsed.num_microseconds().unwrap() as u64)); | ||||
cur = new; | cur = new; | ||||
} | } | ||||
num_docs += 1; | num_docs += 1; | ||||
} | } | ||||
index_writer.wait().unwrap(); // TODO | |||||
Ok(()) | |||||
index_writer.commit() | |||||
} | } | ||||
@@ -90,7 +128,7 @@ impl DocumentSource { | |||||
Ok(match self { | Ok(match self { | ||||
&DocumentSource::FromPipe => { | &DocumentSource::FromPipe => { | ||||
BufReader::new(Box::new(io::stdin())) | BufReader::new(Box::new(io::stdin())) | ||||
} | |||||
} | |||||
&DocumentSource::FromFile(ref filepath) => { | &DocumentSource::FromFile(ref filepath) => { | ||||
let read_file = try!(File::open(&filepath)); | let read_file = try!(File::open(&filepath)); | ||||
BufReader::new(Box::new(read_file)) | BufReader::new(Box::new(read_file)) | ||||
@@ -98,4 +136,3 @@ impl DocumentSource { | |||||
}) | }) | ||||
} | } | ||||
} | } | ||||
@@ -12,7 +12,7 @@ pub fn run_merge_cli(argmatch: &ArgMatches) -> Result<(), String> { | |||||
fn run_merge(path: PathBuf) -> tantivy::Result<()> { | fn run_merge(path: PathBuf) -> tantivy::Result<()> { | ||||
let index = try!(Index::open(&path)); | let index = try!(Index::open(&path)); | ||||
let segments = index.segments(); | |||||
let segments = try!(index.segments()); | |||||
let mut index_writer = try!(index.writer()); | let mut index_writer = try!(index.writer()); | ||||
index_writer.merge(&segments) | index_writer.merge(&segments) | ||||
} | } |
@@ -72,7 +72,7 @@ fn prompt_yn(msg: &str) -> bool { | |||||
} | } | ||||
fn ask_add_field_text(field_name: &str, schema: &mut Schema) { | |||||
fn ask_add_field_text(field_name: &str, schema_builder: &mut SchemaBuilder) { | |||||
let mut text_options = TextOptions::new(); | let mut text_options = TextOptions::new(); | ||||
if prompt_yn("Should the field be stored") { | if prompt_yn("Should the field be stored") { | ||||
text_options = text_options.set_stored(); | text_options = text_options.set_stored(); | ||||
@@ -100,11 +100,11 @@ fn ask_add_field_text(field_name: &str, schema: &mut Schema) { | |||||
TextIndexingOptions::Unindexed | TextIndexingOptions::Unindexed | ||||
}; | }; | ||||
text_options = text_options.set_indexing_options(indexing_options); | text_options = text_options.set_indexing_options(indexing_options); | ||||
schema.add_text_field(field_name, text_options); | |||||
schema_builder.add_text_field(field_name, text_options); | |||||
} | } | ||||
fn ask_add_field_u32(field_name: &str, schema: &mut Schema) { | |||||
fn ask_add_field_u32(field_name: &str, schema_builder: &mut SchemaBuilder) { | |||||
let mut u32_options = U32Options::new(); | let mut u32_options = U32Options::new(); | ||||
if prompt_yn("Should the field be stored") { | if prompt_yn("Should the field be stored") { | ||||
u32_options = u32_options.set_stored(); | u32_options = u32_options.set_stored(); | ||||
@@ -115,31 +115,32 @@ fn ask_add_field_u32(field_name: &str, schema: &mut Schema) { | |||||
if prompt_yn("Should the field be indexed") { | if prompt_yn("Should the field be indexed") { | ||||
u32_options = u32_options.set_indexed(); | u32_options = u32_options.set_indexed(); | ||||
} | } | ||||
schema.add_u32_field(field_name, u32_options); | |||||
schema_builder.add_u32_field(field_name, u32_options); | |||||
} | } | ||||
fn ask_add_field(schema: &mut Schema) { | |||||
fn ask_add_field(schema_builder: &mut SchemaBuilder) { | |||||
println!("\n\n"); | println!("\n\n"); | ||||
let field_name = prompt_input("New field name ", field_name_validate); | let field_name = prompt_input("New field name ", field_name_validate); | ||||
let text_or_integer = prompt_options("Text or unsigned 32-bit Integer", vec!('T', 'I')); | let text_or_integer = prompt_options("Text or unsigned 32-bit Integer", vec!('T', 'I')); | ||||
if text_or_integer =='T' { | if text_or_integer =='T' { | ||||
ask_add_field_text(&field_name, schema); | |||||
ask_add_field_text(&field_name, schema_builder); | |||||
} | } | ||||
else { | else { | ||||
ask_add_field_u32(&field_name, schema); | |||||
ask_add_field_u32(&field_name, schema_builder); | |||||
} | } | ||||
} | } | ||||
fn run_new(directory: PathBuf) -> tantivy::Result<()> { | fn run_new(directory: PathBuf) -> tantivy::Result<()> { | ||||
println!("\n{} ", Style::new().bold().fg(Green).paint("Creating new index")); | println!("\n{} ", Style::new().bold().fg(Green).paint("Creating new index")); | ||||
println!("{} ", Style::new().bold().fg(Green).paint("Let's define it's schema!")); | println!("{} ", Style::new().bold().fg(Green).paint("Let's define it's schema!")); | ||||
let mut schema = Schema::new(); | |||||
let mut schema_builder = SchemaBuilder::new(); | |||||
loop { | loop { | ||||
ask_add_field(&mut schema); | |||||
ask_add_field(&mut schema_builder); | |||||
if !prompt_yn("Add another field") { | if !prompt_yn("Add another field") { | ||||
break; | break; | ||||
} | } | ||||
} | } | ||||
let schema = schema_builder.build(); | |||||
let schema_json = format!("{}", json::as_pretty_json(&schema)); | let schema_json = format!("{}", json::as_pretty_json(&schema)); | ||||
println!("\n{}\n", Style::new().fg(Green).paint(schema_json)); | println!("\n{}\n", Style::new().fg(Green).paint(schema_json)); | ||||
let mut index = try!(Index::create(&directory, schema)); | let mut index = try!(Index::create(&directory, schema)); | ||||
@@ -111,7 +111,7 @@ impl IndexServer { | |||||
fn search(&self, q: String, num_hits: usize, explain: bool) -> Result<Serp> { | fn search(&self, q: String, num_hits: usize, explain: bool) -> Result<Serp> { | ||||
let query = self.query_parser.parse_query(&q).unwrap(); | let query = self.query_parser.parse_query(&q).unwrap(); | ||||
let searcher = self.index.searcher().unwrap(); | |||||
let searcher = self.index.searcher(); | |||||
let mut count_collector = CountCollector::new(); | let mut count_collector = CountCollector::new(); | ||||
let mut top_collector = TopCollector::with_limit(num_hits); | let mut top_collector = TopCollector::with_limit(num_hits); | ||||
let mut timer_tree = TimerTree::new(); | let mut timer_tree = TimerTree::new(); | ||||
@@ -7,6 +7,7 @@ extern crate time; | |||||
extern crate persistent; | extern crate persistent; | ||||
extern crate urlencoded; | extern crate urlencoded; | ||||
extern crate iron; | extern crate iron; | ||||
extern crate chan; | |||||
extern crate staticfile; | extern crate staticfile; | ||||
extern crate ansi_term; | extern crate ansi_term; | ||||
extern crate mount; | extern crate mount; | ||||
@@ -23,7 +24,7 @@ fn main() { | |||||
.value_name("directory") | .value_name("directory") | ||||
.help("Tantivy index directory filepath") | .help("Tantivy index directory filepath") | ||||
.required(true); | .required(true); | ||||
let cli_options = App::new("Tantivy") | let cli_options = App::new("Tantivy") | ||||
.setting(AppSettings::SubcommandRequiredElseHelp) | .setting(AppSettings::SubcommandRequiredElseHelp) | ||||
.version("0.1") | .version("0.1") | ||||
@@ -33,7 +34,7 @@ fn main() { | |||||
SubCommand::with_name("new") | SubCommand::with_name("new") | ||||
.about("Create a new index. The schema will be populated with a simple example schema") | .about("Create a new index. The schema will be populated with a simple example schema") | ||||
.arg(index_arg.clone()) | .arg(index_arg.clone()) | ||||
) | |||||
) | |||||
.subcommand( | .subcommand( | ||||
SubCommand::with_name("serve") | SubCommand::with_name("serve") | ||||
.about("Start a server") | .about("Start a server") | ||||
@@ -50,7 +51,7 @@ fn main() { | |||||
.help("Port") | .help("Port") | ||||
.default_value("localhost") | .default_value("localhost") | ||||
) | ) | ||||
) | |||||
) | |||||
.subcommand( | .subcommand( | ||||
SubCommand::with_name("index") | SubCommand::with_name("index") | ||||
.about("Index files") | .about("Index files") | ||||
@@ -90,11 +91,11 @@ fn main() { | |||||
.arg(index_arg.clone()) | .arg(index_arg.clone()) | ||||
) | ) | ||||
.get_matches(); | .get_matches(); | ||||
let (subcommand, some_options) = cli_options.subcommand(); | let (subcommand, some_options) = cli_options.subcommand(); | ||||
let options = some_options.unwrap(); | let options = some_options.unwrap(); | ||||
match subcommand { | match subcommand { | ||||
"new" => run_new_cli(options).unwrap(), | "new" => run_new_cli(options).unwrap(), | ||||
"index" => run_index_cli(options).unwrap(), | "index" => run_index_cli(options).unwrap(), | ||||
@@ -109,4 +110,4 @@ fn main() { | |||||
}, | }, | ||||
_ => {} | _ => {} | ||||
} | } | ||||
} | |||||
} |