Browse Source

Added commands

develop
Paul Masurel 7 years ago
parent
commit
27312df78c
7 changed files with 396 additions and 160 deletions
  1. +7
    -0
      Cargo.toml
  2. +109
    -0
      src/commands/index.rs
  3. +0
    -0
      src/commands/merge.rs
  4. +22
    -0
      src/commands/mod.rs
  5. +26
    -0
      src/commands/new.rs
  6. +163
    -0
      src/commands/serve.rs
  7. +69
    -160
      src/main.rs

+ 7
- 0
Cargo.toml View File

@@ -11,9 +11,16 @@ iron = "0.4"
staticfile = "0.3.0"
lazy_static = "*"
rustc-serialize = "0.3.16"
persistent="*"
clap = "2"

[dependencies.urlencoded]
version = "0.4"

[dependencies.mount]
git = "https://github.com/iron/mount.git"

# [dependencies.clap]
#version = "2"
#default-features = false
#features = [ "suggestions", "color" ]

+ 109
- 0
src/commands/index.rs View File

@@ -0,0 +1,109 @@
use rustc_serialize::json;
use rustc_serialize::json::DecodeResult;
use std::convert::From;
use std::fs::File;
use std::io;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Read;
use std::path::PathBuf;
use tantivy;
use tantivy::Index;
use tantivy::schema::*;
use time::PreciseTime;
use clap::ArgMatches;

use serialize::json;

fn doc_from_json(schema: Schema, doc_json: &str) -> Document {
let json_it = json::from_str(doc_json).unwrap();
let json_obj = json_it.as_object().unwrap();
println!()
}

enum DocumentSource {
FromPipe,
FromFile(PathBuf),
}

pub fn run_index_cli(argmatch: &ArgMatches) -> tantivy::Result<()> {
let index_directory = PathBuf::from(argmatch.value_of("index").unwrap());
let document_source = {
match argmatch.value_of("file") {
Some(path) => {
DocumentSource::FromFile(PathBuf::from(path))
}
None => DocumentSource::FromPipe,
}
};
run_index(index_directory, document_source)
}

fn run_index(directory: PathBuf, document_source: DocumentSource) -> tantivy::Result<()> {
let index = try!(Index::open(&directory));
let schema = index.schema();
let mut index_writer = index.writer_with_num_threads(8).unwrap();
let articles = try!(document_source.read());
let mut num_docs = 0;
let mut cur = PreciseTime::now();
let group_count = 10000;
let title = schema.get_field("title").unwrap();
let url = schema.get_field("url").unwrap();
let body = schema.get_field("body").unwrap();
for article_line_res in articles.lines() {
let article_line = article_line_res.unwrap();
let article_res: DecodeResult<WikiArticle> = json::decode(&article_line);
match article_res {
Ok(article) => {
let mut doc = Document::new();
doc.add_text(title, &article.title);
doc.add_text(body, &article.body);
doc.add_text(url, &article.url);
index_writer.add_document(doc).unwrap();
}
Err(_) => {}
}

if num_docs > 0 && (num_docs % group_count == 0) {
println!("{} Docs", num_docs);
let new = PreciseTime::now();
let elapsed = cur.to(new);
println!("{:?} docs / hour", group_count * 3600 * 1e6 as u64 / (elapsed.num_microseconds().unwrap() as u64));
cur = new;
}

num_docs += 1;

}
index_writer.wait()
}


#[derive(Clone,Debug,RustcDecodable,RustcEncodable)]
pub struct WikiArticle {
pub url: String,
pub title: String,
pub body: String,
}


impl DocumentSource {
fn read(&self,) -> io::Result<BufReader<Box<Read>>> {
Ok(match self {
&DocumentSource::FromPipe => {
BufReader::new(Box::new(io::stdin()))
}
&DocumentSource::FromFile(ref filepath) => {
let read_file = try!(File::open(&filepath));
BufReader::new(Box::new(read_file))
}
})
}
}


+ 0
- 0
src/commands/merge.rs View File


+ 22
- 0
src/commands/mod.rs View File

@@ -0,0 +1,22 @@
mod index;
mod serve;
mod new;

pub use self::new::run_new_cli;
pub use self::index::run_index_cli;
pub use self::serve::run_serve_cli;

// pub mod writer;
// pub mod searcher;
// pub mod index;
// pub mod merger;

// mod segment_serializer;
// mod segment_writer;
// mod segment_reader;
// mod segment_id;
// mod segment_component;

// pub use self::segment_component::SegmentComponent;
// pub use self::segment_id::SegmentId;
// pub use self::segment_reader::SegmentReader;

+ 26
- 0
src/commands/new.rs View File

@@ -0,0 +1,26 @@
use clap::ArgMatches;
use std::convert::From;
use std::path::PathBuf;
use tantivy;
use tantivy::schema::{Schema, STRING, STORED, TEXT};
use tantivy::Index;

fn default_schema() -> Schema {
let mut schema = Schema::new();
schema.add_text_field("url", STRING | STORED);
schema.add_text_field("title", TEXT | STORED);
schema.add_text_field("body", TEXT | STORED);
schema
}

pub fn run_new_cli(matches: &ArgMatches) -> tantivy::Result<()> {
let index_directory = PathBuf::from(matches.value_of("index").unwrap());
run_new(index_directory)
}

fn run_new(directory: PathBuf) -> tantivy::Result<()> {
let schema = default_schema();
let mut index = try!(Index::create(&directory, schema));
index.save_metas()
}


+ 163
- 0
src/commands/serve.rs View File

@@ -0,0 +1,163 @@
use clap::ArgMatches;
use iron::mime::Mime;
use iron::prelude::*;
use iron::status;
use iron::typemap::Key;
use mount::Mount;
use persistent::Read;
use rustc_serialize::json::as_pretty_json;
use staticfile::Static;
use std::convert::From;
use std::path::Path;
use std::path::PathBuf;
use tantivy;
use tantivy::collector;
use tantivy::collector::CountCollector;
use tantivy::collector::TopCollector;
use tantivy::Document;
use tantivy::Index;
use tantivy::query::Explanation;
use tantivy::query::Query;
use tantivy::query::QueryParser;
use tantivy::Result;
use tantivy::schema::Field;
use tantivy::Score;
use urlencoded::UrlEncodedQuery;


pub fn run_serve_cli(matches: &ArgMatches) -> tantivy::Result<()> {
let index_directory = PathBuf::from(matches.value_of("index").unwrap());
let port = value_t!(matches, "port", u16).unwrap_or(3000u16);
let host_str = matches.value_of("host").unwrap_or("localhost");
// let host = Ipv4Addr::from_str(&host_str).unwrap(); // TODO err management
let host = format!("{}:{}", host_str, port);
run_serve(index_directory, &host)
}


#[derive(RustcDecodable, RustcEncodable)]
struct Serp {
q: String,
num_hits: usize,
hits: Vec<Hit>,
timings: Vec<Timing>,
}

#[derive(RustcDecodable, RustcEncodable)]
struct Hit {
title: String,
body: String,
explain: String,
score: Score,
}

#[derive(RustcDecodable, RustcEncodable)]
struct Timing {
name: String,
duration: i64,
}

struct IndexServer {
index: Index,
query_parser: QueryParser,
body_field: Field,
title_field: Field,
}

impl IndexServer {
fn load(path: &Path) -> IndexServer {
let index = Index::open(path).unwrap();
let schema = index.schema();
let body_field = schema.get_field("body").unwrap();
let title_field = schema.get_field("title").unwrap();
let query_parser = QueryParser::new(schema, vec!(body_field, title_field));
IndexServer {
index: index,
query_parser: query_parser,
title_field: title_field,
body_field: body_field,
}
}

fn create_hit(&self, doc: &Document, explain: Explanation) -> Hit {
Hit {
title: String::from(doc.get_first(self.title_field).unwrap().text()),
body: String::from(doc.get_first(self.body_field).unwrap().text().clone()),
explain: format!("{:?}", explain),
score: explain.val(),
}
}
fn search(&self, q: String) -> Result<Serp> {
let query = self.query_parser.parse_query(&q).unwrap();
let searcher = self.index.searcher().unwrap();
let mut count_collector = CountCollector::new();
let mut top_collector = TopCollector::with_limit(10);

{
let mut chained_collector = collector::chain()
.add(&mut top_collector)
.add(&mut count_collector);
try!(query.search(&searcher, &mut chained_collector));
}
let hits: Vec<Hit> = top_collector.docs()
.iter()
.map(|doc_address| {
let doc: Document = searcher.doc(doc_address).unwrap();
let explanation = query.explain(&searcher, doc_address).unwrap();
self.create_hit(&doc, explanation)
})
.collect();
Ok(Serp {
q: q,
hits: hits,
num_hits: count_collector.count(),
timings: Vec::new(),
})
}
}

impl Key for IndexServer {
type Value = IndexServer;
}

fn search(req: &mut Request) -> IronResult<Response> {
let index_server = req.get::<Read<IndexServer>>().unwrap();
match req.get_ref::<UrlEncodedQuery>() {
Ok(ref qs_map) => {
match qs_map.get("q") {
Some(qs) => {
let query = qs[0].clone();
let serp = index_server.search(query).unwrap();
let resp_json = as_pretty_json(&serp).indent(4);
let content_type = "application/json".parse::<Mime>().unwrap();
Ok(
Response::with((content_type, status::Ok, format!("{}", resp_json)))
)
}
None => {
Ok(Response::with((status::BadRequest, "Query not defined")))
}
}
}
Err(_) => Ok(Response::with((status::BadRequest, "Failed to parse query string")))
}
}


fn run_serve(directory: PathBuf, host: &str) -> tantivy::Result<()> {
let mut mount = Mount::new();
let server = IndexServer::load(&directory);
mount.mount("/api", search);
mount.mount("/", Static::new(Path::new("static/")));
let mut middleware = Chain::new(mount);
middleware.link(Read::<IndexServer>::both(server));
println!("listening on http://{}", host);
Iron::new(middleware).http(host).unwrap();
Ok(())
}


+ 69
- 160
src/main.rs View File

@@ -1,170 +1,79 @@
extern crate tantivy;
extern crate time;
extern crate urlencoded;
#[macro_use]
extern crate clap;
#[macro_use]
extern crate lazy_static;
extern crate rustc_serialize;
extern crate tantivy;
extern crate time;
// extern crate regex;
extern crate persistent;
extern crate urlencoded;
extern crate iron;
extern crate staticfile;
extern crate mount;

use tantivy::schema::Field;
use tantivy::collector::CountCollector;
use tantivy::Index;
use std::convert::From;
use time::PreciseTime;
use tantivy::collector;
use urlencoded::UrlEncodedQuery;
use iron::status;
use rustc_serialize::json::as_pretty_json;
use std::path::Path;
use staticfile::Static;
use iron::mime::Mime;
use mount::Mount;
use tantivy::query::Query;
use tantivy::query::QueryParser;
use tantivy::Document;
use tantivy::collector::TopCollector;
use iron::prelude::*;
use clap::{AppSettings, Arg, App, SubCommand};
mod commands;
use self::commands::*;

#[derive(RustcDecodable, RustcEncodable)]
struct Serp {
query: String,
num_hits: usize,
hits: Vec<Hit>,
timings: Vec<Timing>,
}

#[derive(RustcDecodable, RustcEncodable)]
struct Hit {
title: String,
body: String,
}

lazy_static! {
static ref INDEX_SERVER: IndexServer = {
IndexServer::load(&Path::new("/data/wiki-index/"))
};
}

struct IndexServer {
index: Index,
query_parser: QueryParser,
body_field: Field,
title_field: Field,
}

impl IndexServer {
fn load(path: &Path) -> IndexServer {
let index = Index::open(path).unwrap();
let schema = index.schema();
let body_field = schema.get_field("body").unwrap();
let title_field = schema.get_field("title").unwrap();
let query_parser = QueryParser::new(schema, vec!(body_field, title_field));
IndexServer {
index: index,
query_parser: query_parser,
title_field: title_field,
body_field: body_field,
}
}

fn create_hit(&self, doc: &Document) -> Hit {
Hit {
title: String::from(doc.get_first(self.title_field).unwrap().text()),
body: String::from(doc.get_first(self.body_field).unwrap().text().clone()),

}
}
}

struct TimingStarted {
name: String,
start: PreciseTime,
}

impl TimingStarted {
fn new(name: &str) -> TimingStarted {
TimingStarted {
name: String::from(name),
start: PreciseTime::now(),
}
}

fn stop(self) -> Timing {
let stop = PreciseTime::now();
Timing {
name: self.name,
duration: self.start.to(stop).num_microseconds().unwrap(),
}
}
}

#[derive(RustcDecodable, RustcEncodable)]
struct Timing {
name: String,
duration: i64,
}


fn search(req: &mut Request) -> IronResult<Response> {
let mut timings = Vec::new();
match req.get_ref::<UrlEncodedQuery>() {
Ok(ref qs_map) => {
match qs_map.get("q") {
Some(qs) => {
let query = qs[0].clone();
let parsed_query = INDEX_SERVER.query_parser.parse_query(&query).unwrap();
let search_timing = TimingStarted::new("search");
let searcher = INDEX_SERVER.index.searcher().unwrap();

let mut count_collector = CountCollector::new();
let mut top_collector = TopCollector::with_limit(30);

{
let mut chained_collector = collector::chain()
.add(&mut top_collector)
.add(&mut count_collector);
let timings = parsed_query.search(&searcher, &mut chained_collector).unwrap();
println!("{:?}", timings);
}
timings.push(search_timing.stop());
let storage_timing = TimingStarted::new("store");
// for scored_doc in top_collector.score_docs() {
// println!("{:?}", scored_doc);
// }
let hits: Vec<Hit> = top_collector
.docs()
.iter()
.map(|doc_address| searcher.doc(doc_address).unwrap())
.map(|doc|INDEX_SERVER.create_hit(&doc) )
.collect();
timings.push(storage_timing.stop());
let response = Serp {
query: query,
hits: hits,
num_hits: count_collector.count(),
timings: timings,
};
let resp_json = as_pretty_json(&response).indent(4);
let content_type = "application/json".parse::<Mime>().unwrap();
Ok(
Response::with((content_type, status::Ok, format!("{}", resp_json)))
)
}
None => {
Ok(Response::with((status::BadRequest, "Query not defined")))
}
}
}
Err(_) => Ok(Response::with((status::BadRequest, "Failed to parse query string")))
}
}

fn main() {
let mut mount = Mount::new();
mount.mount("/api", search);
mount.mount("/", Static::new(Path::new("static/")));
println!("Running on 3000");
Iron::new(mount).http("127.0.0.1:3000").unwrap();
}
let index_arg = Arg::with_name("index")
.short("i")
.long("index")
.value_name("directory")
.help("Tantivy index directory filepath")
.required(true);
let cli_options = App::new("Tantivy")
.setting(AppSettings::SubcommandRequiredElseHelp)
.version("0.1")
.author("Paul Masurel <paul.masurel@gmail.com>")
.about("Tantivy Search Engine's command line interface.")
.subcommand(
SubCommand::with_name("new")
.about("Create a new index. The schema will be populated with a simple example schema")
.arg(index_arg.clone())
)
.subcommand(
SubCommand::with_name("serve")
.about("Start a server")
.arg(index_arg.clone())
.arg(Arg::with_name("host")
.long("host")
.value_name("host")
.help("host to listen to")
)
.arg(Arg::with_name("port")
.short("p")
.long("port")
.value_name("port")
.help("Port")
.default_value("localhost")
)
)
.subcommand(
SubCommand::with_name("index")
.about("Index files")
.arg(index_arg.clone())
.arg(Arg::with_name("file")
.short("f")
.long("file")
.value_name("file")
.help("File containing the documents to index.")
))
.get_matches();
let (subcommand, some_options) = cli_options.subcommand();
let options = some_options.unwrap();
match subcommand {
// "serve" => run_serve(options),
"new" => run_new_cli(options).unwrap(),
"index" => run_index_cli(options).unwrap(),
"serve" => run_serve_cli(options).unwrap(),
_ => {}
}
}

Loading…
Cancel
Save