From: Jeff Hodges
To: cassandra-dev@incubator.apache.org
Date: Mon, 17 Aug 2009 02:24:23 -0700
Subject: Re: hadoop tasks reading from cassandra

For those of you playing at home, a "stupid" version of the Hadoop
support has been attached to CASSANDRA-342. I mention it here for the
curious, but please keep discussion of it in the ticket. Thanks!

https://issues.apache.org/jira/browse/CASSANDRA-342?focusedCommentId=12744001&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12744001

--
Jeff

On Fri, Jul 24, 2009 at 1:23 AM, Jeff Hodges wrote:
> Hey,
>
> Getting Hadoop to play nice with Cassandra has been a desire for many
> folks on this list, and probably more on the other one. For the
> purposes of this email, I'm going to restrict this to the goal of
> getting Hadoop to read from Cassandra in a not-stupid way.
>
> "Not-stupid" has a very specific meaning. "Not-stupid" means that:
>
> 1) Each Hadoop Mapper sees only a small subset of the entire desired
> dataset from Cassandra, and the entire desired dataset will never be
> seen by any one phase of the Hadoop process.
>
> 2) Every portion of the desired dataset will be unique to the Mapper
> it is delivered to. No two Mappers will ever overlap.
>
> 3) There will be no portion of the desired dataset that is not seen
> by a Mapper.
>
> 4) Partitioning of the dataset across the Mappers should try to use
> the Cassandra nodes efficiently. This means attempting to keep each
> partition solely on one node.
>
> Conspicuously not on this list is data locality, that is, keeping the
> data passed from a node to a given Mapper at or near the same machine.
> That requires further investigation, outside the scope of this initial
> project.
>
> Also, please remember that "not-stupid" is not the same as "smart".
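>
> As a toy illustration of properties 2 and 3 (plain Java, hypothetical
> names, none of this is real Cassandra or Hadoop API): if each split is
> a half-open key range and adjacent splits share a boundary, no key is
> ever handed to two Mappers and no key is skipped.
>
>   // One contiguous key range destined for exactly one Mapper.
>   // startKey is inclusive, endKey is exclusive, so adjacent splits
>   // tile the keyspace without overlapping.
>   class KeyRangeSplit {
>       final String startKey; // inclusive
>       final String endKey;   // exclusive; null means "through the last key"
>
>       KeyRangeSplit(String startKey, String endKey) {
>           this.startKey = startKey;
>           this.endKey = endKey;
>       }
>
>       boolean contains(String key) {
>           return key.compareTo(startKey) >= 0
>               && (endKey == null || key.compareTo(endKey) < 0);
>       }
>   }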
>
> = How Hadoop Wants It And How It's Been Done By HBase
>
> I've dug around the Hadoop and HBase codebases, and while my
> understanding is not yet perfect, this seems to be the general layout
> of the problem.
>
> First, a subclass of InputFormat needs to be written. This class's job
> is to split up the dataset for a Hadoop job into InputSplits, or more
> accurately, subclasses of InputSplit. These InputSplits are serialized
> to disk in HDFS, in files named so that each is picked up by just one
> Mapper. Okay, actually the Mapper has no idea about InputSplits.
>
> An InputSplit is loaded up on a machine running a Mapper, and then
> getRecordReader() is called on the subclass of InputFormat, with the
> Mapper's InputSplit passed in along with various Hadoop job
> information. getRecordReader() returns a subclass of RecordReader that
> allows the Mapper to call next() on it over and over again to run
> through the portion of the dataset represented by the InputSplit.
>
> In the 0.19.3 version of the HBase codebase, InputSplits are created
> by gathering all the start keys for each "region" (which conceptually
> maps, roughly, to a Cassandra node) in the database and divvying up
> the keys approximately evenly across the number of Mappers desired.
> Each TableSplit (the HBase subclass of InputSplit) created has
> information about the start key of the region, the end key of the
> region, and the region location (the "name" of the node the dataset
> is on).
>
> This splitting on keys works because HBase keys are always stored in
> an ordered fashion. Basically, HBase always partitions using something
> akin to Cassandra's OrderPreservingPartitioner.
>
> I've posted a gist with just the method that does this divvying up, by
> the by [1]. (Interestingly, the region location seems to be encoded
> only to allow for a nice toString() method on the InputSplit to be
> used during logging.)
>
> HBase's subclass of RecordReader uses an instance of the ClientScanner
> class to keep state about where it is in the Mapper's portion of the
> dataset. This ClientScanner queries HBase's META server (which
> contains information about all regions) with the start key and table
> name to figure out which HBase region to talk to, and caches that
> information. Note that this is done for each Mapper on each machine.
> Note, too, that the region information encoded in the TableSplit
> doesn't seem to be used.
>
> These last few points are where the code gets really hairy. I could be
> wrong about some of it, and corrections would be appreciated.
>
> The important parts, anyway, are the subclassing of InputFormat,
> RecordReader and InputSplit.
>
> = How To Do It With Cassandra (Maybe)
>
> So, I spoke to Jonathan on #cassandra about this and we tried to see
> how we could take the HBase method and turn it into something that
> would work with Cassandra.
>
> Our initial assumption is that the Cassandra database to be mapped
> over has to use the OrderPreservingPartitioner, to keep the input
> splitting consistent.
>
> Now, Cassandra nodes don't really have a concept of a "start" and
> "end" key. We could, however, get a start key for a given node by
> taking the first key returned from SSTableReader#getIndexedKeys(). We
> would then gather up the start keys from each of the nodes and sort
> them.
>
> We could then use each key in this gathered list as the start key for
> a "region" and, given an addition to the slice API, slice from each
> start key to the next start key (the end key, in HBase terminology).
> We would need to modify the slice API to provide slices where the end
> key given is exclusive of the set returned, instead of inclusive.
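>
> The divvying itself could be as simple as this sketch (again plain
> Java with hypothetical names, reusing the KeyRangeSplit class from
> above; startKeys is whatever we gathered from each node's
> getIndexedKeys()):
>
>   import java.util.ArrayList;
>   import java.util.List;
>   import java.util.TreeSet;
>
>   class SplitBuilder {
>       // Turn one start key per node into half-open [start, nextStart)
>       // ranges. Sorting is meaningful because we are assuming the
>       // OrderPreservingPartitioner, and the TreeSet silently drops
>       // any duplicate start keys replication might hand us.
>       static List<KeyRangeSplit> buildSplits(List<String> startKeys) {
>           TreeSet<String> sorted = new TreeSet<String>(startKeys);
>           List<KeyRangeSplit> splits = new ArrayList<KeyRangeSplit>();
>           String prev = null;
>           for (String key : sorted) {
>               if (prev != null)
>                   splits.add(new KeyRangeSplit(prev, key));
>               prev = key;
>           }
>           if (prev != null) // last region runs to the end of the keyspace
>               splits.add(new KeyRangeSplit(prev, null));
>           return splits;
>       }
>   }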
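>
> And the reading side would just page through its range, always slicing
> with an exclusive end key. Another sketch; sliceRange() here is a
> stand-in for the modified slice call, which does not exist yet:
>
>   import java.util.List;
>
>   // Stand-in for the slice call we'd need: at most 'count' keys in
>   // [startInclusive, endExclusive), in key order; a null end means
>   // "to the end of the keyspace".
>   interface RowSource {
>       List<String> sliceRange(String startInclusive, String endExclusive,
>                               int count);
>   }
>
>   class RangeRecordReader {
>       private final KeyRangeSplit split;
>       private final RowSource source;
>       private String cursor;
>
>       RangeRecordReader(KeyRangeSplit split, RowSource source) {
>           this.split = split;
>           this.source = source;
>           this.cursor = split.startKey;
>       }
>
>       // The Mapper-facing next(): returns null once the range is
>       // exhausted, like RecordReader.next() returning false.
>       List<String> nextBatch(int count) {
>           List<String> keys = source.sliceRange(cursor, split.endKey, count);
>           if (keys == null || keys.isEmpty())
>               return null;
>           // key + "\0" is the smallest string sorting strictly after
>           // key, so the next call resumes past what we just returned.
>           cursor = keys.get(keys.size() - 1) + "\0";
>           return keys;
>       }
>   }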
>
> In terms of actual code to be written: our subclass of InputFormat is
> what would gather this list of start keys, we would serialize the
> start/finish key pairs with our own subclass of InputSplit, and, of
> course, our subclass of RecordReader would make the
> finish-key-exclusive slice calls, roughly like the loop sketched
> above.
>
> This method satisfies property 1 of our not-stupid definition. At no
> point in our Hadoop job are we accessing the entire dataset, or even
> all of the keys (if I'm remembering how getIndexedKeys() works
> correctly).
>
> It satisfies properties 2 and 3 because we are clearly slicing
> everything once and only once, unless I'm misremembering how
> replication works w.r.t. ordered partitioning. If I am misremembering
> and start keys are duplicated, we can just return a sorted set instead
> of a sorted array.
>
> It satisfies property 4 because we are slicing along the seams given
> to us by the nodes themselves.
>
> = Back To The Game
>
> Right, so that's the first pass. What sucks about this? What rules
> about it? Questions?
>
> [1] http://gist.github.com/150217
> --
> Jeff