hadoop-hive-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Namit Jain (JIRA)" <j...@apache.org>
Subject [jira] Commented: (HIVE-917) Bucketed Map Join
Date Fri, 05 Feb 2010 08:44:28 GMT

    [ https://issues.apache.org/jira/browse/HIVE-917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12830012#action_12830012
] 

Namit Jain commented on HIVE-917:
---------------------------------

BucketMapJoinOptimizer.java: 80
wrong comment:
    // process group-by pattern

can you add a correct comment ?



    // mapper. That means there is not reducer between the root table scan and

change to:

    // mapper. That means there is no reducer between the root table scan and



Add some comments in
    private boolean checkBucketColumns(List<String> bucketColumns, MapJoinDesc mjDesc,
int index) {


A mapjoin B where A is the big table and partitioned should be optimized
B is not partitioned
(assuming both A and B are bucketed)


          if(partNumber == 0) {
            Integer num = new Integer(0);
            bucketNumbers.add(num);
            aliasToBucketNumber.put(alias, num);
            aliasToBucketFileNames.put(alias, new ArrayList<String>());



no need to do this - anyway, the results are empty


ExecMapper:

    if(bucketMatcherCls == null) {
      bucketMatcherCls = org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class;
    }


Add the class name in mapredlocalwork and initialize it using reflection
Keep file name to file name mapping in mapredlocalwork (only useful for bucketed map join
- not for skew join)


MapredLocalWork:


  private LinkedHashMap<String, Integer> aliasToBucketNumber;
  private LinkedHashMap<String, List<String>> aliasToBucketFileNames;
  private String mapJoinBigTableAlias;
  private Class<? extends BucketMatcher> bucketMatcker;


create a new class for the above


  public Class<? extends BucketMatcher> getBucketMatcker() {
    return bucketMatcker;
  }

  public void setBucketMatcker(Class<? extends BucketMatcher> bucketMatcker) {
    this.bucketMatcker = bucketMatcker;
  }

spelling: should be Matcher


DefaultBucketMatcher:

  public List<Path> getAliasBucketFiles(String refTableInputFile, String refTableAlias,
String alias) {
    int bigTblBucketNum =  aliasToBucketNumber.get(refTableAlias);
    int smallTblBucketNum = aliasToBucketNumber.get(alias);
    Collections.sort(aliasToBucketFileNames.get(refTableAlias));
    Collections.sort(aliasToBucketFileNames.get(alias));

    List<Path> resultFileNames = new ArrayList<Path>();
    if (bigTblBucketNum >= smallTblBucketNum) {
      int temp = bigTblBucketNum / smallTblBucketNum;
      int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile);
      int toAddSmallIndex = index/temp;
      if(toAddSmallIndex < aliasToBucketFileNames.get(alias).size()) {
        resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(toAddSmallIndex)));
      }
    } else {
      int jump = smallTblBucketNum / bigTblBucketNum;
      int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile);
      for (int i = index; i < aliasToBucketFileNames.get(alias).size(); i = i + jump) {
        if(i <= aliasToBucketFileNames.get(alias).size()) {
          resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(i)));
        }
      }
    }
    return resultFileNames;
  }



move this to compile time and add some more comments



FetchOperator.java:

6 		 boolean ret = false;
				267 		try {
				268 		value = currRecReader.createValue();
				269 		ret = currRecReader.next(key, value);
				270 		} catch (Exception e) {
				271 		e.printStackTrace();
				272 		}

> Bucketed Map Join
> -----------------
>
>                 Key: HIVE-917
>                 URL: https://issues.apache.org/jira/browse/HIVE-917
>             Project: Hadoop Hive
>          Issue Type: New Feature
>            Reporter: Zheng Shao
>         Attachments: hive-917-2010-2-3.patch
>
>
> Hive already have support for map-join. Map-join treats the big table as job input, and
in each mapper, it loads all data from a small table.
> In case the big table is already bucketed on the join key, we don't have to load the
whole small table in each of the mappers. This will greatly alleviate the memory pressure,
and make map-join work with medium-sized tables.
> There are 4 steps we can improve:
> S0. This is what the user can already do now: create a new bucketed table and insert
all data from the small table to it; Submit BUCKETNUM jobs, each doing a map-side join of
"bigtable TABLEPARTITION(BUCKET i OUT OF NBUCKETS)" with "smallbucketedtable TABLEPARTITION(BUCKET
i OUT OF NBUCKETS)".
> S1. Change the code so that when map-join is loading the small table, we automatically
drop the rows with the keys that are NOT in the same bucket as the big table. This should
alleviate the problem on memory, but we might still have thousands of mappers reading the
whole of the small table.
> S2. Let's say the user already bucketed the small table on the join key into exactly
the same number of buckets (or a factor of the buckets of the big table), then map-join can
choose to load only the buckets that are useful.
> S3. Add a new hint (e.g. /*+ MAPBUCKETJOIN(a) */), so that Hive automatically does S2,
without the need of asking the user to create temporary bucketed table for the small table.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message