incubator-cassandra-dev mailing list archives

From Jun Rao <>
Subject Re: hadoop tasks reading from cassandra
Date Fri, 24 Jul 2009 16:08:13 GMT


This looks like a great start. A few comments.

1. In addition to OrderPreservingPartitioner, it would be useful to support
MapReduce on Cassandra clusters that use RandomPartitioner as well. We have a
rough prototype that sort of works at the moment. The difficulty with the random partitioner
is that it's a bit hard to generate the splits. In our prototype, we simply
map each row to a split. This is ok for fat rows (e.g., a row includes all
info for a user), but may be too fine-grained for other cases. Another
possibility is to generate a split that corresponds to a set of rows in a
hash-range (instead of key range). This requires some new apis in

2. For better performance, in the future, it would be useful to expose and
exploit data locality in cassandra so that a map task is executed on a
cassandra node that owns the data locally. A related issue is It breaks
encapsulation, but it's worth thinking about. Google's DFS and Bigtable
both expose certain locality info for better performance.
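
The hash-range idea from point 1 could be sketched roughly as follows. This is only an illustration, not the prototype mentioned above: it assumes a row's token is the MD5 digest of its key read as an unsigned 128-bit integer, and the names `token`, `hash_range_splits`, and `split_for` are hypothetical rather than Cassandra APIs.

```python
import hashlib
from typing import List, Tuple

# Assumption for this sketch: tokens are raw 128-bit MD5 values.
TOKEN_SPACE = 2 ** 128


def token(key: str) -> int:
    """Hash a row key to a position in the token space."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")


def hash_range_splits(num_splits: int) -> List[Tuple[int, int]]:
    """Cut the token space into contiguous half-open ranges [lo, hi)."""
    bounds = [i * TOKEN_SPACE // num_splits for i in range(num_splits + 1)]
    return list(zip(bounds[:-1], bounds[1:]))


def split_for(key: str, splits: List[Tuple[int, int]]) -> int:
    """Return the index of the one split whose range contains the key's token."""
    t = token(key)
    for i, (lo, hi) in enumerate(splits):
        if lo <= t < hi:
            return i
    raise ValueError("token outside the token space")
```

Because the ranges are half-open and contiguous, every row lands in exactly one split no matter how many rows there are, so the split count can be chosen independently of the row count.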

IBM Almaden Research Center
K55/B1, 650 Harry Road, San Jose, CA  95120-6099

Jeff Hodges <> wrote on 07/24/2009 01:23:48 AM:

> Hey,
> Getting Hadoop to play nice with Cassandra has been a desire for many
> folks on this list and probably more on the other one. For the
> purposes of this email, I'm going to restrict this goal to getting
> Hadoop to read from Cassandra in a not-stupid way.
> "Not-stupid" has a very specific meaning. "Not-stupid" means that:
> 1) Each Hadoop Mapper sees only a small subset of the entire desired
> dataset from Cassandra and that the entire desired dataset will never
> be seen by any one phase of the Hadoop process.
> 2) Every portion of the desired dataset will be unique to the Mapper
> it is delivered to. No two Mappers will ever overlap.
> 3) There will be no portion of the desired dataset that is not seen
> by a Mapper.
> 4) Partitioning of the dataset to the Mappers should try to
> efficiently use the Cassandra nodes. This means attempting to keep
> each partition on a single node.
> Conspicuously not on this list is data locality. That is, keeping the
> data passed from a node to a given Mapper at or near the same machine.
> This requires further investigation outside the scope of this initial
> project.
> Also, please remember that "not-stupid" is not the same as "smart".
> = How Hadoop Wants It And How It's Been Done By HBase
> I've dug around the Hadoop and HBase codebases and while my
> understanding is not yet perfect, this seems to be the general layout
> of the problem.
> First, a subclass of InputFormat needs to be written. This class's job
> is to split up the dataset for a Hadoop job into InputSplits, or more
> accurately, instances of subclasses of InputSplit. These InputSplits are
> serialized to disk in HDFS in files named so that each is picked up by
> just one Mapper. Okay, actually the Mapper has no idea about
> InputSplits.
> An InputSplit is loaded on a machine running a Mapper, and then
> getRecordReader() is called on the subclass of InputFormat, and the
> Mapper's InputSplit is passed in as well as various hadoop job
> information. getRecordReader() returns a subclass of RecordReader that
> allows the Mapper to call next() on it over and over again to run
> through the portion of the dataset represented by InputSplit.
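
The flow described above (an input format produces splits, then hands out one reader per split that the mapper drains with repeated next() calls) might be modeled like this. It is a toy in Python rather than Hadoop's Java interfaces; `ToySplit`, `ToyInputFormat`, and `run_mapper` are hypothetical names for illustration only.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List, Tuple

Record = Tuple[str, str]


@dataclass(frozen=True)
class ToySplit:
    start: int  # index of the first record in the split (inclusive)
    end: int    # index one past the last record (exclusive)


class ToyInputFormat:
    def __init__(self, records: List[Record]) -> None:
        self.records = records

    def get_splits(self, num_splits: int) -> List[ToySplit]:
        """Divide the records into num_splits contiguous, non-overlapping splits."""
        n = len(self.records)
        bounds = [i * n // num_splits for i in range(num_splits + 1)]
        return [ToySplit(lo, hi) for lo, hi in zip(bounds, bounds[1:])]

    def get_record_reader(self, split: ToySplit) -> Iterator[Record]:
        """Return an iterator over only the records that belong to this split."""
        return iter(self.records[split.start:split.end])


def run_mapper(fmt: ToyInputFormat, split: ToySplit,
               map_fn: Callable[[str, str], str]) -> List[str]:
    """Drain the reader for one split, as a mapper would with repeated next() calls."""
    reader = fmt.get_record_reader(split)
    return [map_fn(key, value) for key, value in reader]
```

Note that the mapper function itself only ever sees key/value pairs; the split is handled entirely by the surrounding machinery.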
> In the 0.19.3 version of the HBase codebase, InputSplits are created
> by gathering all the start keys for each "region" (which conceptually
> maps, roughly, to a Cassandra node) in the database and divvying up
> the keys approximately evenly across the number of Mappers desired.
> Each TableSplit (the HBase subclass of InputSplit) created has
> information about the start key of the region, the end key of the
> region and the region location (the "name" of the node the dataset is
> on).
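
The divvying-up of region start keys across the desired number of Mappers could look roughly like the sketch below. It is written from the description above, not copied from the HBase source; `divide_regions` is a hypothetical name, and an end key of None stands in for "until the end of the table".

```python
from typing import List, Optional, Tuple


def divide_regions(start_keys: List[str],
                   num_splits: int) -> List[Tuple[str, Optional[str]]]:
    """Group sorted region start keys into at most num_splits (start, end) pairs.

    Each split covers a run of consecutive regions. Its end key is the
    start key of the next split, or None for the last split.
    """
    if not start_keys:
        return []
    real_splits = min(num_splits, len(start_keys))
    base, extra = divmod(len(start_keys), real_splits)
    splits = []
    pos = 0
    for i in range(real_splits):
        # The first `extra` splits take one more region so sizes stay even.
        last = pos + base + (1 if i < extra else 0)
        end_key = start_keys[last] if last < len(start_keys) else None
        splits.append((start_keys[pos], end_key))
        pos = last
    return splits
```

Asking for more splits than there are regions simply yields one split per region, since a split never cuts through a region in this scheme.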
> This splitting on keys works because HBase keys are always stored in
> an ordered fashion. Basically, HBase always partitions using something
> akin to Cassandra's OrderPreservingPartitioner.
> I've posted a gist with just the method that does this divvying up, by
> the by[1]. (Interestingly, the region location seems to only be
> encoded to allow for a nice toString() method on InputSplit to be used
> during logging.)
> HBase's subclass of RecordReader uses an instance of the ClientScanner
> class to keep state about where it is in the Mapper's portion of
> the dataset. This ClientScanner queries HBase's META server (which
> contains information about all regions) with the start key and table
> name to find which HBase region to talk to and caches that
> information. Note that this is done for each Mapper on each machine.
> Note, too, that the region information encoded in the TableSplit
> doesn't seem to be used.
> These last few points are where the code gets really hairy. I could be
> wrong about some of it and corrections would be appreciated.
> The important parts, anyway, are the subclassing of InputFormat,
> RecordReader and InputSplit.
> = How To Do It With Cassandra (Maybe)
> So, I spoke to Jonathan on #cassandra about this and we tried to see
> how we could take the HBase method and turn it into something that
> would work with Cassandra.
> Our initial assumption is that the Cassandra database to be mapped
> over has to use the OrderPreservingPartitioner to keep the input
> splitting consistent.
> Now, Cassandra nodes don't really have a concept of a "start" and
> "end" key. We could, however, get a start key for a given node by
> taking the first key returned from SSTableReader#getIndexedKeys(). We
> would then gather up the start keys from each of the nodes, and sort
> them.
> We could then use each key in this gathered list as a start key for a
> "region" and, given an addition to the slice API, slice from start key
> to the next start key (the end key, in HBase terminology). We would
> need to modify the slice API to provide slices where the end key given
> is exclusive to the set returned, instead of inclusive.
> In terms of actual code to be written, our subclass of InputFormat is
> what would gather this list of start keys, and we would serialize the
> start/finish key pairs with our own subclass of InputSplit. And, of
> course, our subclass of RecordReader would make the
> finish-key-exclusive slice call.
> This method satisfies property 1 of our not-stupid definition. At no
> point in our Hadoop job are we accessing the entire dataset or even
> all of the keys (if I'm remembering how getIndexedKeys works
> correctly).
> This satisfies properties 2 and 3 because we are clearly slicing
> everything once and only once unless I'm misremembering how
> replication works w.r.t. ordered partitioning. If I am misremembering
> and start keys are duplicated, we can just return a sorted set instead
> of a sorted array.
> This satisfies property 4 because we are slicing along the seams given
> us by the nodes themselves.
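
The start-key scheme described in this section, including the sorted-set fallback for duplicated start keys, might be sketched as follows. The helper names `key_ranges` and `slice_exclusive` are hypothetical; in particular, `slice_exclusive` only stands in for the proposed finish-key-exclusive slice call and is not an existing Cassandra API.

```python
from bisect import bisect_left
from typing import List, Optional, Tuple


def key_ranges(node_first_keys: List[str]) -> List[Tuple[str, Optional[str]]]:
    """Turn the first key reported by each node into (start, end) ranges.

    Duplicates (for example from replication) are collapsed by the set.
    The last range has end None, meaning "to the end of the keyspace".
    """
    starts = sorted(set(node_first_keys))
    ends: List[Optional[str]] = list(starts[1:]) + [None]
    return list(zip(starts, ends))


def slice_exclusive(sorted_keys: List[str], start: str,
                    end: Optional[str]) -> List[str]:
    """Return keys k with start <= k < end, so the end key is excluded."""
    lo = bisect_left(sorted_keys, start)
    hi = len(sorted_keys) if end is None else bisect_left(sorted_keys, end)
    return sorted_keys[lo:hi]
```

With an inclusive end key, each boundary key would be returned by two adjacent slices, which is why properties 2 and 3 depend on the exclusive variant.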
> = Back To The Game
> Right, so that's the first pass. What sucks about this? What rules
> about it? Questions?
> [1]
> --
> Jeff