From: "Clay Webster"
Reply-To: general@lucene.apache.org
Date: Wed, 6 Feb 2008 14:33:58 -0500
Subject: RE: Lucene-based Distributed Index Leveraging Hadoop

There seem to be a few other players in this space too.

Are you from Rackspace?
(http://highscalability.com/how-rackspace-now-uses-mapreduce-and-hadoop-query-terabytes-data)

AOL also has a Hadoop/Solr project going on.

CNET does not have much brewing there, although Yonik and I had
talked about it a bunch -- but that was long ago.

--cw

Clay Webster                                    tel:1.908.541.3724
Associate VP, Platform Infrastructure           http://www.cnet.com
CNET, Inc. (Nasdaq:CNET)                        mailto:clay@cnet.com

> -----Original Message-----
> From: Ning Li [mailto:ning.li.li@gmail.com]
> Sent: Wednesday, February 06, 2008 1:57 PM
> To: general@lucene.apache.org; java-dev@lucene.apache.org;
> solr-user@lucene.apache.org
> Subject: Lucene-based Distributed Index Leveraging Hadoop
>
> There have been several proposals for a Lucene-based distributed
> index architecture.
> 1) Doug Cutting's "Index Server Project Proposal" at
>    http://www.mail-archive.com/general@lucene.apache.org/msg00338.html
> 2) Solr's "Distributed Search" at
>    http://wiki.apache.org/solr/DistributedSearch
> 3) Mark Butler's "Distributed Lucene" at
>    http://wiki.apache.org/hadoop/DistributedLucene
>
> We have also been working on a Lucene-based distributed index
> architecture. Our design differs from the above proposals in the way
> it leverages Hadoop as much as possible. In particular, HDFS is used
> to reliably store Lucene instances, Map/Reduce is used to analyze
> documents and update Lucene instances in parallel, and Hadoop's IPC
> framework is used. Our design is geared toward applications that
> require a highly scalable index and where batch updates to each
> Lucene instance are acceptable (versus finer-grained
> document-at-a-time updates).
>
> We have a working implementation of our design and are in the process
> of evaluating its performance. An overview of our design is provided
> below. We welcome feedback and would like to know if you are
> interested in working on it. If so, we would be happy to make the
> code publicly available. At the same time, we would like to
> collaborate with people working on existing proposals and see if we
> can consolidate our efforts.
>
> TERMINOLOGY
> A distributed "index" is partitioned into "shards". Each shard
> corresponds to a Lucene instance and contains a disjoint subset of
> the documents in the index. Each shard is stored in HDFS and served
> by one or more "shard servers". Here we only talk about a single
> distributed index, but in practice multiple indexes can be supported.
>
> A "master" keeps track of the shard servers and the shards being
> served by them. An "application" updates and queries the global index
> through an "index client". An index client communicates with the
> shard servers to execute a query.
>
> KEY RPC METHODS
> This section lists the key RPC methods in our design. To simplify the
> discussion, some of their parameters have been omitted.
>
> On the Shard Servers
>   // Execute a query on this shard server's Lucene instance.
>   // This method is called by an index client.
>   SearchResults search(Query query);
>
> On the Master
>   // Tell the master to update the shards, i.e., Lucene instances.
>   // This method is called by an index client.
>   boolean updateShards(Configuration conf);
>
>   // Ask the master where the shards are located.
>   // This method is called by an index client.
>   LocatedShards getShardLocations();
>
>   // Send a heartbeat to the master. This method is called by a
>   // shard server. In the response, the master informs the
>   // shard server when to switch to a newer version of the index.
>   ShardServerCommand sendHeartbeat();
>
> QUERYING THE INDEX
> To query the index, an application sends a search request to an index
> client. The index client then calls the shard server search() method
> for each shard of the index, merges the results and returns them to
> the application. The index client caches the mapping between shards
> and shard servers by periodically calling the master's
> getShardLocations() method.
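>
> Purely as an illustration, the query path of an index client might
> look like the following sketch. Only search(), getShardLocations()
> and the types listed above come from our design; every other name
> here is a made-up placeholder, not actual code.
>
>   // Sketch of the scatter-gather query flow (illustrative only).
>   public class IndexClient {
>     private Master master;                  // RPC proxy to the master
>     private volatile LocatedShards shards;  // cached shard -> server map
>
>     // Refresh the cached shard locations, e.g., from a timer thread.
>     void refreshShardLocations() {
>       shards = master.getShardLocations();
>     }
>
>     // Scatter the query to one server per shard, gather and merge.
>     public SearchResults search(Query query) {
>       SearchResults merged = new SearchResults();
>       for (ShardServer server : shards.pickOneServerPerShard()) {
>         merged.add(server.search(query));   // shard server RPC
>       }
>       merged.sortByScore();                 // merge step, simplified
>       return merged;
>     }
>   }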
> UPDATING THE INDEX USING MAP/REDUCE
> To update the index, an application sends an update request to an
> index client. The index client then calls the master's updateShards()
> method, which schedules a Map/Reduce job to update the index. The
> Map/Reduce job updates the shards in parallel and copies the new
> index files of each shard (i.e., Lucene instance) to HDFS.
>
> The updateShards() method includes a "configuration", which provides
> information for updating the shards. More specifically, the
> configuration includes the following information:
> - Input path. This provides the location of updated documents, e.g.,
>   HDFS files or directories, or HBase tables.
> - Input formatter. This specifies how to format the input documents.
> - Analysis. This defines the analyzer to use on the input. The
>   analyzer determines whether a document is being inserted, updated,
>   or deleted. For inserts or updates, the analyzer also converts each
>   input document into a Lucene document.
>
> The Map phase of the Map/Reduce job formats and analyzes the input
> (in parallel), while the Reduce phase collects and applies the
> updates to each Lucene instance (again in parallel). The updates are
> applied using the local file system where a Reduce task runs and then
> copied back to HDFS. For example, if the updates caused a new Lucene
> segment to be created, the new segment would be created on the local
> file system first and then copied back to HDFS. A rough skeleton of
> such a job is sketched at the end of this section.
>
> When the Map/Reduce job completes, a "new version" of the index is
> ready to be queried. It is important to note that the new version of
> the index is not built from scratch. By leveraging Lucene's update
> algorithm, the new version of each Lucene instance will share as many
> files as possible with the previous version.
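>
> The skeleton below shows roughly what such an update job could look
> like against Hadoop's mapred API and Lucene's IndexWriter. It is a
> simplified sketch, not our implementation: DocumentUpdate and the
> Analysis helpers are hypothetical stand-ins for the pluggable
> formatter/analyzer described above.
>
>   import java.io.IOException;
>   import java.util.Iterator;
>   import org.apache.hadoop.fs.FileSystem;
>   import org.apache.hadoop.fs.Path;
>   import org.apache.hadoop.io.*;
>   import org.apache.hadoop.mapred.*;
>   import org.apache.lucene.index.IndexWriter;
>
>   // DocumentUpdate: hypothetical Writable holding the target shard id,
>   // an insert/update/delete flag, and the analyzed Lucene Document.
>
>   public class ShardUpdateMapper extends MapReduceBase
>       implements Mapper<LongWritable, Text, IntWritable, DocumentUpdate> {
>     public void map(LongWritable key, Text value,
>                     OutputCollector<IntWritable, DocumentUpdate> out,
>                     Reporter reporter) throws IOException {
>       // Map phase: format and analyze one input record, then route
>       // the resulting update to its shard.
>       DocumentUpdate u = Analysis.analyze(value);  // hypothetical helper
>       out.collect(new IntWritable(u.getShardId()), u);
>     }
>   }
>
>   public class ShardUpdateReducer extends MapReduceBase
>       implements Reducer<IntWritable, DocumentUpdate, IntWritable, Text> {
>     private JobConf conf;
>     public void configure(JobConf job) { this.conf = job; }
>
>     public void reduce(IntWritable shard, Iterator<DocumentUpdate> updates,
>                        OutputCollector<IntWritable, Text> out,
>                        Reporter reporter) throws IOException {
>       // Reduce phase: apply this shard's updates on the local file
>       // system, then copy the new index files back to HDFS. (In the
>       // real flow, the shard's current files are first copied from
>       // HDFS to the local directory; output reaches HDFS as a side
>       // effect rather than through the OutputCollector.)
>       String local = "/tmp/shard-" + shard.get();
>       IndexWriter writer =
>           new IndexWriter(local, Analysis.getAnalyzer(), false);
>       while (updates.hasNext()) {
>         DocumentUpdate u = updates.next();
>         if (u.isDelete()) writer.deleteDocuments(u.getIdTerm());
>         else              writer.addDocument(u.getDocument());
>       }
>       writer.close();
>       FileSystem.get(conf).copyFromLocalFile(
>           new Path(local), new Path("/index/shard-" + shard.get()));
>     }
>   }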
>
> ENSURING INDEX CONSISTENCY
> At any point in time, an index client always has a consistent view of
> the shards in the index. The results of a search query include either
> all or none of a recent update to the index. The details of the
> algorithm to accomplish this are omitted here, but the basic flow is
> pretty simple.
>
> After the Map/Reduce job to update the shards completes, the master
> will tell each shard server to "prepare" the new version of the
> index. After all the shard servers have responded affirmatively to
> the "prepare" message, the new index is ready to be queried. An index
> client will then lazily learn about the new index when it makes its
> next getShardLocations() call to the master.
>
> In essence, a lazy two-phase commit protocol is used, with "prepare"
> and "commit" messages piggybacked on heartbeats. After a shard has
> switched to the new index, the Lucene files in the old index that are
> no longer needed can safely be deleted.
>
> ACHIEVING FAULT-TOLERANCE
> We rely on the fault-tolerance of Map/Reduce to guarantee that an
> index update will eventually succeed. All shards are stored in HDFS
> and can be read by any shard server in a cluster. For a given shard,
> if one of its shard servers dies, new search requests are handled by
> its surviving shard servers. To ensure that there is always enough
> coverage for a shard, the master will instruct other shard servers to
> take over the shards of a dead shard server.
>
> PERFORMANCE ISSUES
> Currently, each shard server reads a shard directly from HDFS.
> Experiments have shown that this approach does not perform very well,
> with HDFS causing Lucene to slow down fairly dramatically (by well
> over 5x when data blocks are accessed over the network).
> Consequently, we are exploring different ways to leverage the fault
> tolerance of HDFS and, at the same time, work around its performance
> problems. One simple alternative is to add a local file system cache
> on each shard server. Another alternative is to modify HDFS so that
> an application has more control over where to store the primary and
> replicas of an HDFS block. This feature may be useful for other HDFS
> applications (e.g., HBase). We would like to collaborate with other
> people who are interested in adding this feature to HDFS.
>
>
> Regards,
> Ning Li