lucene-general mailing list archives

From "Ning Li" <>
Subject Lucene-based Distributed Index Leveraging Hadoop
Date Wed, 06 Feb 2008 18:57:22 GMT
There have been several proposals for a Lucene-based distributed index
architecture:
 1) Doug Cutting's "Index Server Project Proposal" at
 2) Solr's "Distributed Search" at
 3) Mark Butler's "Distributed Lucene" at

We have also been working on a Lucene-based distributed index architecture.
Our design differs from the above proposals in the way it leverages Hadoop
as much as possible. In particular, HDFS is used to reliably store Lucene
instances, Map/Reduce is used to analyze documents and update Lucene
instances in parallel, and Hadoop's IPC framework is used. Our design is
geared for applications that require a highly scalable index and where batch
updates to each Lucene instance are acceptable (versus finer-grained,
document-at-a-time updates).

We have a working implementation of our design and are in the process
of evaluating its performance. An overview of our design is provided below.
We welcome feedback and would like to know if you are interested in working
on it. If so, we would be happy to make the code publicly available. At the
same time, we would like to collaborate with people working on existing
proposals and see if we can consolidate our efforts.

A distributed "index" is partitioned into "shards". Each shard corresponds to
a Lucene instance and contains a disjoint subset of the documents in the
index. Each shard is stored in HDFS and served by one or more "shard servers".
Here we only talk about a single distributed index, but in practice multiple
indexes can be supported.

A "master" keeps track of the shard servers and the shards being served by
them. An "application" updates and queries the global index through an
"index client". An index client communicates with the shard servers to
execute a query.

This section lists the key RPC methods in our design. To simplify the
discussion, some of their parameters have been omitted.

  On the Shard Servers
    // Execute a query on this shard server's Lucene instance.
    // This method is called by an index client.
    SearchResults search(Query query);

  On the Master
    // Tell the master to update the shards, i.e., Lucene instances.
    // This method is called by an index client.
    boolean updateShards(Configuration conf);

    // Ask the master where the shards are located.
    // This method is called by an index client.
    LocatedShards getShardLocations();

    // Send a heartbeat to the master. This method is called by a
    // shard server. In the response, the master informs the
    // shard server when to switch to a newer version of the index.
    ShardServerCommand sendHeartbeat();

To query the index, an application sends a search request to an index
client. The index client then calls the shard server search() method for each
shard of the index, merges the results and returns them to the application.
The index client caches the mapping between shards and shard servers by
periodically calling the master's getShardLocations() method.
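
The query path above can be sketched roughly as follows. This is only an
illustration, not our actual code: the Hit and ShardServer types, the topN
parameter and the use of a plain string as the query are all assumptions made
for the example, and it assumes that scores from different shards can be
compared directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only; Hit and ShardServer are hypothetical stand-ins.
class ScatterGatherSketch {
    static final class Hit {
        final String docId;
        final float score;
        Hit(String docId, float score) { this.docId = docId; this.score = score; }
    }

    // Stand-in for the shard server search() RPC (parameters are assumed).
    interface ShardServer {
        List<Hit> search(String query, int topN);
    }

    // Ask one server per shard for its top hits, then merge by descending score.
    static List<Hit> search(List<ShardServer> onePerShard, String query, int topN) {
        List<Hit> merged = new ArrayList<>();
        for (ShardServer server : onePerShard) {
            merged.addAll(server.search(query, topN));
        }
        merged.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
        return new ArrayList<>(merged.subList(0, Math.min(topN, merged.size())));
    }

    public static void main(String[] args) {
        ShardServer shard0 = (q, n) -> Arrays.asList(new Hit("a1", 0.9f), new Hit("a2", 0.2f));
        ShardServer shard1 = (q, n) -> Arrays.asList(new Hit("b1", 0.5f));
        for (Hit h : search(Arrays.asList(shard0, shard1), "hadoop", 2)) {
            System.out.println(h.docId + " " + h.score);
        }
    }
}
```

Because each shard returns at most topN hits, the merged list always contains
the global top N under the comparable-scores assumption.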

To update the index, an application sends an update request to an index
client. The index client then calls the master's updateShards() method, which
starts a Map/Reduce job to update the index. The Map/Reduce job updates the
shards in parallel and copies the new index files of each shard (i.e., Lucene
instance) to HDFS.

The updateShards() method includes a "configuration", which provides
information for updating the shards. More specifically, the configuration
includes the following information:
  - Input path. This provides the location of updated documents, e.g., HDFS
    files or directories, or HBase tables.
  - Input formatter. This specifies how to format the input documents.
  - Analysis. This defines the analyzer to use on the input. The analyzer
    determines whether a document is being inserted, updated, or deleted.
    For inserts or updates, the analyzer also converts each input document
    into a Lucene document.

The Map phase of the Map/Reduce job formats and analyzes the input (in
parallel), while the Reduce phase collects and applies the updates to each
Lucene instance (again in parallel). The updates are applied using the local
file system where a Reduce task runs and then copied back to HDFS. For
example, if the updates caused a new Lucene segment to be created, the new
segment would be created on the local file system first, and then copied back
to HDFS.
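
The division of work between the two phases can be sketched in plain Java
without Hadoop or Lucene. Everything here is made up for illustration: the
tab-separated input format, the hash-based assignment of documents to shards
(this message does not say how documents are assigned), and the in-memory map
that stands in for a Lucene instance on the local file system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only; not the actual Map/Reduce job.
class UpdateFlowSketch {
    enum Op { UPSERT, DELETE }

    static final class Update {
        final Op op;
        final String docId;
        final String body;
        Update(Op op, String docId, String body) { this.op = op; this.docId = docId; this.body = body; }
    }

    // "Analysis": parse one line of a hypothetical format,
    // "+<TAB>id<TAB>text" for insert/update or "-<TAB>id" for delete.
    static Update analyze(String line) {
        String[] f = line.split("\t", 3);
        if (f[0].equals("-")) return new Update(Op.DELETE, f[1], null);
        return new Update(Op.UPSERT, f[1], f.length > 2 ? f[2] : "");
    }

    // Assumed partitioning rule: hash of the document id modulo the shard count.
    static int shardFor(String docId, int numShards) {
        return Math.floorMod(docId.hashCode(), numShards);
    }

    // Map side: analyze every input line and group the updates by target shard.
    static Map<Integer, List<Update>> mapPhase(List<String> lines, int numShards) {
        Map<Integer, List<Update>> byShard = new TreeMap<>();
        for (String line : lines) {
            Update u = analyze(line);
            byShard.computeIfAbsent(shardFor(u.docId, numShards), k -> new ArrayList<>()).add(u);
        }
        return byShard;
    }

    // Reduce side: apply one shard's updates, in order, to its stand-in index.
    static void reducePhase(Map<String, String> shard, List<Update> updates) {
        for (Update u : updates) {
            if (u.op == Op.DELETE) shard.remove(u.docId);
            else shard.put(u.docId, u.body);
        }
    }
}
```

In the real job the reduce step would write Lucene segments locally and then
copy the new files to HDFS; the sketch only shows which work belongs to which
phase.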

When the Map/Reduce job completes, a "new version" of the index is ready to
be queried. It is important to note that the new version of the index is not
derived from scratch. By leveraging Lucene's update algorithm, the new
version of each Lucene instance will share as many files as possible with the
previous version.

At any point in time, an index client always has a consistent view of the
shards in the index. The results of a search query include either all or
none of a recent update to the index. The details of the algorithm to
accomplish this are omitted here, but the basic flow is pretty simple.

After the Map/Reduce job to update the shards completes, the master will
tell each shard server to "prepare" the new version of the index. After all
the shard servers have responded affirmatively to the "prepare" message, the
new index is ready to be queried. An index client will then lazily learn about
the new index when it makes its next getShardLocations() call to the master.

In essence, a lazy two-phase commit protocol is used, with "prepare" and
"commit" messages piggybacked on heartbeats. After a shard has switched to
the new index, the Lucene files in the old index that are no longer needed
can safely be deleted.
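
A minimal sketch of how the master side of this flow might look is given
below. The details of the real algorithm are omitted in this message, so the
version counters, the heartbeat parameters and the Command values are
assumptions made for the example; the real sendHeartbeat() returns a
ShardServerCommand whose contents are not described here.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only; names and parameters are hypothetical.
class LazyCommitSketch {
    enum Command { NONE, PREPARE, COMMIT }

    static final class Master {
        private final Set<String> servers;
        private final Set<String> prepared = new HashSet<>();
        private int committedVersion = 0; // version that clients are told about
        private int pendingVersion = 0;   // newest version produced by an update job

        Master(Set<String> servers) { this.servers = new HashSet<>(servers); }

        // Called when the Map/Reduce update job completes.
        void onUpdateJobComplete() {
            pendingVersion = committedVersion + 1;
            prepared.clear();
        }

        // Heartbeat from a shard server, which reports the newest version it has
        // prepared and the version it is serving. The reply carries the
        // piggybacked "prepare" or "commit" message.
        Command sendHeartbeat(String serverId, int preparedVersion, int servingVersion) {
            if (pendingVersion > committedVersion) {
                if (preparedVersion < pendingVersion) return Command.PREPARE;
                prepared.add(serverId);
                // Once every server has prepared, the new version becomes visible.
                if (prepared.containsAll(servers)) committedVersion = pendingVersion;
            }
            return servingVersion < committedVersion ? Command.COMMIT : Command.NONE;
        }

        // What a client would learn lazily through getShardLocations().
        int versionForClients() { return committedVersion; }
    }
}
```

In this sketch a shard server that has prepared the new version but not yet
received "commit" must be able to answer queries against either version, since
clients may learn about the new version before the server's next heartbeat.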

We rely on the fault-tolerance of Map/Reduce to guarantee that an index
update will eventually succeed. All shards are stored in HDFS and can be read
by any shard server in a cluster. For a given shard, if one of its shard
servers dies, new search requests are handled by its surviving shard servers.
To ensure that there is always enough coverage for a shard, the master will
instruct other shard servers to take over the shards of a dead shard server.
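
One way the master could pick replacement servers is sketched below. The
policy shown (give each orphaned shard to the least-loaded live server that
does not already serve it) is an assumption for illustration, as is the
representation of the assignment as a map from shard id to server ids; how
the master detects a dead server is left out.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Illustrative sketch only; the reassignment policy is hypothetical.
class FailoverSketch {
    // assignment: shard id -> servers serving it.
    // liveServers must not contain deadServer.
    static Map<String, Set<String>> reassign(Map<String, Set<String>> assignment,
                                             String deadServer,
                                             Set<String> liveServers) {
        // Count how many shards each live server currently serves.
        Map<String, Integer> load = new TreeMap<>();
        for (String s : liveServers) load.put(s, 0);
        for (Set<String> servers : assignment.values()) {
            for (String s : servers) load.computeIfPresent(s, (k, v) -> v + 1);
        }

        Map<String, Set<String>> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : assignment.entrySet()) {
            Set<String> servers = new TreeSet<>(e.getValue());
            if (servers.remove(deadServer)) {
                // Choose the least-loaded live server not already serving this shard.
                String pick = null;
                for (String s : load.keySet()) {
                    if (!servers.contains(s) && (pick == null || load.get(s) < load.get(pick))) {
                        pick = s;
                    }
                }
                if (pick != null) {
                    servers.add(pick);
                    load.merge(pick, 1, Integer::sum);
                }
            }
            result.put(e.getKey(), servers);
        }
        return result;
    }
}
```

Since every shard lives in HDFS, the chosen server needs no data transfer from
the dead server; it only has to open the shard.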

Currently, each shard server reads a shard directly from HDFS. Experiments
have shown that this approach does not perform very well, with HDFS causing
Lucene to slow down fairly dramatically (by well over 5x when data blocks
are accessed over the network). Consequently, we are exploring different ways
to
leverage the fault tolerance of HDFS and, at the same time, work around its
performance problems. One simple alternative is to add a local file system
cache on each shard server. Another alternative is to modify HDFS so that an
application has more control over where to store the primary and replicas of
an HDFS block. This feature may be useful for other HDFS applications (e.g.,
HBase). We would like to collaborate with other people who are interested in
adding this feature to HDFS.

Ning Li
