hadoop-common-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ted Dunning <tdunn...@veoh.com>
Subject Re: Lucene-based Distributed Index Leveraging Hadoop
Date Wed, 06 Feb 2008 19:23:32 GMT

Very nice summary.

One of the issues that we have had with multiple search servers is that on
linux, there can be substantial contention for disk I/O.  This means that as
a new index is being written, access to the current index can be stalled for
very long periods of time (sometimes >10s).  This problem is very difficult
to work around even with different I/O schedulers, write throttling and
other measures.  Our best work-around is to simply take a shard out of
service during delivery of an updated index.  This is obviously not a good

Regarding your comments about control over where the index resides, it may
help in the short run to write the original version of the index from one of
the storage nodes.  That will let you ensure that at least one copy of the
index is local to that machine.  Rebalancing will destroy that locality over
time, of course, so you may have problems there.  It would obviously be
preferable to be able to say to HDFS something like "the following 5 hosts
are preferred locations for replicas of this file" and have that preference
be respected during allocation and rebalancing.

On 2/6/08 10:59 AM, "Ning Li" <ning.li.00@gmail.com> wrote:

> There have been several proposals for a Lucene-based distributed index
> architecture.
>  1) Doug Cutting's "Index Server Project Proposal" at
>     http://www.mail-archive.com/general@lucene.apache.org/msg00338.html
>  2) Solr's "Distributed Search" at
>     http://wiki.apache.org/solr/DistributedSearch
>  3) Mark Butler's "Distributed Lucene" at
>     http://wiki.apache.org/hadoop/DistributedLucene
> We have also been working on a Lucene-based distributed index architecture.
> Our design differs from the above proposals in the way it leverages Hadoop
> as much as possible. In particular, HDFS is used to reliably store Lucene
> instances, Map/Reduce is used to analyze documents and update Lucene instances
> in parallel, and Hadoop's IPC framework is used. Our design is geared for
> applications that require a highly scalable index and where batch updates
> to each Lucene instance are acceptable (verses finer-grained document at
> a time updates).
> We have a working implementation of our design and are in the process
> of evaluating its performance. An overview of our design is provided below.
> We welcome feedback and would like to know if you are interested in working
> on it. If so, we would be happy to make the code publicly available. At the
> same time, we would like to collaborate with people working on existing
> proposals and see if we can consolidate our efforts.
> A distributed "index" is partitioned into "shards". Each shard corresponds to
> a Lucene instance and contains a disjoint subset of the documents in the
> index.
> Each shard is stored in HDFS and served by one or more "shard servers". Here
> we only talk about a single distributed index, but in practice multiple
> indexes
> can be supported.
> A "master" keeps track of the shard servers and the shards being served by
> them. An "application" updates and queries the global index through an
> "index client". An index client communicates with the shard servers to
> execute a query.
> This section lists the key RPC methods in our design. To simplify the
> discussion, some of their parameters have been omitted.
>   On the Shard Servers
>     // Execute a query on this shard server's Lucene instance.
>     // This method is called by an index client.
>     SearchResults search(Query query);
>   On the Master
>     // Tell the master to update the shards, i.e., Lucene instances.
>     // This method is called by an index client.
>     boolean updateShards(Configuration conf);
>     // Ask the master where the shards are located.
>     // This method is called by an index client.
>     LocatedShards getShardLocations();
>     // Send a heartbeat to the master. This method is called by a
>     // shard server. In the response, the master informs the
>     // shard server when to switch to a newer version of the index.
>     ShardServerCommand sendHeartbeat();
> To query the index, an application sends a search request to an index client.
> The index client then calls the shard server search() method for each shard
> of the index, merges the results and returns them to the application. The
> index client caches the mapping between shards and shard servers by
> periodically calling the master's getShardLocations() method.
> To update the index, an application sends an update request to an index
> client.
> The index client then calls the master's updateShards() method, which
> schedules
> a Map/Reduce job to update the index. The Map/Reduce job updates the shards in
> parallel and copies the new index files of each shard (i.e., Lucene instance)
> to HDFS.
> The updateShards() method includes a "configuration", which provides
> information for updating the shards. More specifically, the configuration
> includes the following information:
>   - Input path. This provides the location of updated documents, e.g., HDFS
>     files or directories, or HBase tables.
>   - Input formatter. This specifies how to format the input documents.
>   - Analysis. This defines the analyzer to use on the input. The analyzer
>     determines whether a document is being inserted, updated, or deleted. For
>     inserts or updates, the analyzer also converts each input document into
>     a Lucene document.
> The Map phase of the Map/Reduce job formats and analyzes the input (in
> parallel), while the Reduce phase collects and applies the updates to each
> Lucene instance (again in parallel). The updates are applied using the local
> file system where a Reduce task runs and then copied back to HDFS. For
> example,
> if the updates caused a new Lucene segment to be created, the new segment
> would be created on the local file system first, and then copied back to HDFS.
> When the Map/Reduce job completes, a "new version" of the index is ready to be
> queried. It is important to note that the new version of the index is not
> derived from scratch. By leveraging Lucene's update algorithm, the new version
> of each Lucene instance will share as many files as possible as the previous
> version.
> At any point in time, an index client always has a consistent view of the
> shards in the index. The results of a search query include either all or none
> of a recent update to the index. The details of the algorithm to accomplish
> this are omitted here, but the basic flow is pretty simple.
> After the Map/Reduce job to update the shards completes, the master will tell
> each shard server to "prepare" the new version of the index. After all the
> shard servers have responded affirmatively to the "prepare" message, the new
> index is ready to be queried. An index client will then lazily learn about
> the new index when it makes its next getShardLocations() call to the master.
> In essence, a lazy two-phase commit protocol is used, with "prepare" and
> "commit" messages piggybacked on heartbeats. After a shard has switched to
> the new index, the Lucene files in the old index that are no longer needed
> can safely be deleted.
> We rely on the fault-tolerance of Map/Reduce to guarantee that an index update
> will eventually succeed. All shards are stored in HDFS and can be read by any
> shard server in a cluster. For a given shard, if one of its shard servers
> dies,
> new search requests are handled by its surviving shard servers. To ensure that
> there is always enough coverage for a shard, the master will instruct other
> shard servers to take over the shards of a dead shard server.
> Currently, each shard server reads a shard directly from HDFS. Experiments
> have shown that this approach does not perform very well, with HDFS causing
> Lucene to slow down fairly dramatically (by well over 5x when data blocks are
> accessed over the network). Consequently, we are exploring different ways to
> leverage the fault tolerance of HDFS and, at the same time, work around its
> performance problems. One simple alternative is to add a local file system
> cache on each shard server. Another alternative is to modify HDFS so that an
> application has more control over where to store the primary and replicas of
> an HDFS block. This feature may be useful for other HDFS applications (e.g.,
> HBase). We would like to collaborate with other people who are interested in
> adding this feature to HDFS.
> Regards,
> Ning Li

View raw message