lucene-general mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Clay Webster" <clay.webs...@cnet.com>
Subject RE: Lucene-based Distributed Index Leveraging Hadoop
Date Wed, 06 Feb 2008 19:33:58 GMT

There seem to be a few other players in this space too.

Are you from Rackspace?  
(http://highscalability.com/how-rackspace-now-uses-mapreduce-and-hadoop-
query-terabytes-data)

AOL also has a Hadoop/Solr project going on.

CNET does not have much brewing there.  Although Yonik and I had 
talked about it a bunch -- but that was long ago. 

--cw

Clay Webster                                   tel:1.908.541.3724
Associate VP, Platform Infrastructure         http://www.cnet.com
CNET, Inc. (Nasdaq:CNET)                     mailto:clay@cnet.com

> -----Original Message-----
> From: Ning Li [mailto:ning.li.li@gmail.com]
> Sent: Wednesday, February 06, 2008 1:57 PM
> To: general@lucene.apache.org; java-dev@lucene.apache.org; solr-
> user@lucene.apache.org
> Subject: Lucene-based Distributed Index Leveraging Hadoop
> 
> There have been several proposals for a Lucene-based distributed index
> architecture.
>  1) Doug Cutting's "Index Server Project Proposal" at
>
http://www.mail-archive.com/general@lucene.apache.org/msg00338.html
>  2) Solr's "Distributed Search" at
>     http://wiki.apache.org/solr/DistributedSearch
>  3) Mark Butler's "Distributed Lucene" at
>     http://wiki.apache.org/hadoop/DistributedLucene
> 
> We have also been working on a Lucene-based distributed index
> architecture.
> Our design differs from the above proposals in the way it leverages
> Hadoop
> as much as possible. In particular, HDFS is used to reliably store
> Lucene
> instances, Map/Reduce is used to analyze documents and update Lucene
> instances
> in parallel, and Hadoop's IPC framework is used. Our design is geared
> for
> applications that require a highly scalable index and where batch
> updates
> to each Lucene instance are acceptable (verses finer-grained document
> at
> a time updates).
> 
> We have a working implementation of our design and are in the process
> of evaluating its performance. An overview of our design is provided
> below.
> We welcome feedback and would like to know if you are interested in
> working
> on it. If so, we would be happy to make the code publicly available.
At
> the
> same time, we would like to collaborate with people working on
existing
> proposals and see if we can consolidate our efforts.
> 
> TERMINOLOGY
> A distributed "index" is partitioned into "shards". Each shard
> corresponds
> to
> a Lucene instance and contains a disjoint subset of the documents in
> the
> index.
> Each shard is stored in HDFS and served by one or more "shard
servers".
> Here
> we only talk about a single distributed index, but in practice
multiple
> indexes
> can be supported.
> 
> A "master" keeps track of the shard servers and the shards being
served
> by
> them. An "application" updates and queries the global index through an
> "index client". An index client communicates with the shard servers to
> execute a query.
> 
> KEY RPC METHODS
> This section lists the key RPC methods in our design. To simplify the
> discussion, some of their parameters have been omitted.
> 
>   On the Shard Servers
>     // Execute a query on this shard server's Lucene instance.
>     // This method is called by an index client.
>     SearchResults search(Query query);
> 
>   On the Master
>     // Tell the master to update the shards, i.e., Lucene instances.
>     // This method is called by an index client.
>     boolean updateShards(Configuration conf);
> 
>     // Ask the master where the shards are located.
>     // This method is called by an index client.
>     LocatedShards getShardLocations();
> 
>     // Send a heartbeat to the master. This method is called by a
>     // shard server. In the response, the master informs the
>     // shard server when to switch to a newer version of the index.
>     ShardServerCommand sendHeartbeat();
> 
> QUERYING THE INDEX
> To query the index, an application sends a search request to an index
> client.
> The index client then calls the shard server search() method for each
> shard
> of the index, merges the results and returns them to the application.
> The
> index client caches the mapping between shards and shard servers by
> periodically calling the master's getShardLocations() method.
> 
> UPDATING THE INDEX USING MAP/REDUCE
> To update the index, an application sends an update request to an
index
> client.
> The index client then calls the master's updateShards() method, which
> schedules
> a Map/Reduce job to update the index. The Map/Reduce job updates the
> shards
> in
> parallel and copies the new index files of each shard (i.e., Lucene
> instance)
> to HDFS.
> 
> The updateShards() method includes a "configuration", which provides
> information for updating the shards. More specifically, the
> configuration
> includes the following information:
>   - Input path. This provides the location of updated documents, e.g.,
> HDFS
>     files or directories, or HBase tables.
>   - Input formatter. This specifies how to format the input documents.
>   - Analysis. This defines the analyzer to use on the input. The
> analyzer
>     determines whether a document is being inserted, updated, or
> deleted.
> For
>     inserts or updates, the analyzer also converts each input document
> into
>     a Lucene document.
> 
> The Map phase of the Map/Reduce job formats and analyzes the input (in
> parallel), while the Reduce phase collects and applies the updates to
> each
> Lucene instance (again in parallel). The updates are applied using the
> local
> file system where a Reduce task runs and then copied back to HDFS. For
> example,
> if the updates caused a new Lucene segment to be created, the new
> segment
> would be created on the local file system first, and then copied back
> to
> HDFS.
> 
> When the Map/Reduce job completes, a "new version" of the index is
> ready to
> be
> queried. It is important to note that the new version of the index is
> not
> derived from scratch. By leveraging Lucene's update algorithm, the new
> version
> of each Lucene instance will share as many files as possible as the
> previous
> version.
> 
> ENSURING INDEX CONSISTENCY
> At any point in time, an index client always has a consistent view of
> the
> shards in the index. The results of a search query include either all
> or
> none
> of a recent update to the index. The details of the algorithm to
> accomplish
> this are omitted here, but the basic flow is pretty simple.
> 
> After the Map/Reduce job to update the shards completes, the master
> will
> tell
> each shard server to "prepare" the new version of the index. After all
> the
> shard servers have responded affirmatively to the "prepare" message,
> the new
> 
> index is ready to be queried. An index client will then lazily learn
> about
> the new index when it makes its next getShardLocations() call to the
> master.
> 
> In essence, a lazy two-phase commit protocol is used, with "prepare"
> and
> "commit" messages piggybacked on heartbeats. After a shard has
switched
> to
> the new index, the Lucene files in the old index that are no longer
> needed
> can safely be deleted.
> 
> ACHIEVING FAULT-TOLERANCE
> We rely on the fault-tolerance of Map/Reduce to guarantee that an
index
> update
> will eventually succeed. All shards are stored in HDFS and can be read
> by
> any
> shard server in a cluster. For a given shard, if one of its shard
> servers
> dies,
> new search requests are handled by its surviving shard servers. To
> ensure
> that
> there is always enough coverage for a shard, the master will instruct
> other
> shard servers to take over the shards of a dead shard server.
> 
> PERFORMANCE ISSUES
> Currently, each shard server reads a shard directly from HDFS.
> Experiments
> have shown that this approach does not perform very well, with HDFS
> causing
> Lucene to slow down fairly dramatically (by well over 5x when data
> blocks
> are
> accessed over the network). Consequently, we are exploring different
> ways to
> leverage the fault tolerance of HDFS and, at the same time, work
around
> its
> performance problems. One simple alternative is to add a local file
> system
> cache on each shard server. Another alternative is to modify HDFS so
> that an
> application has more control over where to store the primary and
> replicas of
> an HDFS block. This feature may be useful for other HDFS applications
> (e.g.,
> HBase). We would like to collaborate with other people who are
> interested in
> adding this feature to HDFS.
> 
> 
> Regards,
> Ning Li

Mime
View raw message