From 27312df78c71d8061a4d2b4eeaa60505ff4c4b78 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Mon, 8 Aug 2016 00:45:37 +0900
Subject: [PATCH] Added commands

---
 Cargo.toml            |   7 ++
 src/commands/index.rs | 109 ++++++++++++++++++++
 src/commands/merge.rs |   0
 src/commands/mod.rs   |  22 ++++
 src/commands/new.rs   |  26 +++++
 src/commands/serve.rs | 163 ++++++++++++++++++++++++++++++
 src/main.rs           | 229 +++++++++++++-----------------------------
 7 files changed, 396 insertions(+), 160 deletions(-)
 create mode 100644 src/commands/index.rs
 create mode 100644 src/commands/merge.rs
 create mode 100644 src/commands/mod.rs
 create mode 100644 src/commands/new.rs
 create mode 100644 src/commands/serve.rs

diff --git a/Cargo.toml b/Cargo.toml
index 1e83dc8..1b11524 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,9 +11,16 @@ iron = "0.4"
 staticfile = "0.3.0"
 lazy_static = "*"
 rustc-serialize = "0.3.16"
+persistent = "*"
+clap = "2"
 
 [dependencies.urlencoded]
 version = "0.4"
 
 [dependencies.mount]
 git = "https://github.com/iron/mount.git"
+
+# [dependencies.clap]
+# version = "2"
+# default-features = false
+# features = [ "suggestions", "color" ]
\ No newline at end of file
diff --git a/src/commands/index.rs b/src/commands/index.rs
new file mode 100644
index 0000000..a56cdae
--- /dev/null
+++ b/src/commands/index.rs
@@ -0,0 +1,109 @@
+use rustc_serialize::json;
+use rustc_serialize::json::DecodeResult;
+use std::convert::From;
+use std::fs::File;
+use std::io;
+use std::io::BufRead;
+use std::io::BufReader;
+use std::io::Read;
+use std::path::PathBuf;
+use tantivy;
+use tantivy::Index;
+use tantivy::schema::*;
+use time::PreciseTime;
+use clap::ArgMatches;
+
+// Note: helper not yet used by run_index below; it builds a document from a
+// JSON object, keeping only the fields declared in the schema.
+fn doc_from_json(schema: &Schema, doc_json: &str) -> Document {
+    let json_value = json::Json::from_str(doc_json).unwrap();
+    let json_obj = json_value.as_object().unwrap();
+    let mut doc = Document::new();
+    for (field_name, value) in json_obj.iter() {
+        if let (Some(field), Some(text)) = (schema.get_field(field_name), value.as_string()) {
+            doc.add_text(field, text);
+        }
+    }
+    doc
+}
+
+enum DocumentSource {
+    FromPipe,
+    FromFile(PathBuf),
+}
+
+pub fn run_index_cli(argmatch: &ArgMatches) -> tantivy::Result<()> {
+    let index_directory = PathBuf::from(argmatch.value_of("index").unwrap());
+    let document_source = {
+        match argmatch.value_of("file") {
+            Some(path) => {
+                DocumentSource::FromFile(PathBuf::from(path))
+            }
+            None => DocumentSource::FromPipe,
+        }
+    };
+    run_index(index_directory, document_source)
+}
+
+fn run_index(directory: PathBuf, document_source: DocumentSource) -> tantivy::Result<()> {
+
+    let index = try!(Index::open(&directory));
+    let schema = index.schema();
+    let mut index_writer = index.writer_with_num_threads(8).unwrap();
+
+    let articles = try!(document_source.read());
+
+    let mut num_docs = 0;
+    let mut cur = PreciseTime::now();
+    let group_count = 10000;
+
+    let title = schema.get_field("title").unwrap();
+    let url = schema.get_field("url").unwrap();
+    let body = schema.get_field("body").unwrap();
+
+    for article_line_res in articles.lines() {
+        let article_line = article_line_res.unwrap();
+        let article_res: DecodeResult<WikiArticle> = json::decode(&article_line);
+        match article_res {
+            Ok(article) => {
+                let mut doc = Document::new();
+                doc.add_text(title, &article.title);
+                doc.add_text(body, &article.body);
+                doc.add_text(url, &article.url);
+                index_writer.add_document(doc).unwrap();
+            }
+            Err(_) => {}
+        }
+
+        if num_docs > 0 && (num_docs % group_count == 0) {
+            println!("{} Docs", num_docs);
+            let new = PreciseTime::now();
+            let elapsed = cur.to(new);
+            println!("{:?} docs / hour", group_count * 3600 * 1_000_000u64 / (elapsed.num_microseconds().unwrap() as u64));
+            cur = new;
+        }
+
+        num_docs += 1;
+
+    }
+    index_writer.wait()
+}
+
+
+#[derive(Clone,Debug,RustcDecodable,RustcEncodable)]
+pub struct WikiArticle {
+    pub url: String,
+    pub title: String,
+    pub body: String,
+}
+
+
+impl DocumentSource {
+    fn read(&self,) -> io::Result<BufReader<Box<Read>>> {
+        Ok(match self {
+            &DocumentSource::FromPipe => {
+                BufReader::new(Box::new(io::stdin()))
+            }
+            &DocumentSource::FromFile(ref filepath) => {
+                let read_file = try!(File::open(&filepath));
+                BufReader::new(Box::new(read_file))
+            }
+        })
+    }
+}
+
diff --git a/src/commands/merge.rs b/src/commands/merge.rs
new file mode 100644
index 0000000..e69de29
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
new file mode 100644
index 0000000..b8e973e
--- /dev/null
+++ b/src/commands/mod.rs
@@ -0,0 +1,22 @@
+mod index;
+mod serve;
+mod new;
+
+pub use self::new::run_new_cli;
+pub use self::index::run_index_cli;
+pub use self::serve::run_serve_cli;
+
+// pub mod writer;
+// pub mod searcher;
+// pub mod index;
+// pub mod merger;
+
+// mod segment_serializer;
+// mod segment_writer;
+// mod segment_reader;
+// mod segment_id;
+// mod segment_component;
+
+// pub use self::segment_component::SegmentComponent;
+// pub use self::segment_id::SegmentId;
+// pub use self::segment_reader::SegmentReader;
\ No newline at end of file
diff --git a/src/commands/new.rs b/src/commands/new.rs
new file mode 100644
index 0000000..55758ff
--- /dev/null
+++ b/src/commands/new.rs
@@ -0,0 +1,26 @@
+use clap::ArgMatches;
+use std::convert::From;
+use std::path::PathBuf;
+use tantivy;
+use tantivy::schema::{Schema, STRING, STORED, TEXT};
+use tantivy::Index;
+
+fn default_schema() -> Schema {
+    let mut schema = Schema::new();
+    schema.add_text_field("url", STRING | STORED);
+    schema.add_text_field("title", TEXT | STORED);
+    schema.add_text_field("body", TEXT | STORED);
+    schema
+}
+
+pub fn run_new_cli(matches: &ArgMatches) -> tantivy::Result<()> {
+    let index_directory = PathBuf::from(matches.value_of("index").unwrap());
+    run_new(index_directory)
+}
+
+fn run_new(directory: PathBuf) -> tantivy::Result<()> {
+    let schema = default_schema();
+    let mut index = try!(Index::create(&directory, schema));
+    index.save_metas()
+}
+
diff --git a/src/commands/serve.rs b/src/commands/serve.rs
new file mode 100644
index 0000000..fe65a73
--- /dev/null
+++ b/src/commands/serve.rs
@@ -0,0 +1,163 @@
+use clap::ArgMatches;
+use iron::mime::Mime;
+use iron::prelude::*;
+use iron::status;
+use iron::typemap::Key;
+use mount::Mount;
+use persistent::Read;
+use rustc_serialize::json::as_pretty_json;
+use staticfile::Static;
+use std::convert::From;
+use std::path::Path;
+use std::path::PathBuf;
+use tantivy;
+use tantivy::collector;
+use tantivy::collector::CountCollector;
+use tantivy::collector::TopCollector;
+use tantivy::Document;
+use tantivy::Index;
+use tantivy::query::Explanation;
+use tantivy::query::Query;
+use tantivy::query::QueryParser;
+use tantivy::Result;
+use tantivy::schema::Field;
+use tantivy::Score;
+use urlencoded::UrlEncodedQuery;
+
+
+pub fn run_serve_cli(matches: &ArgMatches) -> tantivy::Result<()> {
+    let index_directory = PathBuf::from(matches.value_of("index").unwrap());
+    let port = value_t!(matches, "port", u16).unwrap_or(3000u16);
+    let host_str = matches.value_of("host").unwrap_or("localhost");
+    // let host = Ipv4Addr::from_str(&host_str).unwrap(); // TODO err management
+    let host = format!("{}:{}", host_str, port);
+    run_serve(index_directory, &host)
+}
+
+
+#[derive(RustcDecodable, RustcEncodable)]
+struct Serp {
+    q: String,
+    num_hits: usize,
+    hits: Vec<Hit>,
+    timings: Vec<Timing>,
+}
+
+#[derive(RustcDecodable, RustcEncodable)]
+struct Hit {
+    title: String,
+    body: String,
+    explain: String,
+    score: Score,
+}
+
+#[derive(RustcDecodable, RustcEncodable)]
+struct Timing {
+    name: String,
+    duration: i64,
+}
+
+struct IndexServer {
+    index: Index,
+    query_parser: QueryParser,
+    body_field: Field,
+    title_field: Field,
+}
+
+impl IndexServer {
+
+    fn load(path: &Path) -> IndexServer {
+        let index = Index::open(path).unwrap();
+        let schema = index.schema();
+        let body_field = schema.get_field("body").unwrap();
+        let title_field = schema.get_field("title").unwrap();
+        let query_parser = QueryParser::new(schema, vec!(body_field, title_field));
+        IndexServer {
+            index: index,
+            query_parser: query_parser,
+            title_field: title_field,
+            body_field: body_field,
+        }
+    }
+
+    fn create_hit(&self, doc: &Document, explain: Explanation) -> Hit {
+        Hit {
+            title: String::from(doc.get_first(self.title_field).unwrap().text()),
+            body: String::from(doc.get_first(self.body_field).unwrap().text().clone()),
+            explain: format!("{:?}", explain),
+            score: explain.val(),
+        }
+    }
+
+    fn search(&self, q: String) -> Result<Serp> {
+        let query = self.query_parser.parse_query(&q).unwrap();
+        let searcher = self.index.searcher().unwrap();
+        let mut count_collector = CountCollector::new();
+        let mut top_collector = TopCollector::with_limit(10);
+
+        {
+            let mut chained_collector = collector::chain()
+                .add(&mut top_collector)
+                .add(&mut count_collector);
+            try!(query.search(&searcher, &mut chained_collector));
+        }
+        let hits: Vec<Hit> = top_collector.docs()
+            .iter()
+            .map(|doc_address| {
+                let doc: Document = searcher.doc(doc_address).unwrap();
+                let explanation = query.explain(&searcher, doc_address).unwrap();
+                self.create_hit(&doc, explanation)
+            })
+            .collect();
+        Ok(Serp {
+            q: q,
+            hits: hits,
+            num_hits: count_collector.count(),
+            timings: Vec::new(),
+        })
+    }
+}
+
+impl Key for IndexServer {
+    type Value = IndexServer;
+}
+
+fn search(req: &mut Request) -> IronResult<Response> {
+    let index_server = req.get::<Read<IndexServer>>().unwrap();
+    match req.get_ref::<UrlEncodedQuery>() {
+        Ok(ref qs_map) => {
+            match qs_map.get("q") {
+                Some(qs) => {
+                    let query = qs[0].clone();
+                    let serp = index_server.search(query).unwrap();
+                    let resp_json = as_pretty_json(&serp).indent(4);
+                    let content_type = "application/json".parse::<Mime>().unwrap();
+                    Ok(
+                        Response::with((content_type, status::Ok, format!("{}", resp_json)))
+                    )
+                }
+                None => {
+                    Ok(Response::with((status::BadRequest, "Query not defined")))
+                }
+            }
+        }
+        Err(_) => Ok(Response::with((status::BadRequest, "Failed to parse query string")))
+    }
+}
+
+
+fn run_serve(directory: PathBuf, host: &str) -> tantivy::Result<()> {
+    let mut mount = Mount::new();
+    let server = IndexServer::load(&directory);
+
+    mount.mount("/api", search);
+    mount.mount("/", Static::new(Path::new("static/")));
+
+    let mut middleware = Chain::new(mount);
+    middleware.link(Read::<IndexServer>::both(server));
+
+    println!("listening on http://{}", host);
+    Iron::new(middleware).http(host).unwrap();
+    Ok(())
+}
+
diff --git a/src/main.rs b/src/main.rs
index 586b610..68bd1ee 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,170 +1,79 @@
-extern crate tantivy;
-extern crate time;
-extern crate urlencoded;
+#[macro_use]
+extern crate clap;
 #[macro_use]
 extern crate lazy_static;
 extern crate rustc_serialize;
+extern crate tantivy;
+extern crate time;
+// extern crate regex;
+extern crate persistent;
+extern crate urlencoded;
 extern crate iron;
 extern crate staticfile;
 extern crate mount;
 
-use tantivy::schema::Field;
-use tantivy::collector::CountCollector;
-use tantivy::Index;
-use std::convert::From;
-use time::PreciseTime;
-use tantivy::collector;
-use urlencoded::UrlEncodedQuery;
-use iron::status;
-use rustc_serialize::json::as_pretty_json;
-use std::path::Path;
-use staticfile::Static;
-use iron::mime::Mime;
-use mount::Mount;
-use tantivy::query::Query;
-use tantivy::query::QueryParser;
-use tantivy::Document;
-use tantivy::collector::TopCollector;
-use iron::prelude::*;
+use clap::{AppSettings, Arg, App, SubCommand};
+mod commands;
+use self::commands::*;
 
-#[derive(RustcDecodable, RustcEncodable)]
-struct Serp {
-    query: String,
-    num_hits: usize,
-    hits: Vec<Hit>,
-    timings: Vec<Timing>,
-}
-
-#[derive(RustcDecodable, RustcEncodable)]
-struct Hit {
-    title: String,
-    body: String,
-}
-
-lazy_static! {
-    static ref INDEX_SERVER: IndexServer = {
-        IndexServer::load(&Path::new("/data/wiki-index/"))
-    };
-}
-
-struct IndexServer {
-    index: Index,
-    query_parser: QueryParser,
-    body_field: Field,
-    title_field: Field,
-}
-
-impl IndexServer {
-    fn load(path: &Path) -> IndexServer {
-        let index = Index::open(path).unwrap();
-        let schema = index.schema();
-        let body_field = schema.get_field("body").unwrap();
-        let title_field = schema.get_field("title").unwrap();
-        let query_parser = QueryParser::new(schema, vec!(body_field, title_field));
-        IndexServer {
-            index: index,
-            query_parser: query_parser,
-            title_field: title_field,
-            body_field: body_field,
-        }
-    }
-
-    fn create_hit(&self, doc: &Document) -> Hit {
-        Hit {
-            title: String::from(doc.get_first(self.title_field).unwrap().text()),
-            body: String::from(doc.get_first(self.body_field).unwrap().text().clone()),
-
-        }
-    }
-}
-
-struct TimingStarted {
-    name: String,
-    start: PreciseTime,
-}
-
-impl TimingStarted {
-    fn new(name: &str) -> TimingStarted {
-        TimingStarted {
-            name: String::from(name),
-            start: PreciseTime::now(),
-        }
-    }
-
-    fn stop(self) -> Timing {
-        let stop = PreciseTime::now();
-        Timing {
-            name: self.name,
-            duration: self.start.to(stop).num_microseconds().unwrap(),
-        }
-    }
-}
-
-#[derive(RustcDecodable, RustcEncodable)]
-struct Timing {
-    name: String,
-    duration: i64,
-}
-
-
-fn search(req: &mut Request) -> IronResult<Response> {
-    let mut timings = Vec::new();
-    match req.get_ref::<UrlEncodedQuery>() {
-        Ok(ref qs_map) => {
-            match qs_map.get("q") {
-                Some(qs) => {
-                    let query = qs[0].clone();
-                    let parsed_query = INDEX_SERVER.query_parser.parse_query(&query).unwrap();
-                    let search_timing = TimingStarted::new("search");
-                    let searcher = INDEX_SERVER.index.searcher().unwrap();
-
-                    let mut count_collector = CountCollector::new();
-                    let mut top_collector = TopCollector::with_limit(30);
-
-                    {
-                        let mut chained_collector = collector::chain()
-                            .add(&mut top_collector)
-                            .add(&mut count_collector);
-                        let timings = parsed_query.search(&searcher, &mut chained_collector).unwrap();
-                        println!("{:?}", timings);
-                    }
-                    timings.push(search_timing.stop());
-                    let storage_timing = TimingStarted::new("store");
-                    // for scored_doc in top_collector.score_docs() {
-                    //     println!("{:?}", scored_doc);
-                    // }
-                    let hits: Vec<Hit> = top_collector
-                        .docs()
-                        .iter()
-                        .map(|doc_address| searcher.doc(doc_address).unwrap())
-                        .map(|doc| INDEX_SERVER.create_hit(&doc))
-                        .collect();
-                    timings.push(storage_timing.stop());
-                    let response = Serp {
-                        query: query,
-                        hits: hits,
-                        num_hits: count_collector.count(),
-                        timings: timings,
-                    };
-                    let resp_json = as_pretty_json(&response).indent(4);
-                    let content_type = "application/json".parse::<Mime>().unwrap();
-                    Ok(
-                        Response::with((content_type, status::Ok, format!("{}", resp_json)))
-                    )
-                }
-                None => {
-                    Ok(Response::with((status::BadRequest, "Query not defined")))
-                }
-            }
-        }
-        Err(_) => Ok(Response::with((status::BadRequest, "Failed to parse query string")))
-    }
-}
 
 fn main() {
-    let mut mount = Mount::new();
-    mount.mount("/api", search);
-    mount.mount("/", Static::new(Path::new("static/")));
-    println!("Running on 3000");
-    Iron::new(mount).http("127.0.0.1:3000").unwrap();
-}
+
+    let index_arg = Arg::with_name("index")
+        .short("i")
+        .long("index")
+        .value_name("directory")
+        .help("Tantivy index directory filepath")
+        .required(true);
+
+    let cli_options = App::new("Tantivy")
+        .setting(AppSettings::SubcommandRequiredElseHelp)
+        .version("0.1")
+        .author("Paul Masurel")
+        .about("Tantivy Search Engine's command line interface.")
+        .subcommand(
+            SubCommand::with_name("new")
+                .about("Create a new index. The schema will be populated with a simple example schema")
+                .arg(index_arg.clone())
+        )
+        .subcommand(
+            SubCommand::with_name("serve")
+                .about("Start a server")
+                .arg(index_arg.clone())
+                .arg(Arg::with_name("host")
+                    .long("host")
+                    .value_name("host")
+                    .help("Host to listen to")
+                    .default_value("localhost")
+                )
+                .arg(Arg::with_name("port")
+                    .short("p")
+                    .long("port")
+                    .value_name("port")
+                    .help("Port")
+                    .default_value("3000")
+                )
+        )
+        .subcommand(
+            SubCommand::with_name("index")
+                .about("Index files")
+                .arg(index_arg.clone())
+                .arg(Arg::with_name("file")
+                    .short("f")
+                    .long("file")
+                    .value_name("file")
+                    .help("File containing the documents to index.")
+                ))
+        .get_matches();
+
+    let (subcommand, some_options) = cli_options.subcommand();
+
+    let options = some_options.unwrap();
+
+    match subcommand {
+        // "serve" => run_serve(options),
+        "new" => run_new_cli(options).unwrap(),
+        "index" => run_index_cli(options).unwrap(),
+        "serve" => run_serve_cli(options).unwrap(),
+        _ => {}
+    }
+}
\ No newline at end of file
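
Usage sketch (illustrative; it assumes the resulting binary is built as `tantivy`, and the index path and document file below are placeholders):

    tantivy new -i /tmp/wiki-index
    tantivy index -i /tmp/wiki-index -f wiki-articles.json
    tantivy serve -i /tmp/wiki-index --host localhost -p 3000

Each line of the file given to `index` (or piped on stdin when -f is omitted) should be a JSON object that decodes into WikiArticle, i.e. with "url", "title" and "body" string fields, for example:

    {"url": "https://en.wikipedia.org/wiki/Example", "title": "Example", "body": "Some body text."}

Once `serve` is running, the handler mounted at /api reads the q query-string parameter and returns the Serp structure as pretty-printed JSON, e.g. http://localhost:3000/api?q=example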