hadoop-hive-commits mailing list archives

From na...@apache.org
Subject svn commit: r910755 [1/3] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org...
Date Tue, 16 Feb 2010 23:09:08 GMT
Author: namit
Date: Tue Feb 16 23:09:07 2010
New Revision: 910755

URL: http://svn.apache.org/viewvc?rev=910755&view=rev
Log:
HIVE-917. Bucketed Map Join
(He Yongqiang via namit)


Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BucketMatcher.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultBucketMatcher.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin1.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin3.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin4.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin4.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Feb 16 23:09:07 2010
@@ -24,6 +24,9 @@
     HIVE-1117. Make queryPlan serializable
     (Zheng Shao via namit)
 
+    HIVE-917. Bucketed Map Join
+    (He Yongqiang via namit)
+
   IMPROVEMENTS
 
     HIVE-983. Function from_unixtime takes long.

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Feb 16 23:09:07 2010
@@ -192,7 +192,9 @@
     // Optimizer
     HIVEOPTCP("hive.optimize.cp", true), // column pruner
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
-    HIVEOPTGROUPBY("hive.optimize.groupby", true); // optimize group by
+    HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
+    HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
+    ;
 
     public final String varname;
     public final String defaultVal;

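A minimal sketch of how the new flag can be toggled and read programmatically; only HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN and its public varname field come from this patch, while the surrounding class and variable names are hypothetical. From the CLI the equivalent is simply "set hive.optimize.bucketmapjoin = true;" (it defaults to false), as the new .q tests below do.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

public class BucketMapJoinFlagExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Programmatic equivalent of "set hive.optimize.bucketmapjoin = true;".
    conf.setBoolean(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN.varname, true);
    // Same accessor the Optimizer change below uses to decide whether to add the transform.
    System.out.println(HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN));
  }
}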
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BucketMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BucketMatcher.java?rev=910755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BucketMatcher.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/BucketMatcher.java Tue Feb 16 23:09:07 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+
+public interface BucketMatcher {
+  
+  public List<Path> getAliasBucketFiles(String currentInputFile, String refTableAlias, String alias);
+  
+  public void setAliasBucketFileNameMapping(
+      LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping);
+
+}
\ No newline at end of file

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultBucketMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultBucketMatcher.java?rev=910755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultBucketMatcher.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultBucketMatcher.java Tue Feb 16 23:09:07 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+
+public class DefaultBucketMatcher implements BucketMatcher {
+  
+  protected Log LOG = LogFactory.getLog(this.getClass().getName());
+
+  // MAPPING: small_table_alias -> {bucket_file_name_in_big_table -> corresponding_bucket_file_names}
+  private LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketMapping;
+  
+  public DefaultBucketMatcher(){
+  }
+
+  public List<Path> getAliasBucketFiles(String refTableInputFile, String refTableAlias, String alias) {
+    List<String> pathStr=aliasBucketMapping.get(alias).get(refTableInputFile);
+    List<Path> paths = new ArrayList<Path>();
+    if(pathStr!=null) {
+      for (String p : pathStr) {
+        LOG.info("Loading file " + p + " for " + alias + ". (" + refTableInputFile + ")");
+        paths.add(new Path(p));
+      }
+    }
+    return paths;
+  }
+  
+  public void setAliasBucketFileNameMapping(
+      LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping) {
+    this.aliasBucketMapping = aliasBucketFileNameMapping;
+  }
+  
+}

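To make the nested mapping concrete, here is a small self-contained example (not part of this commit) of how a DefaultBucketMatcher resolves which small-table bucket files to load for the big-table bucket currently being read. The aliases and warehouse paths below are made up for illustration.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.BucketMatcher;
import org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher;

public class BucketMatcherExample {
  public static void main(String[] args) {
    // small_table_alias -> (big-table bucket file -> matching small-table bucket files)
    LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> mapping =
        new LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>>();

    ArrayList<String> smallBuckets = new ArrayList<String>();
    smallBuckets.add("/warehouse/small_tbl/bucket_0");          // hypothetical path
    LinkedHashMap<String, ArrayList<String>> forAliasB =
        new LinkedHashMap<String, ArrayList<String>>();
    forAliasB.put("/warehouse/big_tbl/bucket_0", smallBuckets); // hypothetical path
    mapping.put("b", forAliasB);

    BucketMatcher matcher = new DefaultBucketMatcher();
    matcher.setAliasBucketFileNameMapping(mapping);

    // While the mapper is reading the big table's bucket_0, alias "b" only needs
    // to load its own bucket_0.
    List<Path> paths = matcher.getAliasBucketFiles("/warehouse/big_tbl/bucket_0", "a", "b");
    System.out.println(paths); // [/warehouse/small_tbl/bucket_0]
  }
}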
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Tue Feb 16 23:09:07 2010
@@ -25,13 +25,18 @@
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
@@ -40,6 +45,7 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * ExecMapper.
@@ -60,6 +66,8 @@
   private MemoryMXBean memoryMXBean;
   private long numRows = 0;
   private long nextCntr = 1;
+  private String lastInputFile = null;
+  private MapredLocalWork localWork = null;
 
   @Override
   public void configure(JobConf job) {
@@ -88,7 +96,7 @@
       mo.initialize(jc, null);
 
       // initialize map local work
-      MapredLocalWork localWork = mrwork.getMapLocalWork();
+      localWork = mrwork.getMapLocalWork();
       if (localWork == null) {
         return;
       }
@@ -131,54 +139,15 @@
       rp = reporter;
       mo.setOutputCollector(oc);
       mo.setReporter(rp);
-      // process map local operators
-      if (fetchOperators != null) {
-        try {
-          MapredLocalWork localWork = mo.getConf().getMapLocalWork();
-          int fetchOpNum = 0;
-          for (Map.Entry<String, FetchOperator> entry : fetchOperators
-              .entrySet()) {
-            int fetchOpRows = 0;
-            String alias = entry.getKey();
-            FetchOperator fetchOp = entry.getValue();
-            Operator<? extends Serializable> forwardOp = localWork
-                .getAliasToWork().get(alias);
-
-            while (true) {
-              InspectableObject row = fetchOp.getNextRow();
-              if (row == null) {
-                forwardOp.close(false);
-                break;
-              }
-              fetchOpRows++;
-              forwardOp.process(row.o, 0);
-              // check if any operator had a fatal error or early exit during
-              // execution
-              if (forwardOp.getDone()) {
-                done = true;
-                break;
-              }
-            }
-
-            if (l4j.isInfoEnabled()) {
-              l4j
-                  .info("fetch " + fetchOpNum++ + " processed " + fetchOpRows
-                  + " used mem: "
-                  + memoryMXBean.getHeapMemoryUsage().getUsed());
-            }
-          }
-        } catch (Throwable e) {
-          abort = true;
-          if (e instanceof OutOfMemoryError) {
-            // Don't create a new object if we are already out of memory
-            throw (OutOfMemoryError) e;
-          } else {
-            throw new RuntimeException("Map local work failed", e);
-          }
-        }
-      }
     }
-
+    
+    if (localWork != null
+        && (this.lastInputFile == null || 
+            (localWork.getInputFileChangeSensitive() && inputFileChanged()))) {
+      this.lastInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
+      processMapLocalWork(localWork.getInputFileChangeSensitive());      
+    }
+    
     try {
       if (mo.getDone()) {
         done = true;
@@ -208,6 +177,96 @@
     }
   }
 
+  /**
+   * For CombineFileInputFormat, the mapper's input file can change on the
+   * fly. If the map local work has any mapping that depends on the current
+   * mapper's input file, the work needs to clear its context and re-initialize
+   * after the input file changes. This was first introduced to support bucket
+   * map join.
+   * 
+   * @return true if the current input file differs from the last one processed
+   */
+  private boolean inputFileChanged() {
+    String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
+    if (this.lastInputFile == null
+        || !this.lastInputFile.equals(currentInputFile)) {
+      return true;
+    }
+    return false;
+  }
+
+  private void processMapLocalWork(boolean inputFileChangeSenstive) {
+    // process map local operators
+    if (fetchOperators != null) {
+      try {
+        int fetchOpNum = 0;
+        for (Map.Entry<String, FetchOperator> entry : fetchOperators
+            .entrySet()) {
+          int fetchOpRows = 0;
+          String alias = entry.getKey();
+          FetchOperator fetchOp = entry.getValue();
+          
+          if(inputFileChangeSenstive) {
+            fetchOp.clearFetchContext();
+            setUpFetchOpContext(fetchOp, alias);
+          }
+          
+          Operator<? extends Serializable> forwardOp = localWork
+              .getAliasToWork().get(alias);
+
+          while (true) {
+            InspectableObject row = fetchOp.getNextRow();
+            if (row == null) {
+              forwardOp.close(false);
+              break;
+            }
+            fetchOpRows++;
+            forwardOp.process(row.o, 0);
+            // check if any operator had a fatal error or early exit during
+            // execution
+            if (forwardOp.getDone()) {
+              done = true;
+              break;
+            }
+          }
+
+          if (l4j.isInfoEnabled()) {
+            l4j
+                .info("fetch " + fetchOpNum++ + " processed " + fetchOpRows
+                    + " used mem: "
+                    + memoryMXBean.getHeapMemoryUsage().getUsed());
+          }
+        }
+      } catch (Throwable e) {
+        abort = true;
+        if (e instanceof OutOfMemoryError) {
+          // Don't create a new object if we are already out of memory
+          throw (OutOfMemoryError) e;
+        } else {
+          throw new RuntimeException("Map local work failed", e);
+        }
+      }
+    }
+  }
+  
+  private void setUpFetchOpContext(FetchOperator fetchOp, String alias)
+      throws Exception {
+    String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
+    BucketMapJoinContext bucketMatcherCxt = this.localWork.getBucketMapjoinContext();
+    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
+    if(bucketMatcherCls == null) {
+      bucketMatcherCls = org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class;
+    }
+    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(bucketMatcherCls, null);
+    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt.getAliasBucketFileNameMapping());
+    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
+        bucketMatcherCxt.getMapJoinBigTableAlias(),
+        alias);
+    Iterator<Path> iter = aliasFiles.iterator();
+    fetchOp.setupContext(iter, null);
+  }
+  
+
   private long getNextCntr(long cntr) {
     // A very simple counter to keep track of number of rows processed by the
     // reducer. It dumps

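The new per-input-file handling above boils down to remembering the last file the mapper was reading and re-running the map local work when it changes. A stand-alone restatement of that check, under the assumption that the job configuration carries the current input file in HiveConf.ConfVars.HADOOPMAPFILENAME; the class and method names below are illustrative, not ExecMapper's own.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

public class InputFileChangeTracker {
  private String lastInputFile = null;

  public boolean changed(Configuration jc) {
    // HADOOPMAPFILENAME holds the path of the file the mapper is currently reading;
    // when several files are combined into one split, it moves across files.
    String current = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
    boolean fileChanged = (lastInputFile == null || !lastInputFile.equals(current));
    lastInputFile = current;
    return fileChanged;
  }
}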
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Tue Feb 16 23:09:07 2010
@@ -104,7 +104,8 @@
       if (isPrintable(ent.getValue())) {
         out.print(ent.getValue());
         out.println();
-      } else if (ent.getValue() instanceof List) {
+      } else if (ent.getValue() instanceof List
+          || ent.getValue() instanceof Map) {
         out.print(ent.getValue().toString());
         out.println();
       } else if (ent.getValue() instanceof Serializable) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Tue Feb 16 23:09:07 2010
@@ -222,14 +222,18 @@
 
     while (iterPath.hasNext()) {
       Path nxt = iterPath.next();
-      PartitionDesc prt = iterPartDesc.next();
+      PartitionDesc prt = null;
+      if(iterPartDesc != null)
+        prt = iterPartDesc.next();
       FileSystem fs = nxt.getFileSystem(job);
       if (fs.exists(nxt)) {
         FileStatus[] fStats = fs.listStatus(nxt);
         for (FileStatus fStat : fStats) {
           if (fStat.getLen() > 0) {
             currPath = nxt;
-            currPart = prt;
+            if(iterPartDesc != null) {
+              currPart = prt;
+            }
             return;
           }
         }
@@ -264,7 +268,7 @@
       LOG.debug("Creating fetchTask with deserializer typeinfo: "
           + serde.getObjectInspector().getTypeName());
       LOG.debug("deserializer properties: " + tmp.getProperties());
-      if (!tblDataDone) {
+      if (currPart != null) {
         setPrtnDesc();
       }
     }
@@ -298,10 +302,10 @@
             return null;
           }
         }
-  
+
         boolean ret = currRecReader.next(key, value);
         if (ret) {
-          if (tblDataDone) {
+          if (this.currPart == null) {
             Object obj = serde.deserialize(value);
             return new InspectableObject(obj, serde.getObjectInspector());
           } else {
@@ -317,7 +321,7 @@
       throw new IOException(e);
     }
   }
-
+  
   /**
    * Clear the context, if anything needs to be done.
    * 
@@ -328,11 +332,32 @@
         currRecReader.close();
         currRecReader = null;
       }
+      this.currPath = null;
+      this.iterPath = null;
+      this.iterPartDesc = null;
     } catch (Exception e) {
       throw new HiveException("Failed with exception " + e.getMessage()
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
   }
+  
+  /**
+   * Used for bucket map join. There is a hack for getting the partitionDesc:
+   * bucket map join currently allows only one partition to be present in each join source.
+   */
+  public void setupContext (Iterator<Path> iterPath, Iterator<PartitionDesc> iterPartDesc) {
+    this.iterPath = iterPath;
+    this.iterPartDesc = iterPartDesc;
+    if(iterPartDesc == null) {
+      if (work.getTblDir() != null) {
+        this.currTbl = work.getTblDesc();
+      } else {
+        //hack, get the first.
+        List<PartitionDesc> listParts = work.getPartDesc();
+        currPart = listParts.get(0);
+      }
+    }
+  }
 
   public ObjectInspector getOutputObjectInspector() throws HiveException {
     try {

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=910755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Tue Feb 16 23:09:07 2010
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+
+/**
+ * This transformation does bucket map join optimization.
+ */
+public class BucketMapJoinOptimizer implements Transform {
+  
+  private static final Log LOG = LogFactory.getLog(BucketMapJoinOptimizer.class
+      .getName());
+
+  public BucketMapJoinOptimizer() {
+  }
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    BucketMapjoinOptProcCtx bucketMapJoinOptimizeCtx = new BucketMapjoinOptProcCtx();
+
+    // process map joins with no reducers pattern
+    opRules.put(new RuleRegExp("R1", "MAPJOIN%"), getBucketMapjoinProc(pctx));
+    opRules.put(new RuleRegExp("R2", "RS%.*MAPJOIN"), getBucketMapjoinRejectProc(pctx));
+    opRules.put(new RuleRegExp(new String("R3"), "UNION%.*MAPJOIN%"),
+        getBucketMapjoinRejectProc(pctx));
+    opRules.put(new RuleRegExp(new String("R4"), "MAPJOIN%.*MAPJOIN%"),
+        getBucketMapjoinRejectProc(pctx));
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules,
+        bucketMapJoinOptimizeCtx);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of topop nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pctx;
+  }
+
+  private NodeProcessor getBucketMapjoinRejectProc(ParseContext pctx) {
+    return new NodeProcessor () {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
+        BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
+        context.listOfRejectedMapjoins.add(mapJoinOp);
+        return null;
+      }
+    };
+  }
+
+  private NodeProcessor getBucketMapjoinProc(ParseContext pctx) {
+    return new BucketMapjoinOptProc(pctx);
+  }
+
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        return null;
+      }
+    };
+  }
+  
+  class BucketMapjoinOptProc implements NodeProcessor {
+    
+    protected ParseContext pGraphContext;
+    
+    public BucketMapjoinOptProc(ParseContext pGraphContext) {
+      super();
+      this.pGraphContext = pGraphContext;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
+      BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
+
+      if(context.getListOfRejectedMapjoins().contains(mapJoinOp))
+        return null;
+      
+      QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext().get(mapJoinOp);
+      if(joinCxt == null)
+        return null;
+      
+      List<String> joinAliases = new ArrayList<String>();
+      String[] srcs = joinCxt.getBaseSrc();
+      String[] left = joinCxt.getLeftAliases();
+      List<String> mapAlias = joinCxt.getMapAliases();
+      String baseBigAlias = null;
+      for(String s : left) {
+        if(s != null && !joinAliases.contains(s)) {
+          joinAliases.add(s);
+          if(!mapAlias.contains(s)) {
+            baseBigAlias = s;
+          }
+        }
+      }
+      for(String s : srcs) {
+        if(s != null && !joinAliases.contains(s)) {
+          joinAliases.add(s);
+          if(!mapAlias.contains(s)) {
+            baseBigAlias = s;
+          }
+        }
+      }
+      
+      MapJoinDesc mjDecs = mapJoinOp.getConf();
+      LinkedHashMap<String, Integer> aliasToBucketNumber = new LinkedHashMap<String, Integer>();
+      LinkedHashMap<String, List<String>> aliasToBucketFileNames = new LinkedHashMap<String, List<String>>();
+      // right now this code does not work with "a join b on a.key = b.key and
+      // a.ds = b.ds", where ds is a partition column. It only works with joins
+      // where only one partition is present in each join source table.
+      Map<String, Operator<? extends Serializable>> topOps = this.pGraphContext.getTopOps();
+      Map<TableScanOperator, Table> topToTable = this.pGraphContext.getTopToTable();
+      
+      List<Integer> bucketNumbers = new ArrayList<Integer>();
+      for (int index = 0; index < joinAliases.size(); index++) {
+        String alias = joinAliases.get(index);
+        TableScanOperator tso = (TableScanOperator) topOps.get(alias);
+        Table tbl = topToTable.get(tso);
+        if(tbl.isPartitioned()) {
+          PrunedPartitionList prunedParts = null;
+          try {
+            prunedParts = PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso), pGraphContext.getConf(), alias,
+                pGraphContext.getPrunedPartitions());
+          } catch (HiveException e) {
+            // Has to use full name to make sure it does not conflict with
+            // org.apache.commons.lang.StringUtils
+            LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+            throw new SemanticException(e.getMessage(), e);
+          }
+          int partNumber = prunedParts.getConfirmedPartns().size()
+              + prunedParts.getUnknownPartns().size();
+          if(partNumber > 1)
+            return null;
+
+          Partition part = null;
+          Iterator<Partition> iter = prunedParts.getConfirmedPartns()
+              .iterator();
+          while (iter.hasNext())
+            part = iter.next();
+          if (part == null) {
+            iter = prunedParts.getUnknownPartns().iterator();
+            while (iter.hasNext())
+              part = iter.next();
+          }
+
+          assert part != null;
+
+          if (!checkBucketColumns(part.getBucketCols(), mjDecs, index))
+            return null;
+
+          Integer num = new Integer(part.getBucketCount());
+          aliasToBucketNumber.put(alias, num);
+          List<String> fileNames = new ArrayList<String>();
+          try {
+            FileSystem fs = FileSystem.get(this.pGraphContext.getConf());
+            FileStatus[] files = fs.listStatus(new Path(part.getDataLocation()
+                .toString()));
+            if (files != null) {
+              for (FileStatus file : files) {
+                fileNames.add(file.getPath().toString());
+              }
+            }
+          } catch (IOException e) {
+            throw new SemanticException(e);
+          }
+          aliasToBucketFileNames.put(alias, fileNames);
+        } else {
+          if (!checkBucketColumns(tbl.getBucketCols(), mjDecs, index))
+            return null;
+          Integer num = new Integer(tbl.getNumBuckets());
+          aliasToBucketNumber.put(alias, num);
+          List<String> fileNames = new ArrayList<String>();
+          try {
+            FileSystem fs = FileSystem.get(this.pGraphContext.getConf());
+            FileStatus[] files = fs.listStatus(new Path(tbl.getDataLocation().toString()));
+            if(files != null) {
+              for(FileStatus file : files) {
+                fileNames.add(file.getPath().toString());
+              }
+            }
+          } catch (IOException e) {
+            throw new SemanticException(e);
+          }
+          aliasToBucketFileNames.put(alias, fileNames);
+        }
+      }
+      
+      // All tables or partitions are bucketed and their bucket counts are
+      // stored in 'aliasToBucketNumber'; we need to check that the number of
+      // buckets in the big table divides, or is divisible by, the number of
+      // buckets in each small table.
+      int bucketNoInBigTbl = aliasToBucketNumber.get(baseBigAlias).intValue();
+      Iterator<Integer> iter = aliasToBucketNumber.values().iterator();
+      while(iter.hasNext()) {
+        int nxt = iter.next().intValue();
+        boolean ok = (nxt >= bucketNoInBigTbl) ? nxt % bucketNoInBigTbl == 0
+            : bucketNoInBigTbl % nxt == 0;
+        if(!ok)
+          return null;
+      }
+      MapJoinDesc desc = mapJoinOp.getConf();
+      
+      LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping = 
+        new LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>>();
+      
+      int bigTblBucketNum =  aliasToBucketNumber.get(baseBigAlias);
+      Collections.sort(aliasToBucketFileNames.get(baseBigAlias));
+      for (int j = 0; j < joinAliases.size(); j++) {
+        String alias = joinAliases.get(j);
+        if(alias.equals(baseBigAlias))
+          continue;
+        Collections.sort(aliasToBucketFileNames.get(alias));
+        LinkedHashMap<String, ArrayList<String>> mapping = new LinkedHashMap<String, ArrayList<String>>();
+        aliasBucketFileNameMapping.put(alias, mapping);
+        for(String inputBigTBLBucket : aliasToBucketFileNames.get(baseBigAlias)) {
+          int smallTblBucketNum = aliasToBucketNumber.get(alias);
+          ArrayList<String> resultFileNames = new ArrayList<String>();
+          int index = aliasToBucketFileNames.get(baseBigAlias).indexOf(inputBigTBLBucket);
+          if (bigTblBucketNum >= smallTblBucketNum) {
+            int temp = bigTblBucketNum / smallTblBucketNum;
+            int toAddSmallIndex = index/temp;
+            if(toAddSmallIndex < aliasToBucketFileNames.get(alias).size()) {
+              resultFileNames.add(aliasToBucketFileNames.get(alias).get(toAddSmallIndex));
+            }
+          } else {
+            int jump = smallTblBucketNum / bigTblBucketNum;
+            for (int i = index; i < aliasToBucketFileNames.get(alias).size(); i = i + jump) {
+              if(i <= aliasToBucketFileNames.get(alias).size()) {
+                resultFileNames.add(aliasToBucketFileNames.get(alias).get(i));
+              }
+            }
+          }
+          mapping.put(inputBigTBLBucket, resultFileNames);
+        }
+      }
+      desc.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+      desc.setBigTableAlias(baseBigAlias);
+      return null;
+    }
+    
+    private boolean checkBucketColumns(List<String> bucketColumns, MapJoinDesc mjDesc, int index) {
+      List<ExprNodeDesc> keys = mjDesc.getKeys().get((byte)index);
+      if (keys == null || bucketColumns == null || bucketColumns.size() == 0)
+        return false;
+      
+      //get all join columns from join keys stored in MapJoinDesc
+      List<String> joinCols = new ArrayList<String>();
+      List<ExprNodeDesc> joinKeys = new ArrayList<ExprNodeDesc>();
+      joinKeys.addAll(keys);
+      while (joinKeys.size() > 0) {
+        ExprNodeDesc node = joinKeys.remove(0);
+        if (node instanceof ExprNodeColumnDesc) {
+          joinCols.addAll(node.getCols());
+        } else if (node instanceof ExprNodeGenericFuncDesc) {
+          ExprNodeGenericFuncDesc udfNode = ((ExprNodeGenericFuncDesc) node);
+          GenericUDF udf = udfNode.getGenericUDF();
+          if (!FunctionRegistry.isDeterministic(udf)) {
+            return false;
+          }
+          joinKeys.addAll(0, udfNode.getChildExprs());
+        } else {
+          return false;
+        }
+      }
+
+      // check whether the join columns of a table are exactly the same as its
+      // bucket columns
+      if (joinCols.size() == 0 || joinCols.size() != bucketColumns.size()) {
+        return false;
+      }
+      
+      for (String col : joinCols) {
+        if (!bucketColumns.contains(col))
+          return false;
+      }
+      
+      return true;
+    }
+    
+  }
+  
+  class BucketMapjoinOptProcCtx implements NodeProcessorCtx {
+    // we only convert map joins that follow a root table scan in the same
+    // mapper, i.e. there is no reducer between the root table scan and the
+    // map join.
+    Set<MapJoinOperator> listOfRejectedMapjoins = new HashSet<MapJoinOperator>();
+    
+    public Set<MapJoinOperator> getListOfRejectedMapjoins() {
+      return listOfRejectedMapjoins;
+    }
+    
+  }
+}

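As a worked example of the pairing that the loop over aliasToBucketFileNames builds: with a hypothetical big table of 4 buckets joined to a small table of 2 buckets, big buckets 0 and 1 both read small bucket 0, and big buckets 2 and 3 read small bucket 1; if the small side had more buckets, each big bucket would instead read every jump-th small bucket. The sketch below (not part of the patch, bucket counts made up) applies the same index arithmetic to bucket indexes rather than file names.

public class BucketIndexArithmetic {
  public static void main(String[] args) {
    int bigTblBucketNum = 4;   // hypothetical bucket count of the big table
    int smallTblBucketNum = 2; // hypothetical bucket count of one small table

    for (int index = 0; index < bigTblBucketNum; index++) {
      StringBuilder matched = new StringBuilder();
      if (bigTblBucketNum >= smallTblBucketNum) {
        // big side has at least as many buckets: each big bucket reads exactly one small bucket
        int temp = bigTblBucketNum / smallTblBucketNum;
        matched.append(index / temp);
      } else {
        // small side has more buckets: each big bucket reads every jump-th small bucket
        int jump = smallTblBucketNum / bigTblBucketNum;
        for (int i = index; i < smallTblBucketNum; i += jump) {
          matched.append(i).append(' ');
        }
      }
      System.out.println("big bucket " + index + " -> small bucket(s) " + matched);
    }
  }
}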
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Tue Feb 16 23:09:07 2010
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -194,6 +195,7 @@
         }
 
         setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
+        setupBucketMapJoinInfo(plan, currMapJoinOp);
       } else {
         initUnionPlan(opProcCtx, currTask, false);
       }
@@ -215,6 +217,7 @@
       seenOps.add(currTopOp);
       boolean local = (pos == desc.getPosBigTable()) ? false : true;
       setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
+      setupBucketMapJoinInfo(plan, (MapJoinOperator)op);
     }
 
     opProcCtx.setCurrTask(currTask);
@@ -222,6 +225,22 @@
     opProcCtx.setCurrAliasId(null);
   }
 
+  private static void setupBucketMapJoinInfo(MapredWork plan,
+      MapJoinOperator currMapJoinOp) {
+    MapredLocalWork localPlan = plan.getMapLocalWork();
+    if (localPlan != null && currMapJoinOp != null) {
+      LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping = 
+        currMapJoinOp.getConf().getAliasBucketFileNameMapping();
+      if(aliasBucketFileNameMapping!= null) {
+        BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
+        localPlan.setBucketMapjoinContext(bucketMJCxt);
+        bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+        localPlan.setInputFileChangeSensitive(true);
+        bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
+      }
+    }
+  }
+
   /**
    * Initialize the current union plan.
    * 
@@ -370,6 +389,7 @@
         boolean local = ((pos == -1) || (pos == (mjOp.getConf())
             .getPosBigTable())) ? false : true;
         setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
+        setupBucketMapJoinInfo(plan, oldMapJoin);
       }
       opProcCtx.setCurrMapJoinOp(null);
 
@@ -806,6 +826,7 @@
       opProcCtx.setMapJoinCtx(mjOp, mjCtx);
       opProcCtx.getMapCurrCtx().put(parent,
           new GenMapRedCtx(childTask, null, null));
+      setupBucketMapJoinInfo(cplan, mjOp);
     }
 
     currTopOp = null;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Tue Feb 16 23:09:07 2010
@@ -367,6 +367,11 @@
     // traverse all the joins and convert them if necessary
     if (pGraphContext.getJoinContext() != null) {
       Map<JoinOperator, QBJoinTree> joinMap = new HashMap<JoinOperator, QBJoinTree>();
+      Map<MapJoinOperator, QBJoinTree> mapJoinMap = pGraphContext.getMapJoinContext();
+      if(mapJoinMap == null) {
+        mapJoinMap = new HashMap<MapJoinOperator, QBJoinTree> ();
+        pGraphContext.setMapJoinContext(mapJoinMap);
+      }
 
       Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pGraphContext
           .getJoinContext().entrySet();
@@ -378,7 +383,9 @@
         QBJoinTree qbJoin = joinEntry.getValue();
         int mapJoinPos = mapSideJoin(joinOp, qbJoin);
         if (mapJoinPos >= 0) {
-          listMapJoinOps.add(convertMapJoin(pactx, joinOp, qbJoin, mapJoinPos));
+          MapJoinOperator mapJoinOp = convertMapJoin(pactx, joinOp, qbJoin, mapJoinPos);
+          listMapJoinOps.add(mapJoinOp);
+          mapJoinMap.put(mapJoinOp, qbJoin);
         } else {
           joinMap.put(joinOp, qbJoin);
         }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Tue Feb 16 23:09:07 2010
@@ -56,6 +56,9 @@
 
     transformations.add(new SamplePruner());
     transformations.add(new MapJoinProcessor());
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
+      transformations.add(new BucketMapJoinOptimizer());
+    }
     transformations.add(new UnionProcessor());
     transformations.add(new JoinReorder());
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Tue Feb 16 23:09:07 2010
@@ -58,6 +58,7 @@
   private HashMap<String, Operator<? extends Serializable>> topSelOps;
   private LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx;
   private Map<JoinOperator, QBJoinTree> joinContext;
+  private Map<MapJoinOperator, QBJoinTree> mapJoinContext;
   private HashMap<TableScanOperator, Table> topToTable;
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
@@ -439,4 +440,12 @@
       Map<String, PrunedPartitionList> prunedPartitions) {
     this.prunedPartitions = prunedPartitions;
   }
+
+  public Map<MapJoinOperator, QBJoinTree> getMapJoinContext() {
+    return mapJoinContext;
+  }
+
+  public void setMapJoinContext(Map<MapJoinOperator, QBJoinTree> mapJoinContext) {
+    this.mapJoinContext = mapJoinContext;
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Tue Feb 16 23:09:07 2010
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,6 +43,10 @@
   private int posBigTable;
 
   private Map<Byte, List<Integer>> retainList;
+  
+  private transient String bigTableAlias;
+  
+  private LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping;
 
   public MapJoinDesc() {
   }
@@ -141,4 +146,27 @@
   public void setValueTblDescs(List<TableDesc> valueTblDescs) {
     this.valueTblDescs = valueTblDescs;
   }
+
+  /**
+   * @return bigTableAlias
+   */
+  public String getBigTableAlias() {
+    return bigTableAlias;
+  }
+
+  /**
+   * @param bigTableAlias
+   */
+  public void setBigTableAlias(String bigTableAlias) {
+    this.bigTableAlias = bigTableAlias;
+  }
+
+  public LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> getAliasBucketFileNameMapping() {
+    return aliasBucketFileNameMapping;
+  }
+
+  public void setAliasBucketFileNameMapping(
+      LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping) {
+    this.aliasBucketFileNameMapping = aliasBucketFileNameMapping;
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java?rev=910755&r1=910754&r2=910755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java Tue Feb 16 23:09:07 2010
@@ -19,9 +19,12 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.BucketMatcher;
 
 /**
  * MapredLocalWork.
@@ -33,7 +36,9 @@
 
   private LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork;
   private LinkedHashMap<String, FetchWork> aliasToFetchWork;
-
+  private boolean inputFileChangeSensitive;
+  private BucketMapJoinContext bucketMapjoinContext;
+  
   public MapredLocalWork() {
   }
 
@@ -70,4 +75,65 @@
       final LinkedHashMap<String, FetchWork> aliasToFetchWork) {
     this.aliasToFetchWork = aliasToFetchWork;
   }
+
+  public boolean getInputFileChangeSensitive() {
+    return inputFileChangeSensitive;
+  }
+
+  public void setInputFileChangeSensitive(boolean inputFileChangeSensitive) {
+    this.inputFileChangeSensitive = inputFileChangeSensitive;
+  }
+  
+  @Explain(displayName = "Bucket Mapjoin Context", normalExplain = false)
+  public BucketMapJoinContext getBucketMapjoinContext() {
+    return bucketMapjoinContext;
+  }
+
+  public void setBucketMapjoinContext(BucketMapJoinContext bucketMapjoinContext) {
+    this.bucketMapjoinContext = bucketMapjoinContext;
+  }
+  
+  public static class BucketMapJoinContext implements Serializable {
+    
+    private static final long serialVersionUID = 1L;
+    
+    // used for bucket map join
+    private LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping;
+    private String mapJoinBigTableAlias;
+    private Class<? extends BucketMatcher> bucketMatcherClass;
+
+    public void setMapJoinBigTableAlias(String bigTableAlias) {
+      this.mapJoinBigTableAlias = bigTableAlias;
+    }
+
+    public String getMapJoinBigTableAlias() {
+      return mapJoinBigTableAlias;
+    }
+
+    public Class<? extends BucketMatcher> getBucketMatcherClass() {
+      return bucketMatcherClass;
+    }
+
+    public void setBucketMatcherClass(
+        Class<? extends BucketMatcher> bucketMatcherClass) {
+      this.bucketMatcherClass = bucketMatcherClass;
+    }
+
+    @Explain(displayName = "Alias Bucket File Name Mapping", normalExplain = false)
+    public LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> getAliasBucketFileNameMapping() {
+      return aliasBucketFileNameMapping;
+    }
+
+    public void setAliasBucketFileNameMapping(
+        LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping) {
+      this.aliasBucketFileNameMapping = aliasBucketFileNameMapping;
+    }
+    
+    public String toString() {
+      if (aliasBucketFileNameMapping != null)
+        return "Mapping:" + aliasBucketFileNameMapping.toString();
+      else
+        return "";
+    }
+  }
 }

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin1.q?rev=910755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin1.q Tue Feb 16 23:09:07 2010
@@ -0,0 +1,50 @@
+CREATE TABLE srcbucket_mapjoin(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin;
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin;
+
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+
+
+set hive.optimize.bucketmapjoin = true;
+create table bucketmapjoin_tmp_result (key string , value1 string, value2 string);
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(b)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin_part b 
+on a.key=b.key where b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(b)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin_part b 
+on a.key=b.key where b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(a)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin_part b 
+on a.key=b.key where b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(a)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin_part b 
+on a.key=b.key where b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+
+
+drop table bucketmapjoin_tmp_result;
+drop table srcbucket_mapjoin;
+drop table srcbucket_mapjoin_part;
+drop table srcbucket_mapjoin_part_2;
\ No newline at end of file

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q?rev=910755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin2.q Tue Feb 16 23:09:07 2010
@@ -0,0 +1,50 @@
+CREATE TABLE srcbucket_mapjoin(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin;
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin;
+
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+
+
+set hive.optimize.bucketmapjoin = true;
+create table bucketmapjoin_tmp_result (key string , value1 string, value2 string);
+
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(b)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+on a.key=b.key and b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(b)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+on a.key=b.key and b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(a)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+on a.key=b.key and b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(a)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin_part_2 b 
+on a.key=b.key and b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+
+drop table bucketmapjoin_tmp_result;
+drop table srcbucket_mapjoin;
+drop table srcbucket_mapjoin_part;
+drop table srcbucket_mapjoin_part_2;
\ No newline at end of file

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin3.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin3.q?rev=910755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin3.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin3.q Tue Feb 16 23:09:07 2010
@@ -0,0 +1,50 @@
+CREATE TABLE srcbucket_mapjoin(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin;
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin;
+
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+
+
+set hive.optimize.bucketmapjoin = true;
+create table bucketmapjoin_tmp_result (key string , value1 string, value2 string);
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(b)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin_part_2 a join srcbucket_mapjoin_part b 
+on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(b)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin_part_2 a join srcbucket_mapjoin_part b 
+on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+
+
+explain extended 
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(a)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin_part_2 a join srcbucket_mapjoin_part b 
+on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(a)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin_part_2 a join srcbucket_mapjoin_part b 
+on a.key=b.key and b.ds="2008-04-08" and a.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+
+
+drop table bucketmapjoin_tmp_result;
+drop table srcbucket_mapjoin;
+drop table srcbucket_mapjoin_part;
+drop table srcbucket_mapjoin_part_2;
\ No newline at end of file

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin4.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin4.q?rev=910755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin4.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin4.q Tue Feb 16 23:09:07 2010
@@ -0,0 +1,50 @@
+CREATE TABLE srcbucket_mapjoin(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin;
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin;
+
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+
+
+set hive.optimize.bucketmapjoin = true;
+create table bucketmapjoin_tmp_result (key string , value1 string, value2 string);
+
+
+explain extended 
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(b)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin b 
+on a.key=b.key;
+
+insert overwrite table bucketmapjoin_tmp_result
+select /*+mapjoin(b)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin b 
+on a.key=b.key;
+
+select count(1) from bucketmapjoin_tmp_result;
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(a)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin b 
+on a.key=b.key;
+
+insert overwrite table bucketmapjoin_tmp_result 
+select /*+mapjoin(a)*/ a.key, a.value, b.value 
+from srcbucket_mapjoin a join srcbucket_mapjoin b 
+on a.key=b.key;
+
+select count(1) from bucketmapjoin_tmp_result;
+
+
+drop table bucketmapjoin_tmp_result;
+drop table srcbucket_mapjoin;
+drop table srcbucket_mapjoin_part;
+drop table srcbucket_mapjoin_part_2;
\ No newline at end of file


