hadoop-mapreduce-user mailing list archives

From "Deem, Mike" <md...@amazon.com>
Subject Split Indexes
Date Thu, 22 Jul 2010 00:22:52 GMT
We are planning to use Hadoop to run a number of recurring jobs that involve map side joins.

Rather than requiring that the joined datasets be partitioned into separate part-* files,
we are considering the alternative described below. Our concerns with the partitioned approach include:

* All the inputs may not have been partitioned the same way. In our environment we have
a number of datasets that are produced and consumed by a number of different jobs. We run
production jobs and experimental jobs that may or may not use exactly the same code, etc.
Generating a matched set of partitions before you can run a job that involves joins requires
extra work and is likely to be error-prone.

* We run our jobs on clusters of varying sizes. We have production and experimental
clusters and make use of Amazon's EMR for ad-hoc job runs. Pre-partitioned data may not be
well matched to the size of the cluster on which a given job will be run.

In our proposed solution, each input dataset will have an associated split index. The split
index could be computed as the dataset is written or could be generated on the fly and cached.

In either case, the split index is generated as outlined in the following pseudocode:

    previousSplitID = -1;
    splitStartOffset = 0;
    For each key in the input {
        hash = computeHash(key);
        splitID = hash % MAX_SPLITS;
        if(previousSplitID != splitID) {
            if(previousSplitID != -1) {
                // close out the run of keys that belonged to the previous split
                index.write(previousSplitID, splitStartOffset, getCurrentOffset(input) - splitStartOffset);
            }
            previousSplitID = splitID;
            splitStartOffset = getCurrentOffset(input);
        }
    }
    // write the final run, if the input was not empty
    if(previousSplitID != -1) {
        index.write(previousSplitID, splitStartOffset, getCurrentOffset(input) - splitStartOffset);
    }
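For anyone who wants to experiment with the idea, here is a rough, runnable Python sketch of the same loop. It is only an illustration under some assumptions that are not part of the proposal: records arrive as hypothetical (key, offset, length) tuples in file order, the index is an in-memory list rather than a file, and CRC32 stands in for computeHash. Note that the same split ID can appear in several index entries when the input is not ordered by hash.

```python
import zlib

MAX_SPLITS = 1024  # hypothetical value, chosen only for illustration


def compute_hash(key: bytes) -> int:
    # Assumption: any hash that is stable across runs and machines would do;
    # CRC32 is used here only because it is in the standard library.
    return zlib.crc32(key)


def build_split_index(records, max_splits=MAX_SPLITS, hash_fn=compute_hash):
    """Return a list of (split_id, start_offset, length) index entries.

    `records` is an iterable of (key, offset, length) tuples in file order,
    where `offset` is the byte offset at which the record starts.
    """
    index = []
    previous_split_id = -1
    split_start = 0
    end_of_input = 0
    for key, offset, length in records:
        split_id = hash_fn(key) % max_splits
        if split_id != previous_split_id:
            if previous_split_id != -1:
                # Close out the run of records that shared the previous split ID.
                index.append((previous_split_id, split_start, offset - split_start))
            previous_split_id = split_id
            split_start = offset
        end_of_input = offset + length
    if previous_split_id != -1:
        # Write the final run, which ends at the end of the input.
        index.append((previous_split_id, split_start, end_of_input - split_start))
    return index
```

Each consecutive run of keys that hash to the same split ID yields one index entry, so an input that is sorted or grouped by hash yields at most MAX_SPLITS entries, while an unordered input can yield up to one entry per record.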

When generating the splits for the join, each input's split index would be read to determine
the splits for that input and the corresponding splits processed together. Note that the actual
number of splits used for any given job doesn't have to be MAX_SPLITS. If MAX_SPLITS is large,
multiple splits from the index could easily be combined to create a number of splits matched
to the cluster size.
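One way the combining step might look, as a hedged Python sketch: index entries are grouped into a smaller number of job splits by mapping contiguous ranges of split IDs to the same job split. The function name, the in-memory index format of (split_id, start_offset, length) tuples, and the range-based mapping are all assumptions made for illustration. The one real requirement is that every input in the join uses the same mapping, so that matching keys land in the same job split.

```python
from collections import defaultdict


def group_index_entries(index, max_splits, num_job_splits):
    """Group index entries into `num_job_splits` lists of (start, length) ranges.

    `index` is a list of (split_id, start_offset, length) entries. Split IDs
    0..max_splits-1 are mapped onto job splits 0..num_job_splits-1 in
    contiguous ranges; the mapping depends only on the split ID, so applying
    it to every input of the join keeps corresponding splits together.
    """
    groups = defaultdict(list)
    for split_id, start, length in index:
        job_split = split_id * num_job_splits // max_splits
        groups[job_split].append((start, length))
    # Return one (possibly empty) list of byte ranges per job split.
    return [groups.get(i, []) for i in range(num_job_splits)]
```

For example, with max_splits=4 and num_job_splits=2, split IDs 0 and 1 go to job split 0 and split IDs 2 and 3 go to job split 1. A job split then reads a list of byte ranges from each input instead of a single contiguous range.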

I would like to hear any thoughts you may have about this approach. Has something similar
already been implemented? Does this approach raise any concerns we may not have thought of?


  == Mike ==

Mike Deem
