cassandra-dev mailing list archives

From: Jeff Hodges <>
Subject: hadoop tasks reading from cassandra
Date: Fri, 24 Jul 2009 08:23:48 GMT

Getting Hadoop to play nice with Cassandra has been a desire for many
folks on this list and probably more on the other one. For the
purposes of this email, I'm going to restrict this goal to getting
Hadoop to read from Cassandra in a not-stupid way.

"Not-stupid" has a very specific meaning. "Not-stupid" means that:

1) Each Hadoop Mapper sees only a small subset of the entire desired
dataset from Cassandra, and the entire desired dataset is never seen
all at once by any one phase of the Hadoop process.

2) Every portion of the desired dataset will be unique to the Mapper
it is delivered to. No two Mappers will ever overlap.

3) There will be no portion of the desired dataset that is not seen by a Mapper.

4) Partitioning of the dataset across the Mappers should try to use
the Cassandra nodes efficiently. This means attempting to keep each
partition confined to a single node.

Conspicuously not on this list is data locality. That is, keeping the
data passed from a node to a given Mapper at or near the same machine.
That requires further investigation and is outside the scope of this
initial pass.
Also, please remember that "not-stupid" is not the same as "smart".

= How Hadoop Wants It And How It's Been Done By HBase

I've dug around the Hadoop and HBase codebases and while my
understanding is not yet perfect, this seems to be the general layout
of the problem.

First, a subclass of InputFormat needs to be written. This class's job
is to split up the dataset for a Hadoop job into InputSplits, or more
accurately, instances of a subclass of InputSplit. These InputSplits
are serialized to disk in HDFS in files named so that each is picked
up by just one Mapper. Okay, actually, the Mapper has no idea about
these files; the framework deserializes each split and hands it to its
Mapper behind the scenes.

An InputSplit is loaded up on a machine running a Mapper, and then
getRecordReader() is called on the InputFormat subclass, with the
Mapper's InputSplit passed in along with various Hadoop job
information. getRecordReader() returns a subclass of RecordReader that
allows the Mapper to call next() on it over and over again to run
through the portion of the dataset the InputSplit represents.
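
For concreteness, here are the two interfaces in play, trimmed down to
the methods discussed above. This is the old-style
org.apache.hadoop.mapred API as of 0.19, so double-check against
whatever version you're on:

    public interface InputFormat<K, V> {
      // Carve the job's dataset into one chunk per Mapper.
      InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

      // Called on the Mapper's machine with that Mapper's split.
      RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
                                         Reporter reporter) throws IOException;
    }

    public interface RecordReader<K, V> {
      boolean next(K key, V value) throws IOException; // the Mapper's read loop
      K createKey();
      V createValue();
      long getPos() throws IOException;
      float getProgress() throws IOException;
      void close() throws IOException;
    }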

In the 0.19.3 version of the HBase codebase, InputSplits are created
by gathering the start key of each "region" (which conceptually
maps, roughly, to a Cassandra node) in the database and divvying up
the keys approximately evenly across the number of Mappers desired.
Each TableSplit (the HBase subclass of InputSplit) created has
information about the start key of the region, the end key of the
region and the region location (the "name" of the node that part of
the dataset is stored on).

This splitting on keys works because HBase keys are always stored in
an ordered fashion. Basically, HBase always partitions using something
akin to Cassandra's OrderPreservingPartitioner.
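
Paraphrasing from memory (so treat the details as approximate; the
real method is in the gist linked below), the divvying up looks
roughly like this. The real code works with byte[] keys and HBase's
actual TableSplit constructor, but the shape is the same:

    // Spread the sorted region start keys evenly across the desired
    // number of splits; each split covers a contiguous run of regions.
    int realNumSplits = Math.min(numSplits, startKeys.length);
    int middle = startKeys.length / realNumSplits;
    InputSplit[] splits = new InputSplit[realNumSplits];
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
      // An empty end key means "through the end of the table" for the
      // final split.
      String endKey = (i + 1 == realNumSplits) ? "" : startKeys[lastPos];
      splits[i] = new TableSplit(tableName, startKeys[startPos], endKey,
                                 regionLocation);
      startPos = lastPos;
    }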

I've posted a gist with just the method that does this divvying up, by
the by[1]. (Interestingly, the region location seems to only be
encoded to allow for a nice toString() method on InputSplit to be used
during logging.)

HBase's subclass of RecordReader uses an instance of the ClientScanner
class to keep state about where it is in the Mapper's portion of
the dataset. This ClientScanner queries HBase's META server (which
contains information about all regions) with the start key and table
name to figure out which HBase region to talk to, and caches that
information. Note that this is done for each Mapper on each machine.
Note, too, that the region information encoded in the TableSplit
doesn't seem to be used.
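
In outline, the RecordReader's next() just leans on that scanner. The
names below are illustrative, not the exact 0.19 client API:

    // The scanner holds the cursor (and the cached region info) for
    // this Mapper's [startKey, endKey) range.
    public boolean next(Text key, Text value) throws IOException {
      Row row = scanner.next();        // null once the range is exhausted
      if (row == null) return false;   // tells the Mapper we're done
      key.set(row.key());
      value.set(row.serializedColumns());
      return true;
    }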

These last few points are where the code gets really hairy. I could be
wrong about some of it and corrections would be appreciated.

The important parts, anyway, are the subclassing of InputFormat,
RecordReader and InputSplit.

= How To Do It With Cassandra (Maybe)

So, I spoke to Jonathan on #cassandra about this and we tried to see
how we could take the HBase method and turn it into something that
would work with Cassandra.

Our initial assumption is that the Cassandra database to be mapped
over has to use the OrderPreservingPartitioner to keep the input
splitting consistent.

Now, Cassandra nodes don't really have a concept of a "start" and
"end" key. We could, however, get a start key for a given node by
taking the first key returned from SSTableReader#getIndexedKeys(). We
would then gather up the start keys from each of the nodes and sort
them.

We could then use each key in this gathered list as a start key for a
"region" and, given an addition to the slice API, slice from start key
to the next start key (the end key, in HBase terminology). We would
need to modify the slice API to provide slices where the end key given
is exclusive to the set returned, instead of inclusive.
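
Sketching that split construction in code, with made-up names
(makeSplits() and CassandraSplit are mine, not anything that exists):

    // Hypothetical helper in our InputFormat subclass: turn the nodes'
    // first indexed keys into (start, exclusive-end) key pairs.
    List<CassandraSplit> makeSplits(List<String> nodeStartKeys) {
      List<String> startKeys = new ArrayList<String>(nodeStartKeys);
      Collections.sort(startKeys);
      List<CassandraSplit> splits = new ArrayList<CassandraSplit>();
      for (int i = 0; i < startKeys.size(); i++) {
        // Empty end key meaning "through the last key in the ring".
        String end = (i + 1 < startKeys.size()) ? startKeys.get(i + 1) : "";
        splits.add(new CassandraSplit(startKeys.get(i), end));
      }
      return splits;
    }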

In terms of actual code to be written, our subclass of InputFormat is
what would gather this list of start keys, and we would serialize the
start/finish key pairs with our own subclass of InputSplit. And, of
course, our subclass of RecordReader would make the
finish-key-exclusive slice call.
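
The InputSplit subclass itself would be mostly serialization plumbing,
something like the sketch below (again, CassandraSplit and its fields
are names I'm making up):

    public class CassandraSplit implements InputSplit {
      private String startKey;
      private String endKey;  // exclusive, per the proposed slice change

      public CassandraSplit() {}  // Hadoop needs a no-arg ctor to deserialize
      public CassandraSplit(String startKey, String endKey) {
        this.startKey = startKey;
        this.endKey = endKey;
      }

      public void write(DataOutput out) throws IOException {
        out.writeUTF(startKey);
        out.writeUTF(endKey);
      }

      public void readFields(DataInput in) throws IOException {
        startKey = in.readUTF();
        endKey = in.readUTF();
      }

      public long getLength() { return 0; }  // size unknown up front
      public String[] getLocations() { return new String[0]; }  // no locality yet
    }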

This method satisfies property 1 of our not-stupid definition. At no
point in our Hadoop job are we accessing the entire dataset or even
all of the keys (if I'm remembering how getIndexedKeys works
correctly).
This satisfies properties 2 and 3 because we are clearly slicing
everything once and only once, unless I'm misremembering how
replication works w.r.t. ordered partitioning. If I am misremembering
and start keys are duplicated, we can just return a sorted set instead
of a sorted array.
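
If duplicates do show up, the fix on the gathering side is a
one-liner:

    // A TreeSet both sorts and dedupes the gathered start keys.
    SortedSet<String> startKeys = new TreeSet<String>(nodeStartKeys);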

This satisfies property 4 because we are slicing along the seams given
to us by the nodes themselves.

= Back To The Game

Right, so that's the first pass. What sucks about this? What rules
about it? Questions?

