hadoop-mapreduce-user mailing list archives

From: Alan Gates <ga...@yahoo-inc.com>
Subject: Re: Split Indexes
Date: Thu, 22 Jul 2010 17:54:44 GMT
Pig has implemented map-side merge joins in this way. If the storage
mechanism provides an index (e.g. Zebra), Pig can use it.

Alan.

On Jul 21, 2010, at 5:22 PM, Deem, Mike wrote:

> We are planning to use Hadoop to run a number of recurring jobs that
> involve map-side joins.
>
> Rather than requiring that the joined datasets be partitioned into
> matching sets of separate part-* files, we are considering the
> solution described below. Our concerns with the partitioned approach
> include:
>
> · All the inputs may not have been partitioned in the same way. In
> our environment, a number of datasets are produced and consumed by a
> number of different jobs, and we run production and experimental jobs
> that may or may not use exactly the same code. Generating a matched
> set of partitions before running a job that involves joins requires
> extra work and is likely to be error-prone.
>
> · We run our jobs on clusters of varying sizes. We have production
> and experimental clusters and make use of Amazon’s EMR for ad-hoc job
> runs. Pre-partitioned data may not be well matched to the size of the
> cluster on which a given job will be run.
>
> In our proposed solution, each input dataset will have an associated  
> split index. The split index could be computed as the dataset is  
> written or could be generated on the fly and cached.
>
> In either case, the split index is generated as outlined in the
> following pseudocode:
>
>     previousSplitID = -1;
>     splitStartOffset = 0;
>     for each key in the input {
>         hash = computeHash(key);
>         splitID = hash % MAX_SPLITS;
>         if (previousSplitID != splitID) {
>             // A new run of keys begins; record the run that just
>             // ended in the index as (splitID, startOffset, length).
>             if (previousSplitID != -1) {
>                 index.write(previousSplitID, splitStartOffset,
>                     getCurrentOffset(input) - splitStartOffset);
>             }
>             previousSplitID = splitID;
>             splitStartOffset = getCurrentOffset(input);
>         }
>     }
>     // Flush the final run once the input is exhausted.
>     if (previousSplitID != -1) {
>         index.write(previousSplitID, splitStartOffset,
>             getCurrentOffset(input) - splitStartOffset);
>     }
>
> When generating the splits for the join, each input’s split index
> would be read to determine the splits for that input, and the
> corresponding splits would be processed together. Note that the
> actual number of splits used for any given job doesn’t have to be
> MAX_SPLITS. If MAX_SPLITS is large, multiple splits from the index
> could easily be combined to create a number of splits matched to the
> cluster size, as in the sketch below.
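>
> For illustration, here is a rough sketch of that combining step in
> Java. IndexEntry and SplitCombiner are hypothetical names used only
> for this sketch, not existing Hadoop APIs:
>
>     import java.util.ArrayList;
>     import java.util.List;
>
>     // One split index entry: a contiguous run of keys in one input
>     // file that all hash to the same logical split.
>     class IndexEntry {
>         final int splitID;
>         final long startOffset;
>         final long length;
>
>         IndexEntry(int splitID, long startOffset, long length) {
>             this.splitID = splitID;
>             this.startOffset = startOffset;
>             this.length = length;
>         }
>     }
>
>     class SplitCombiner {
>         // Group MAX_SPLITS logical splits into targetSplits buckets.
>         // Mapping by splitID % targetSplits sends a given splitID to
>         // the same bucket for every input, so the joined inputs stay
>         // aligned.
>         static List<List<IndexEntry>> combine(List<IndexEntry> entries,
>                                               int targetSplits) {
>             List<List<IndexEntry>> buckets =
>                 new ArrayList<List<IndexEntry>>();
>             for (int i = 0; i < targetSplits; i++) {
>                 buckets.add(new ArrayList<IndexEntry>());
>             }
>             for (IndexEntry e : entries) {
>                 buckets.get(e.splitID % targetSplits).add(e);
>             }
>             return buckets;
>         }
>     }
>
> Each resulting bucket would then become one actual input split for
> the job, built from the (startOffset, length) ranges of its entries
> across all the joined inputs.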
>
> I would like to hear any thoughts you may have about this approach.  
> Has something similar already been implemented? Does this approach  
> raise any concerns we may not have thought of?
>
> Thanks,
>
>   == Mike ==
>
> Mike Deem
> mdeem@amazon.com
>

