incubator-cassandra-dev mailing list archives

From Jeff Hodges <j...@somethingsimilar.com>
Subject Re: hadoop tasks reading from cassandra
Date Mon, 17 Aug 2009 09:24:23 GMT
For those of you playing at home, a "stupid" version of the Hadoop
support has been attached to CASSANDRA-342. I mention it here for the
curious, but please keep discussion of it in the ticket. Thanks!

https://issues.apache.org/jira/browse/CASSANDRA-342?focusedCommentId=12744001&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12744001

--
Jeff

On Fri, Jul 24, 2009 at 1:23 AM, Jeff Hodges <jeff@somethingsimilar.com> wrote:
> Hey,
>
> Getting Hadoop to play nice with Cassandra has been a desire for many
> folks on this list and probably more on the other one. For the
> purposes of this email, I'm going to restrict this to the goal of getting
> Hadoop to read from Cassandra in a not-stupid way.
>
> "Not-stupid" has a very specific meaning. "Not-stupid" means that:
>
> 1) Each Hadoop Mapper sees only a small subset of the entire desired
> dataset from Cassandra and that the entire desired dataset will never
> be seen by any one phase of the Hadoop process.
>
> 2) Every portion of the desired dataset will be unique to the Mapper
> it is delivered to. No two Mappers will ever overlap.
>
> 3) There will be no portion of the desired dataset that is not seen by a Mapper.
>
> 4) Partitioning of the dataset to the Mappers should try to
> efficiently use the Cassandra nodes. This means attempting to keep each
> partition confined to a single node.
>
> Conspicuously not on this list is data locality: that is, keeping the
> data a node hands to a given Mapper on or near that same machine.
> This requires further investigation outside the scope of this initial
> project.
>
> Also, please remember that "not-stupid" is not the same as "smart".
>
> = How Hadoop Wants It And How It's Been Done By HBase
>
> I've dug around the Hadoop and HBase codebases and while my
> understanding is not yet perfect, this seems to be the general layout
> of the problem.
>
> First, a subclass of InputFormat needs to be written. This class's job
> is to split up the dataset for a Hadoop job into InputSplits, or more
> accurately, instances of a subclass of InputSplit. These InputSplits are
> serialized to disk in HDFS in files named so that each is picked up by
> just one Mapper. Okay, actually the Mapper has no idea about
> InputSplits.
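>
> To make that contract concrete, here's the rough shape of the
> org.apache.hadoop.mapred interfaces we'd be filling in. The class name
> and key/value types below are placeholders for illustration; nothing
> here exists yet:
>
>   import java.io.IOException;
>   import org.apache.hadoop.io.Text;
>   import org.apache.hadoop.mapred.*;
>
>   // Hypothetical skeleton of the InputFormat subclass described above.
>   public class CassandraInputFormat implements InputFormat<Text, Text> {
>
>     // Runs once at job-submission time; returns one split per Mapper.
>     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
>       return new InputSplit[0];  // placeholder: would build key-range splits
>     }
>
>     // Runs on the machine executing a Mapper, with that Mapper's split.
>     public RecordReader<Text, Text> getRecordReader(InputSplit split,
>         JobConf job, Reporter reporter) throws IOException {
>       throw new IOException("sketch only: would return a reader over the split's key range");
>     }
>   }
>
> InputFormat in that package is just an interface, so this really is only
> two methods to get right.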
>
> An InputSplit is loaded up on a machine running a Mapper, and then
> getRecordReader() is called on the subclass of InputFormat, with the
> Mapper's InputSplit passed in along with various Hadoop job
> information. getRecordReader() returns a subclass of RecordReader that
> allows the Mapper to call next() on it over and over again to run
> through the portion of the dataset represented by the InputSplit.
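>
> That next() loop looks roughly like this. It's a simplified sketch of
> what the map task runner (MapRunner) does on the Mapper's behalf, not
> the real Hadoop source:
>
>   // Simplified: how a map task consumes the RecordReader it got back from
>   // getRecordReader(). next() fills in key/value and returns false once
>   // the split is exhausted.
>   void runOneSplit(RecordReader<Text, Text> reader,
>                    Mapper<Text, Text, Text, Text> mapper,
>                    OutputCollector<Text, Text> output,
>                    Reporter reporter) throws IOException {
>     Text key = reader.createKey();
>     Text value = reader.createValue();
>     try {
>       while (reader.next(key, value)) {
>         mapper.map(key, value, output, reporter);
>       }
>     } finally {
>       reader.close();
>     }
>   }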
>
> In the 0.19.3 version of the HBase codebase, InputSplits are created
> by gathering all the start keys for each "region" (which conceptually
> maps, roughly, to a Cassandra node) in the database and divvying up
> the keys approximately evenly across the number of Mappers desired.
> Each TableSplit (the HBase subclass of InputSplit) created has
> information about the start key of the region, the end key of the
> region and the region location (the "name" of the node the dataset is
> on).
>
> This splitting on keys works because HBase keys are always stored in
> an ordered fashion. Basically, HBase always partitions using something
> akin to Cassandra's OrderPreservingPartitioner.
>
> I've posted a gist with just the method that does this divvying up, by
> the by[1]. (Interestingly, the region location seems to only be
> encoded to allow for a nice toString() method on InputSplit to be used
> during logging.)
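>
> Stripped of the HBase specifics, the divvying in that method boils down
> to something like the following. This is a paraphrase of the idea from
> memory, not the actual HBase code, and TableSplit's real constructor may
> differ in detail:
>
>   // Rough idea only: give each Mapper a contiguous run of regions by
>   // pairing up region start keys. An empty end key means "to the end of
>   // the table".
>   InputSplit[] divvy(byte[] tableName, byte[][] startKeys,
>                      String[] regionLocations, int numMappers) {
>     int numSplits = Math.min(numMappers, startKeys.length);
>     InputSplit[] splits = new InputSplit[numSplits];
>     for (int i = 0; i < numSplits; i++) {
>       int firstRegion = i * startKeys.length / numSplits;
>       int lastRegionExclusive = (i + 1) * startKeys.length / numSplits;
>       byte[] startKey = startKeys[firstRegion];
>       byte[] endKey = (lastRegionExclusive == startKeys.length)
>           ? new byte[0]                      // last split runs to end of table
>           : startKeys[lastRegionExclusive];  // first key of the next split
>       splits[i] = new TableSplit(tableName, startKey, endKey,
>                                  regionLocations[firstRegion]);
>     }
>     return splits;
>   }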
>
> HBase's subclass of RecordReader uses an instance of the ClientScanner
> class to keep state about where it is in the Mapper's portion of
> the dataset. This ClientScanner queries HBase's META server (which
> contains information about all regions) with the start key and table
> name to gather what HBase region to talk to and caches that
> information. Note that this is done for each Mapper on each machine.
> Note, too, that the region information encoded in the TableSplit
> doesn't seem to be used.
>
> These last few points are where the code gets really hairy. I could be
> wrong about some of it and corrections would be appreciated.
>
> The important parts, anyway, are the subclassing of InputFormat,
> RecordReader and InputSplit.
>
> = How To Do It With Cassandra (Maybe)
>
> So, I spoke to Jonathan on #cassandra about this and we tried to see
> how we could take the HBase method and turn it into something that
> would work with Cassandra.
>
> Our initial assumption is that the Cassandra database to be mapped
> over has to use the OrderPreservingPartitioner to keep the input
> splitting consistent.
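>
> (Concretely, the cluster's storage-conf.xml would need something like
>
>   <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
>
> rather than RandomPartitioner, assuming I have the element name right.)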
>
> Now, Cassandra nodes don't really have a concept of a "start" and
> "end" key. We could, however, get a start key for a given node by
> taking the first key returned from SSTableReader#getIndexedKeys(). We
> would then gather up the start keys from each of the nodes, and sort
> them.
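>
> In code, that gathering step might look something like this. The
> fetchFirstIndexedKey() call is made up; it stands in for whatever new
> Thrift or internal call would expose the first key from
> SSTableReader#getIndexedKeys() on each node:
>
>   // Hypothetical: collect one "start key" per node and return them sorted.
>   // A sorted set also quietly dedupes any keys repeated across nodes.
>   List<String> gatherStartKeys(List<String> nodes) {
>     SortedSet<String> startKeys = new TreeSet<String>();
>     for (String node : nodes) {
>       startKeys.add(fetchFirstIndexedKey(node));  // hypothetical per-node call
>     }
>     return new ArrayList<String>(startKeys);
>   }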
>
> We could then use each key in this gathered list as a start key for a
> "region" and, given an addition to the slice API, slice from start key
> to the next start key (the end key, in HBase terminology). We would
> need to modify the slice API to provide slices where the end key given
> is exclusive to the set returned, instead of inclusive.
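>
> Pairing each start key with the next one then gives the split ranges,
> something like:
>
>   // Hypothetical: turn the sorted start keys into (start, endExclusive)
>   // ranges, one per "region". An empty end key means "through the end of
>   // the keyspace".
>   List<String[]> buildRanges(List<String> sortedStartKeys) {
>     List<String[]> ranges = new ArrayList<String[]>();
>     for (int i = 0; i < sortedStartKeys.size(); i++) {
>       String start = sortedStartKeys.get(i);
>       String endExclusive = (i + 1 < sortedStartKeys.size())
>           ? sortedStartKeys.get(i + 1)  // next node's start key, not included
>           : "";                         // last range runs to the end
>       ranges.add(new String[] { start, endExclusive });
>     }
>     return ranges;
>   }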
>
> In terms of actual code to be written, our subclass of InputFormat is
> what would gather this list of start keys, and we would serialize the
> start/finish key pairs with our own subclass of InputSplit. And, of
> course, our subclass of RecordReader would make the
> finish-key-exclusive slice call.
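>
> The InputSplit subclass itself would be close to trivial: basically a
> Writable pair of keys plus whatever node hint we decide to carry along.
> A hypothetical sketch:
>
>   import java.io.DataInput;
>   import java.io.DataOutput;
>   import java.io.IOException;
>   import org.apache.hadoop.mapred.InputSplit;
>
>   // Hypothetical split holding the start/finish keys our RecordReader
>   // would slice between, serialized via the Writable interface.
>   public class CassandraSplit implements InputSplit {
>     private String startKey = "";
>     private String endKeyExclusive = "";
>     private String nodeHint = "";
>
>     public void write(DataOutput out) throws IOException {
>       out.writeUTF(startKey);
>       out.writeUTF(endKeyExclusive);
>       out.writeUTF(nodeHint);
>     }
>
>     public void readFields(DataInput in) throws IOException {
>       startKey = in.readUTF();
>       endKeyExclusive = in.readUTF();
>       nodeHint = in.readUTF();
>     }
>
>     // Size is unknown up front; Hadoop only uses it to sort splits.
>     public long getLength() { return 0; }
>
>     public String[] getLocations() { return new String[] { nodeHint }; }
>   }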
>
> This method satisfies property 1 of our not-stupid definition. At no
> point in our Hadoop job are we accessing the entire dataset or even
> all of the keys (if I'm remembering how getIndexedKeys works
> correctly).
>
> This satisfies properties 2 and 3 because we are clearly slicing
> everything once and only once unless I'm misremembering how
> replication works w.r.t. ordered partitioning. If I am misremembering
> and start keys are duplicated, we can just return a sorted set instead
> of a sorted array.
>
> This satisfies property 4 because we are slicing along the seams given
> us by the nodes themselves.
>
> = Back To The Game
>
> Right, so that's the first pass. What sucks about this? What rules
> about it? Questions?
>
> [1] http://gist.github.com/150217
> --
> Jeff
>
