From: "Peter W."
Subject: Re: Lucene-based Distributed Index Leveraging Hadoop
Date: Thu, 7 Feb 2008 19:30:27 -0800
To: core-user@hadoop.apache.org

Howdy,

Your work is outstanding and will hopefully be adopted soon.

The HDFS-based distributed Lucene index removes many of the dependencies
introduced by achieving the same thing another way, using RMI, HTTP
(serialized objects with servlets), or Tomcat load balancing with MySQL
databases, schemas and connection pools.

Before this, other mixed options were available where Nutch obtains
documents, HTML and XML parsers extract data, Hadoop reduces those
results, and Lucene stores and indexes them. Something like: get
document (Nutch), REST post as XML (Solr), XML to data (ROME, Abdera),
data to map (Hadoop), reduce to tables (Hadoop, HBase), then reconstruct
bytes into a Lucene Document object for indexing.

Obviously, yours is cleaner and more scalable.

I'd also want the master to keep track of (task id, completed, progress)
in something like a table you could run status updates against:

+------+------+------+
|  id  | comp | prog |
+------+------+------+

Also, maybe the following indexing pipeline...

index clients:
  from remote app machine1, machine2, machine3 using HDFS
  batch index Lucene documents (a hundred at a time)
  place them in a single encapsulation object
  connect to master
  select task id where (completed=0) && (progress=0)
  update progress=1
  put object (HDFS)

master:
  recreate the collection from the stream (in)
  iterate the object, cast items to Document
  hash the document key in the mapper, contents are the intermediate values
  index Lucene documents in the reducer, allowing Text object access
  for filtering purposes
  return the indexed count as an integer (RPC response)

back on clients:
  update progress=0, comp=1 when finished
  send the master confirmation info with the heartbeat
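
To make that bookkeeping concrete, here is a minimal sketch of the
(id, comp, prog) record and the claim/complete transitions it implies.
All of the names (IndexTask, TaskTable, claim, complete, stalled) are
invented purely for illustration; a real master might keep this state
in HBase or on HDFS rather than in memory.

  import java.util.ArrayList;
  import java.util.List;

  // Hypothetical status record: one row of the (id, comp, prog) table.
  class IndexTask {
      final int id;
      boolean completed;   // comp
      boolean inProgress;  // prog
      IndexTask(int id) { this.id = id; }
  }

  // Hypothetical in-memory table kept by the master.
  class TaskTable {
      private final List<IndexTask> tasks = new ArrayList<IndexTask>();

      synchronized void add(IndexTask t) { tasks.add(t); }

      // select task id where (completed=0) && (progress=0), set progress=1
      synchronized IndexTask claim() {
          for (IndexTask t : tasks) {
              if (!t.completed && !t.inProgress) {
                  t.inProgress = true;
                  return t;
              }
          }
          return null;   // nothing to do right now
      }

      // back on clients: update progress=0, comp=1 when finished
      synchronized void complete(int id) {
          for (IndexTask t : tasks) {
              if (t.id == id) {
                  t.inProgress = false;
                  t.completed = true;
              }
          }
      }

      // tasks stuck in (completed=0) && (progress=1) are resubmission candidates
      synchronized List<IndexTask> stalled() {
          List<IndexTask> out = new ArrayList<IndexTask>();
          for (IndexTask t : tasks) {
              if (!t.completed && t.inProgress) {
                  out.add(t);
              }
          }
          return out;
      }
  }

A client would claim() a task, run its batch, then complete() it;
anything left in stalled() feeds the resubmission logic below.
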
Then add dates and logic on the master for fixing extended race
conditions where (completed=0) && (progress=1), so clients can resubmit
jobs using confirmed keys received as inventory lists.

To update the progress and completed columns, somehow check the size of
the part-files in each labeled output dir, or monitor the Hadoop logs in
the appropriate temp dir. Run new JobClients accordingly.

Sweet,

Peter W.

On Feb 6, 2008, at 10:59 AM, Ning Li wrote:

> There have been several proposals for a Lucene-based distributed index
> architecture.
>  1) Doug Cutting's "Index Server Project Proposal" at
>     http://www.mail-archive.com/general@lucene.apache.org/msg00338.html
>  2) Solr's "Distributed Search" at
>     http://wiki.apache.org/solr/DistributedSearch
>  3) Mark Butler's "Distributed Lucene" at
>     http://wiki.apache.org/hadoop/DistributedLucene
>
> We have also been working on a Lucene-based distributed index
> architecture. Our design differs from the above proposals in the way it
> leverages Hadoop as much as possible. In particular, HDFS is used to
> reliably store Lucene instances, Map/Reduce is used to analyze
> documents and update Lucene instances in parallel, and Hadoop's IPC
> framework is used. Our design is geared for applications that require a
> highly scalable index and where batch updates to each Lucene instance
> are acceptable (versus finer-grained document-at-a-time updates).
>
> We have a working implementation of our design and are in the process
> of evaluating its performance. An overview of our design is provided
> below. We welcome feedback and would like to know if you are interested
> in working on it. If so, we would be happy to make the code publicly
> available. At the same time, we would like to collaborate with people
> working on existing proposals and see if we can consolidate our
> efforts.
>
> TERMINOLOGY
> A distributed "index" is partitioned into "shards". Each shard
> corresponds to a Lucene instance and contains a disjoint subset of the
> documents in the index. Each shard is stored in HDFS and served by one
> or more "shard servers". Here we only talk about a single distributed
> index, but in practice multiple indexes can be supported.
>
> A "master" keeps track of the shard servers and the shards being served
> by them. An "application" updates and queries the global index through
> an "index client". An index client communicates with the shard servers
> to execute a query.
>
> KEY RPC METHODS
> This section lists the key RPC methods in our design. To simplify the
> discussion, some of their parameters have been omitted.
>
> On the Shard Servers
>   // Execute a query on this shard server's Lucene instance.
>   // This method is called by an index client.
>   SearchResults search(Query query);
>
> On the Master
>   // Tell the master to update the shards, i.e., Lucene instances.
>   // This method is called by an index client.
>   boolean updateShards(Configuration conf);
>
>   // Ask the master where the shards are located.
>   // This method is called by an index client.
>   LocatedShards getShardLocations();
>
>   // Send a heartbeat to the master. This method is called by a
>   // shard server. In the response, the master informs the
>   // shard server when to switch to a newer version of the index.
>   ShardServerCommand sendHeartbeat();
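
Since Hadoop's IPC framework is used, the RPC surface above can be
sketched as a pair of protocol interfaces. The rendering below is a
minimal, hypothetical one: only the four method signatures and their
comments come from the description above, while the interface names,
the version constants, and the placeholder parameter types are
assumptions (with Hadoop IPC the parameter and return types would need
to be Writable).

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.ipc.VersionedProtocol;

  // Placeholder types standing in for the real ones; assumptions only.
  interface Query extends Writable {}
  interface SearchResults extends Writable {}
  interface LocatedShards extends Writable {}
  interface ShardServerCommand extends Writable {}

  // Implemented by each shard server; called by index clients.
  interface ShardServerProtocol extends VersionedProtocol {
      long VERSION = 1L;   // hypothetical protocol version

      // Execute a query on this shard server's Lucene instance.
      SearchResults search(Query query);
  }

  // Implemented by the master; called by index clients and shard servers.
  interface MasterProtocol extends VersionedProtocol {
      long VERSION = 1L;   // hypothetical protocol version

      // Tell the master to update the shards, i.e., the Lucene instances.
      boolean updateShards(Configuration conf);

      // Ask the master where the shards are located.
      LocatedShards getShardLocations();

      // Heartbeat from a shard server; the response tells the server
      // when to switch to a newer version of the index.
      ShardServerCommand sendHeartbeat();
  }

An index client would presumably obtain proxies to these interfaces
through Hadoop's RPC class and cache the LocatedShards returned by the
master, which is the caching described in the next section.
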
>
> QUERYING THE INDEX
> To query the index, an application sends a search request to an index
> client. The index client then calls the shard server search() method
> for each shard of the index, merges the results and returns them to the
> application. The index client caches the mapping between shards and
> shard servers by periodically calling the master's getShardLocations()
> method.
>
> UPDATING THE INDEX USING MAP/REDUCE
> To update the index, an application sends an update request to an index
> client. The index client then calls the master's updateShards() method,
> which schedules a Map/Reduce job to update the index. The Map/Reduce
> job updates the shards in parallel and copies the new index files of
> each shard (i.e., Lucene instance) to HDFS.
>
> The updateShards() method includes a "configuration", which provides
> information for updating the shards. More specifically, the
> configuration includes the following information:
>   - Input path. This provides the location of updated documents, e.g.,
>     HDFS files or directories, or HBase tables.
>   - Input formatter. This specifies how to format the input documents.
>   - Analysis. This defines the analyzer to use on the input. The
>     analyzer determines whether a document is being inserted, updated,
>     or deleted. For inserts or updates, the analyzer also converts each
>     input document into a Lucene document.
>
> The Map phase of the Map/Reduce job formats and analyzes the input (in
> parallel), while the Reduce phase collects and applies the updates to
> each Lucene instance (again in parallel). The updates are applied using
> the local file system where a Reduce task runs and then copied back to
> HDFS. For example, if the updates caused a new Lucene segment to be
> created, the new segment would be created on the local file system
> first, and then copied back to HDFS.
>
> When the Map/Reduce job completes, a "new version" of the index is
> ready to be queried. It is important to note that the new version of
> the index is not derived from scratch. By leveraging Lucene's update
> algorithm, the new version of each Lucene instance will share as many
> files as possible with the previous version.
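
As a concrete but heavily simplified illustration of that Reduce phase,
the sketch below builds a shard with Lucene on the reduce task's local
file system and then copies the result back to HDFS. It is written
against the classic org.apache.hadoop.mapred API and Lucene 2.x-era
classes; the key/value layout (shard id as key, raw document text as
values), the field name, the local and HDFS paths, and the class name
are all assumptions, and it only handles inserts -- the actual design
runs the configured analyzer in the Map phase and also handles updates
and deletes.

  import java.io.IOException;
  import java.util.Iterator;

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reducer;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.lucene.analysis.standard.StandardAnalyzer;
  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.Field;
  import org.apache.lucene.index.IndexWriter;

  // One reduce call per shard: key = shard id, values = document text.
  public class ShardUpdateReducer extends MapReduceBase
          implements Reducer<Text, Text, Text, IntWritable> {

      private JobConf conf;

      public void configure(JobConf conf) { this.conf = conf; }

      public void reduce(Text shardId, Iterator<Text> docs,
                         OutputCollector<Text, IntWritable> out,
                         Reporter reporter) throws IOException {
          // Build the new segment(s) on the local file system first.
          String localDir = "/tmp/shard-" + shardId.toString();   // assumed path
          IndexWriter writer =
              new IndexWriter(localDir, new StandardAnalyzer(), true);

          int added = 0;
          while (docs.hasNext()) {
              Document doc = new Document();
              doc.add(new Field("content", docs.next().toString(),
                                Field.Store.NO, Field.Index.TOKENIZED));
              writer.addDocument(doc);   // inserts only; updates/deletes omitted
              added++;
              reporter.progress();
          }
          writer.optimize();
          writer.close();

          // Copy the finished shard back to HDFS (destination path assumed).
          FileSystem fs = FileSystem.get(conf);
          fs.copyFromLocalFile(new Path(localDir), new Path("/index/" + shardId));

          out.collect(shardId, new IntWritable(added));
      }
  }
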
>
> ENSURING INDEX CONSISTENCY
> At any point in time, an index client always has a consistent view of
> the shards in the index. The results of a search query include either
> all or none of a recent update to the index. The details of the
> algorithm to accomplish this are omitted here, but the basic flow is
> pretty simple.
>
> After the Map/Reduce job to update the shards completes, the master
> will tell each shard server to "prepare" the new version of the index.
> After all the shard servers have responded affirmatively to the
> "prepare" message, the new index is ready to be queried. An index
> client will then lazily learn about the new index when it makes its
> next getShardLocations() call to the master. In essence, a lazy
> two-phase commit protocol is used, with "prepare" and "commit" messages
> piggybacked on heartbeats. After a shard has switched to the new index,
> the Lucene files in the old index that are no longer needed can safely
> be deleted.
>
> ACHIEVING FAULT-TOLERANCE
> We rely on the fault-tolerance of Map/Reduce to guarantee that an index
> update will eventually succeed. All shards are stored in HDFS and can
> be read by any shard server in a cluster. For a given shard, if one of
> its shard servers dies, new search requests are handled by its
> surviving shard servers. To ensure that there is always enough coverage
> for a shard, the master will instruct other shard servers to take over
> the shards of a dead shard server.
>
> PERFORMANCE ISSUES
> Currently, each shard server reads a shard directly from HDFS.
> Experiments have shown that this approach does not perform very well,
> with HDFS causing Lucene to slow down fairly dramatically (by well over
> 5x when data blocks are accessed over the network). Consequently, we
> are exploring different ways to leverage the fault tolerance of HDFS
> and, at the same time, work around its performance problems. One simple
> alternative is to add a local file system cache on each shard server.
> Another alternative is to modify HDFS so that an application has more
> control over where to store the primary and replicas of an HDFS block.
> This feature may be useful for other HDFS applications (e.g., HBase).
> We would like to collaborate with other people who are interested in
> adding this feature to HDFS.
>
>
> Regards,
> Ning Li
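
A footnote on the local file system cache mentioned under PERFORMANCE
ISSUES above: a minimal sketch of that workaround could look like the
following, where a shard server copies a shard's files out of HDFS onto
local disk and opens its searcher over the local copy. The class name
and paths are invented, and the lack of any eviction or version checking
is a simplification; none of this is from the original proposal.

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.lucene.search.IndexSearcher;

  // Hypothetical helper: pull a shard's Lucene files out of HDFS onto
  // the local file system, then search the local copy instead of
  // reading blocks over the network.
  public class LocalShardCache {

      private final Configuration conf;
      private final String localRoot;   // scratch directory, assumed

      public LocalShardCache(Configuration conf, String localRoot) {
          this.conf = conf;
          this.localRoot = localRoot;
      }

      public IndexSearcher open(String shardName, Path shardInHdfs)
              throws IOException {
          Path local = new Path(localRoot, shardName);
          FileSystem fs = shardInHdfs.getFileSystem(conf);
          // Copy the whole shard directory to local disk; a real cache
          // would also track index versions and evict stale copies.
          fs.copyToLocalFile(shardInHdfs, local);
          return new IndexSearcher(local.toUri().getPath());
      }
  }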