hive-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1643976 [1/4] - in /hive/branches/spark: itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apach...
Date: Tue, 09 Dec 2014 02:23:08 GMT
Author: xuefu
Date: Tue Dec  9 02:23:08 2014
New Revision: 1643976

URL: http://svn.apache.org/r1643976
Log:
HIVE-8638: Implement bucket map join optimization [Spark Branch] (Jimmy via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkBucketMapJoinContext.java
    hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark1.q
    hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark2.q
    hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark3.q
    hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark3.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_spark1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_spark2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_spark3.q.out
Modified:
    hive/branches/spark/itests/src/test/resources/testconfiguration.properties
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_11.q.out

Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1643976&r1=1643975&r2=1643976&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Tue Dec  9 02:23:08 2014
@@ -536,6 +536,9 @@ spark.query.files=add_part_multiple.q, \
   bucketmapjoin_negative3.q, \
   bucket_map_join_1.q, \
   bucket_map_join_2.q, \
+  bucket_map_join_spark1.q, \
+  bucket_map_join_spark2.q, \
+  bucket_map_join_spark3.q, \
   bucket_map_join_tez1.q, \
   bucket_map_join_tez2.q, \
   column_access_stats.q, \

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1643976&r1=1643975&r2=1643976&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Tue Dec  9 02:23:08 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Set;
 
 import org.apache.commons.io.FileExistsException;
 import org.apache.commons.logging.Log;
@@ -33,6 +34,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinPersistableTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.SparkBucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -86,12 +90,22 @@ public class SparkHashTableSinkOperator
 
   protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
       byte tag) throws IOException, HiveException {
+    MapredLocalWork localWork = getExecContext().getLocalWork();
+    BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
+    Path inputPath = getExecContext().getCurrentInputPath();
+    String bigInputPath = null;
+    if (inputPath != null && mapJoinCtx != null) {
+      Set<String> aliases =
+        ((SparkBucketMapJoinContext)mapJoinCtx).getPosToAliasMap().get((int)tag);
+      bigInputPath = mapJoinCtx.getMappingBigFile(
+        aliases.iterator().next(), inputPath.toString());
+    }
+
     // get tmp file URI
-    Path tmpURI = getExecContext().getLocalWork().getTmpHDFSPath();
+    Path tmpURI = localWork.getTmpHDFSPath();
     LOG.info("Temp URI for side table: " + tmpURI);
-    // get current input file name
-    String bigBucketFileName = getExecContext().getCurrentBigBucketFile();
-    String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName);
+    // get current bucket file name
+    String fileName = localWork.getBucketFileName(bigInputPath);
     // get the tmp URI path; it will be a hdfs path if not local mode
     String dumpFilePrefix = conf.getDumpFilePrefix();
     Path path = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1643976&r1=1643975&r2=1643976&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Tue Dec  9 02:23:08 2014
@@ -116,9 +116,9 @@ public class MapJoinTableContainerSerDe
 
       MapJoinPersistableTableContainer tableContainer = null;
 
-      for (FileStatus fileStatus: fs.listStatus(folder)) {
+      for (FileStatus fileStatus: fileStatuses) {
         Path filePath = fileStatus.getPath();
-        if (fileStatus.isDir()) {
+        if (fileStatus.isDirectory()) {
           throw new HiveException("Error, not a file: " + filePath);
         }
         InputStream is = null;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1643976&r1=1643975&r2=1643976&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Tue Dec  9 02:23:08 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.s
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,9 +37,11 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkBucketMapJoinContext;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
@@ -87,11 +90,22 @@ public class HashTableLoader implements
         return;
       }
       FileSystem fs = FileSystem.get(baseDir.toUri(), hconf);
-      String fileName = localWork.getBucketFileName(currentInputPath);
+      BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
       for (int pos = 0; pos < mapJoinTables.length; pos++) {
         if (pos == desc.getPosBigTable() || mapJoinTables[pos] != null) {
           continue;
         }
+        String bigInputPath = currentInputPath;
+        if (currentInputPath != null && mapJoinCtx != null) {
+          Set<String> aliases =
+            ((SparkBucketMapJoinContext)mapJoinCtx).getPosToAliasMap().get(pos);
+          String alias = aliases.iterator().next();
+          // Any one small table input path
+          String smallInputPath =
+            mapJoinCtx.getAliasBucketFileNameMapping().get(alias).get(bigInputPath).get(0);
+          bigInputPath = mapJoinCtx.getMappingBigFile(alias, smallInputPath);
+        }
+        String fileName = localWork.getBucketFileName(bigInputPath);
         Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte)pos, fileName);
         LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
         mapJoinTables[pos] = mapJoinTableSerdes[pos].load(fs, path);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java?rev=1643976&r1=1643975&r2=1643976&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java Tue Dec  9 02:23:08 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,7 +28,9 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 
 public class BucketMapjoinProc extends AbstractBucketJoinProc implements NodeProcessor {
   public BucketMapjoinProc(ParseContext pGraphContext) {
@@ -34,7 +38,6 @@ public class BucketMapjoinProc extends A
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       Object... nodeOutputs) throws SemanticException {
     BucketJoinProcCtx context = (BucketJoinProcCtx) procCtx;
@@ -58,4 +61,20 @@ public class BucketMapjoinProc extends A
 
     return null;
   }
+
+  /**
+   * Check if a mapjoin can be converted to a bucket mapjoin,
+   * and do the conversion if possible.
+   */
+  public static void checkAndConvertBucketMapJoin(ParseContext pGraphContext,
+      MapJoinOperator mapJoinOp, QBJoinTree joinCtx, String baseBigAlias,
+      List<String> joinAliases) throws SemanticException {
+    BucketJoinProcCtx ctx = new BucketJoinProcCtx(pGraphContext.getConf());
+    BucketMapjoinProc proc = new BucketMapjoinProc(pGraphContext);
+    Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
+    if (proc.checkConvertBucketMapJoin(pGraphContext, ctx,
+        joinCtx, keysMap, baseBigAlias, joinAliases)) {
+      proc.convertMapJoinToBucketMapJoin(mapJoinOp, ctx);
+    }
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1643976&r1=1643975&r2=1643976&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Tue Dec  9 02:23:08 2014
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,9 +43,11 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkBucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
 public class SparkMapJoinResolver implements PhysicalPlanResolver {
@@ -66,6 +67,10 @@ public class SparkMapJoinResolver implem
   // Check whether the specified BaseWork's operator tree contains a operator
   // of the specified operator class
   private boolean containsOp(BaseWork work, Class<?> clazz) {
+    return getOp(work, clazz) != null;
+  }
+
+  private Operator<? extends OperatorDesc> getOp(BaseWork work, Class<?> clazz) {
     Set<Operator<? extends OperatorDesc>> ops = new HashSet<Operator<? extends OperatorDesc>>();
     if (work instanceof MapWork) {
       Collection<Operator<?>> opSet = ((MapWork) work).getAliasToWork().values();
@@ -85,12 +90,13 @@ public class SparkMapJoinResolver implem
 
     for (Operator<? extends OperatorDesc> op : ops) {
       if (clazz.isInstance(op)) {
-        return true;
+        return op;
       }
     }
-    return false;
+    return null;
   }
 
+  @SuppressWarnings("unchecked")
   class SparkMapJoinTaskDispatcher implements Dispatcher {
 
     private final PhysicalContext physicalContext;
@@ -168,16 +174,43 @@ public class SparkMapJoinResolver implem
       Context ctx = physicalContext.getContext();
 
       for (BaseWork work : allBaseWorks) {
-        if (containsOp(work, MapJoinOperator.class)) {
+        Operator<? extends OperatorDesc> op = getOp(work, MapJoinOperator.class);
+        if (op != null) {
+          MapJoinOperator mapJoinOp = (MapJoinOperator) op;
           Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId());
           MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork();
           List<Operator<? extends OperatorDesc>> dummyOps =
               new ArrayList<Operator<? extends OperatorDesc>>(work.getDummyOps());
           bigTableLocalWork.setDummyParentOp(dummyOps);
 
+          SparkBucketMapJoinContext bucketMJCxt = null;
+          MapJoinDesc mapJoinDesc = mapJoinOp.getConf();
+          if (mapJoinDesc.isBucketMapJoin()) {
+            bucketMJCxt = new SparkBucketMapJoinContext();
+            bigTableLocalWork.setBucketMapjoinContext(bucketMJCxt);
+            bucketMJCxt.setAliasBucketFileNameMapping(
+              mapJoinDesc.getAliasBucketFileNameMapping());
+            bucketMJCxt.setBucketFileNameMapping(
+              mapJoinDesc.getBigTableBucketNumMapping());
+            bucketMJCxt.setMapJoinBigTableAlias(mapJoinDesc.getBigTableAlias());
+            bucketMJCxt.setBucketMatcherClass(
+              org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+            bucketMJCxt.setBigTablePartSpecToFileMapping(
+              mapJoinDesc.getBigTablePartSpecToFileMapping());
+            bucketMJCxt.setPosToAliasMap(mapJoinOp.getPosToAliasMap());
+            ((MapWork) work).setUseBucketizedHiveInputFormat(true);
+            bigTableLocalWork.setInputFileChangeSensitive(true);
+          }
+
           for (BaseWork parentWork : originalWork.getParents(work)) {
             if (containsOp(parentWork,SparkHashTableSinkOperator.class)) {
-              parentWork.getMapRedLocalWork().setTmpHDFSPath(tmpPath);
+              MapredLocalWork parentLocalWork = parentWork.getMapRedLocalWork();
+              parentLocalWork.setTmpHDFSPath(tmpPath);
+              if (bucketMJCxt != null) {
+                ((MapWork) parentWork).setUseBucketizedHiveInputFormat(true);
+                parentLocalWork.setBucketMapjoinContext(bucketMJCxt);
+                parentLocalWork.setInputFileChangeSensitive(true);
+              }
             }
           }
 
@@ -257,4 +290,4 @@ public class SparkMapJoinResolver implem
       return null;
     }
   }
-}
\ No newline at end of file
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java?rev=1643976&r1=1643975&r2=1643976&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java Tue Dec  9 02:23:08 2014
@@ -18,16 +18,15 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -38,11 +37,13 @@ import org.apache.hadoop.hive.ql.exec.Un
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.BucketMapjoinProc;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
-import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
@@ -58,7 +59,6 @@ public class SparkMapJoinOptimizer imple
 
   private static final Log LOG = LogFactory.getLog(SparkMapJoinOptimizer.class.getName());
 
-  @SuppressWarnings("unchecked")
   @Override
   /*
    * (non-Javadoc) we should ideally not modify the tree we traverse. However,
@@ -71,10 +71,8 @@ public class SparkMapJoinOptimizer imple
 
     OptimizeSparkProcContext context = (OptimizeSparkProcContext) procCtx;
     HiveConf conf = context.getConf();
-    ParseContext parseContext = context.getParseContext();
     JoinOperator joinOp = (JoinOperator) nd;
 
-
     if (!conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
       // && !(conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
       // we are just converting to a common merge join operator. The shuffle
@@ -84,16 +82,12 @@ public class SparkMapJoinOptimizer imple
       return null;
     }
 
-    // if we have traits, and table info is present in the traits, we know the
-    // exact number of buckets. Else choose the largest number of estimated
-    // reducers from the parent operators.
-    //TODO  enable later. disabling this check for now
-    int numBuckets = 1;
+    LOG.info("Check if it can be converted to map join");
+    long[] mapJoinInfo = getMapJoinConversionInfo(joinOp, context);
+    int mapJoinConversionPos = (int)mapJoinInfo[0];
 
-    LOG.info("Estimated number of buckets " + numBuckets);
-
-    /* TODO: handle this later
     if (mapJoinConversionPos < 0) {
+      /* TODO: handle this later
       // we cannot convert to bucket map join, we cannot convert to
       // map join either based on the size. Check if we can convert to SMB join.
       if (conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
@@ -139,43 +133,32 @@ public class SparkMapJoinOptimizer imple
         // join in map-reduce case.
         int pos = 0; // it doesn't matter which position we use in this case.
         convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
-      }
+      }  */
       return null;
     }
 
-    if (numBuckets > 1) {
-      if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
-        if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
-          return null;
-        }
-      }
-    }*/
+    int numBuckets = -1;
+    List<List<String>> bucketColNames = null;
 
     LOG.info("Convert to non-bucketed map join");
-    // check if we can convert to map join no bucket scaling.
-    ObjectPair<Integer, Long> mapJoinInfo = getMapJoinConversionInfo(joinOp, context, 1);
-    int mapJoinConversionPos = mapJoinInfo.getFirst();
-
-    if (mapJoinConversionPos < 0) {
-      // we are just converting to a common merge join operator. The shuffle
-      // join in map-reduce case.
-      /*
-      int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
-      */
-      return null;
+    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
+      LOG.info("Check if it can be converted to bucketed map join");
+      numBuckets = convertJoinBucketMapJoin(joinOp, mapJoinOp,
+        context, mapJoinConversionPos);
+      if (numBuckets > 1) {
+        bucketColNames = joinOp.getOpTraits().getBucketColNames();
+        mapJoinInfo[2] /= numBuckets;
+      }
     }
 
-    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
-    // map join operator by default has no bucket cols
-    mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
+    // we can set the traits for this join operator
+    OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null);
+    mapJoinOp.setOpTraits(opTraits);
     mapJoinOp.setStatistics(joinOp.getStatistics());
-    // propagate this change till the next RS
-    for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
-      setAllChildrenTraitsToNull(childOp);
-    }
+    setNumberOfBucketsOnChildren(mapJoinOp);
 
-    context.getMjOpSizes().put(mapJoinOp, mapJoinInfo.getSecond());
+    context.getMjOpSizes().put(mapJoinOp, mapJoinInfo[1] + mapJoinInfo[2]);
 
     return null;
   }
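
To make the new bookkeeping concrete, a hypothetical example (the numbers are illustrative, not from the patch): if getMapJoinConversionInfo returns {1, 20, 100} (big table at position 1, 20 MB of map joins already connected in the same work, 100 MB of small-table data) and convertJoinBucketMapJoin then reports 4 buckets, mapJoinInfo[2] is scaled to 100 / 4 = 25 MB and the size recorded in getMjOpSizes is 20 + 25 = 45 MB; without the bucket conversion it would have been 20 + 100 = 120 MB.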
@@ -278,30 +261,47 @@ public class SparkMapJoinOptimizer imple
   }
   */
 
-  private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
-    if (currentOp instanceof ReduceSinkOperator) {
-      return;
-    }
-    currentOp.setOpTraits(new OpTraits(null, -1, null));
-    for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
-      if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
-        break;
-      }
-      setAllChildrenTraitsToNull(childOp);
-    }
-  }
-
-
   private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
     int numBuckets = currentOp.getOpTraits().getNumBuckets();
     for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
       if (!(op instanceof ReduceSinkOperator) && !(op instanceof GroupByOperator)) {
         op.getOpTraits().setNumBuckets(numBuckets);
+        if (numBuckets < 0) {
+          op.getOpTraits().setBucketColNames(null);
+        }
         setNumberOfBucketsOnChildren(op);
       }
     }
   }
 
+  private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoinOp,
+      OptimizeSparkProcContext context, int bigTablePosition) throws SemanticException {
+    ParseContext parseContext = context.getParseContext();
+    QBJoinTree joinTree = parseContext.getJoinContext().get(joinOp);
+    List<String> joinAliases = new ArrayList<String>();
+    String baseBigAlias = null;
+    Map<Integer, Set<String>> posToAliasMap = joinOp.getPosToAliasMap();
+    for (Map.Entry<Integer, Set<String>> entry: posToAliasMap.entrySet()) {
+      if (entry.getKey().intValue() == bigTablePosition) {
+        baseBigAlias = entry.getValue().iterator().next();
+      }
+      for (String alias: entry.getValue()) {
+        if (!joinAliases.contains(alias)) {
+          joinAliases.add(alias);
+        }
+      }
+    }
+    BucketMapjoinProc.checkAndConvertBucketMapJoin(
+      parseContext, mapJoinOp, joinTree, baseBigAlias, joinAliases);
+    int numBuckets = -1;
+    MapJoinDesc joinDesc = mapJoinOp.getConf();
+    if (joinDesc.isBucketMapJoin()) {
+      numBuckets = joinDesc.getBigTableBucketNumMapping().size();
+      mapJoinOp.setPosToAliasMap(joinOp.getPosToAliasMap());
+    }
+    return numBuckets;
+  }
+
   /**
    *   This method returns the big table position in a map-join. If the given join
    *   cannot be converted to a map-join (This could happen for several reasons - one
@@ -312,11 +312,11 @@ public class SparkMapJoinOptimizer imple
    *
    * @param joinOp
    * @param context
-   * @param buckets
-   * @return pair, first value is the position, second value is the in-memory size of this mapjoin.
+   * @return an array of 3 long values, first value is the position,
+   *   second value is the connected map join size, and the third is the total data size of the small tables.
    */
-  private ObjectPair<Integer, Long> getMapJoinConversionInfo(JoinOperator joinOp, OptimizeSparkProcContext context,
-                                                                int buckets) {
+  private long[] getMapJoinConversionInfo(
+      JoinOperator joinOp, OptimizeSparkProcContext context) {
     Set<Integer> bigTableCandidateSet =
         MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
 
@@ -338,7 +338,7 @@ public class SparkMapJoinOptimizer imple
       Statistics currInputStat = parentOp.getStatistics();
       if (currInputStat == null) {
         LOG.warn("Couldn't get statistics from: "+parentOp);
-        return new ObjectPair(-1, 0);
+        return new long[]{-1, 0, 0};
       }
 
       // Union is hard to handle. For instance, the following case:
@@ -361,7 +361,7 @@ public class SparkMapJoinOptimizer imple
       // But, this is tricky to implement, and we'll leave it as a future work for now.
       // TODO: handle this as a MJ case
       if (containUnionWithoutRS(parentOp.getParentOperators().get(0))) {
-        return new ObjectPair(-1, 0);
+        return new long[]{-1, 0, 0};
       }
 
       long inputSize = currInputStat.getDataSize();
@@ -372,14 +372,14 @@ public class SparkMapJoinOptimizer imple
         if (bigTableFound) {
           // cannot convert to map join; we've already chosen a big table
           // on size and there's another one that's bigger.
-          return new ObjectPair(-1, 0);
+          return new long[]{-1, 0, 0};
         }
 
-        if (inputSize/buckets > maxSize) {
+        if (inputSize > maxSize) {
           if (!bigTableCandidateSet.contains(pos)) {
             // can't use the current table as the big table, but it's too
             // big for the map side.
-            return new ObjectPair(-1, 0);
+            return new long[]{-1, 0, 0};
           }
 
           bigTableFound = true;
@@ -391,10 +391,10 @@ public class SparkMapJoinOptimizer imple
           totalSize += bigInputStat.getDataSize();
         }
 
-        if (totalSize/buckets > maxSize) {
+        if (totalSize > maxSize) {
           // sum of small tables size in this join exceeds configured limit
           // hence cannot convert.
-          return new ObjectPair(-1, 0);
+          return new long[]{-1, 0, 0};
         }
 
         if (bigTableCandidateSet.contains(pos)) {
@@ -403,9 +403,9 @@ public class SparkMapJoinOptimizer imple
         }
       } else {
         totalSize += currInputStat.getDataSize();
-        if (totalSize/buckets > maxSize) {
+        if (totalSize > maxSize) {
           // cannot hold all map tables in memory. Cannot convert.
-          return new ObjectPair(-1, 0);
+          return new long[]{-1, 0, 0};
         }
       }
       pos++;
@@ -413,17 +413,17 @@ public class SparkMapJoinOptimizer imple
 
     if (bigTablePosition == -1) {
       //No big table candidates.
-      return new ObjectPair(-1, 0);
+      return new long[]{-1, 0, 0};
     }
 
     //Final check, find size of already-calculated Mapjoin Operators in same work (spark-stage).  We need to factor
     //this in to prevent overwhelming Spark executor-memory.
     long connectedMapJoinSize = getConnectedMapJoinSize(joinOp.getParentOperators().get(bigTablePosition), joinOp, context);
-    if ((connectedMapJoinSize + (totalSize / buckets)) > maxSize) {
-      return new ObjectPair(-1, 0);
+    if ((connectedMapJoinSize + totalSize) > maxSize) {
+      return new long[]{-1, 0, 0};
     }
 
-    return new ObjectPair(bigTablePosition, connectedMapJoinSize + (totalSize / buckets));
+    return new long[]{bigTablePosition, connectedMapJoinSize, totalSize};
   }
 
   /**
@@ -433,6 +433,7 @@ public class SparkMapJoinOptimizer imple
    * @param ctx context to pass information.
    * @return total size of parent mapjoins in same work as this operator.
    */
+  @SuppressWarnings({"rawtypes", "unchecked"})
   private long getConnectedMapJoinSize(Operator<? extends OperatorDesc> parentOp, Operator joinOp, OptimizeSparkProcContext ctx) {
     long result = 0;
     for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java?rev=1643976&r1=1643975&r2=1643976&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java Tue Dec  9 02:23:08 2014
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -176,6 +177,33 @@ public class BucketMapJoinContext implem
     this.bigTablePartSpecToFileMapping = bigTablePartSpecToFileMapping;
   }
 
+  /**
+   * Given a small table input file, find the mapping
+   * big table input file with the smallest bucket number.
+   */
+  public String getMappingBigFile(String alias, String smallFile) {
+    HashSet<String> bigFiles = new HashSet<String>();
+    Map<String, List<String>> mapping = aliasBucketFileNameMapping.get(alias);
+    for (Map.Entry<String, List<String>> entry: mapping.entrySet()) {
+      if (entry.getValue().contains(smallFile)) {
+        bigFiles.add(entry.getKey());
+      }
+    }
+    // There could be several big table input files
+    // mapping to the same small input file.
+    // Find that one with the lowest bucket id.
+    int bucketId = Integer.MAX_VALUE;
+    String bigFile = null;
+    for (String f: bigFiles) {
+      int id = bucketFileNameMapping.get(f);
+      if (id < bucketId) {
+        bucketId = id;
+        bigFile = f;
+      }
+    }
+    return bigFile;
+  }
+
   // returns fileId for SMBJoin, which consists part of result file name
   // needed to avoid file name conflict when big table is partitioned
   public String createFileId(String inputPath) {
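
To illustrate the selection that the new getMappingBigFile method performs, here is a minimal standalone Java sketch; the file names and the 4-bucket/2-bucket layout are hypothetical, modeled on bucket_map_join_spark2.q, and the helper simply restates the lookup above rather than calling the Hive API. Both SparkHashTableSinkOperator (when writing a hash table file) and HashTableLoader (when reading it back) resolve to this same canonical big-table bucket file, so the generated file names agree on both sides.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketFileMappingSketch {
  public static void main(String[] args) {
    // alias -> (big-table bucket file -> small-table bucket files it joins with);
    // hypothetical names for a 4-bucket big table joined with a 2-bucket small table "b"
    Map<String, Map<String, List<String>>> aliasBucketFileNameMapping = new HashMap<>();
    Map<String, List<String>> bigToSmall = new HashMap<>();
    bigToSmall.put("bigtable/bucket_0", Arrays.asList("smalltable/bucket_0"));
    bigToSmall.put("bigtable/bucket_1", Arrays.asList("smalltable/bucket_1"));
    bigToSmall.put("bigtable/bucket_2", Arrays.asList("smalltable/bucket_0"));
    bigToSmall.put("bigtable/bucket_3", Arrays.asList("smalltable/bucket_1"));
    aliasBucketFileNameMapping.put("b", bigToSmall);

    // big-table bucket file -> bucket id
    Map<String, Integer> bucketFileNameMapping = new HashMap<>();
    bucketFileNameMapping.put("bigtable/bucket_0", 0);
    bucketFileNameMapping.put("bigtable/bucket_1", 1);
    bucketFileNameMapping.put("bigtable/bucket_2", 2);
    bucketFileNameMapping.put("bigtable/bucket_3", 3);

    // bucket_0 and bucket_2 of the big table both map to smalltable/bucket_0;
    // the lowest bucket id wins, so the canonical name is bigtable/bucket_0.
    System.out.println(mappingBigFile("b", "smalltable/bucket_0",
        aliasBucketFileNameMapping, bucketFileNameMapping));
  }

  // Same selection logic as getMappingBigFile, written out standalone.
  static String mappingBigFile(String alias, String smallFile,
      Map<String, Map<String, List<String>>> aliasBucketFileNameMapping,
      Map<String, Integer> bucketFileNameMapping) {
    int bucketId = Integer.MAX_VALUE;
    String bigFile = null;
    for (Map.Entry<String, List<String>> e : aliasBucketFileNameMapping.get(alias).entrySet()) {
      if (e.getValue().contains(smallFile)) {
        int id = bucketFileNameMapping.get(e.getKey());
        if (id < bucketId) {
          bucketId = id;
          bigFile = e.getKey();
        }
      }
    }
    return bigFile;
  }
}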

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkBucketMapJoinContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkBucketMapJoinContext.java?rev=1643976&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkBucketMapJoinContext.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkBucketMapJoinContext.java Tue Dec  9 02:23:08 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.Map;
+import java.util.Set;
+
+public class SparkBucketMapJoinContext extends BucketMapJoinContext {
+  private static final long serialVersionUID = 1L;
+
+  private Map<Integer, Set<String>> posToAliasMap;
+
+  public void setPosToAliasMap(Map<Integer, Set<String>> posToAliasMap) {
+    this.posToAliasMap = posToAliasMap;
+  }
+
+  public Map<Integer, Set<String>> getPosToAliasMap() {
+    return posToAliasMap;
+  }
+}
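
The new subclass only adds the position-to-alias map. A minimal usage sketch follows; the wiring is hypothetical, mirroring the two-table joins in the new test queries, and shows the lookup that SparkHashTableSinkOperator.flushToFile and HashTableLoader perform against it.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.hive.ql.plan.SparkBucketMapJoinContext;

public class PosToAliasSketch {
  public static void main(String[] args) {
    // Hypothetical wiring: position 0 is the big-table alias "a",
    // position 1 the small-table alias "b", as in the new test queries.
    SparkBucketMapJoinContext ctx = new SparkBucketMapJoinContext();
    Map<Integer, Set<String>> posToAliasMap = new HashMap<>();
    posToAliasMap.put(0, Collections.singleton("a"));
    posToAliasMap.put(1, Collections.singleton("b"));
    ctx.setPosToAliasMap(posToAliasMap);

    // flushToFile and HashTableLoader resolve a table position (or byte tag)
    // to its alias before looking up bucket file mappings:
    byte tag = 1;
    Set<String> aliases = ctx.getPosToAliasMap().get((int) tag);
    System.out.println(aliases.iterator().next()); // prints "b"
  }
}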

Added: hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark1.q
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark1.q?rev=1643976&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark1.q (added)
+++ hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark1.q Tue Dec  9 02:23:08 2014
@@ -0,0 +1,53 @@
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+
+create table bucketmapjoin_hash_result_1 (key bigint , value1 bigint, value2 bigint);
+create table bucketmapjoin_hash_result_2 (key bigint , value1 bigint, value2 bigint);
+
+set hive.auto.convert.join = true;
+
+set hive.optimize.bucketmapjoin = true;
+
+create table bucketmapjoin_tmp_result (key string , value1 string, value2 string);
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_1
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+
+set hive.optimize.bucketmapjoin = false;
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_1
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+

Added: hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark2.q
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark2.q?rev=1643976&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark2.q (added)
+++ hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark2.q Tue Dec  9 02:23:08 2014
@@ -0,0 +1,51 @@
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+
+create table bucketmapjoin_hash_result_1 (key bigint , value1 bigint, value2 bigint);
+create table bucketmapjoin_hash_result_2 (key bigint , value1 bigint, value2 bigint);
+
+set hive.auto.convert.join = true;
+
+set hive.optimize.bucketmapjoin = true;
+
+create table bucketmapjoin_tmp_result (key string , value1 string, value2 string);
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_1
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+
+set hive.optimize.bucketmapjoin = true;
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_1
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+

Added: hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark3.q
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark3.q?rev=1643976&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark3.q (added)
+++ hive/branches/spark/ql/src/test/queries/clientpositive/bucket_map_join_spark3.q Tue Dec  9 02:23:08 2014
@@ -0,0 +1,51 @@
+CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08');
+
+CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08');
+
+create table bucketmapjoin_hash_result_1 (key bigint , value1 bigint, value2 bigint);
+create table bucketmapjoin_hash_result_2 (key bigint , value1 bigint, value2 bigint);
+
+set hive.auto.convert.join = true;
+
+set hive.optimize.bucketmapjoin = true;
+
+create table bucketmapjoin_tmp_result (key string , value1 string, value2 string);
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_1
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+
+set hive.optimize.bucketmapjoin = false;
+
+explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08";
+
+select count(1) from bucketmapjoin_tmp_result;
+insert overwrite table bucketmapjoin_hash_result_1
+select sum(hash(key)), sum(hash(value1)), sum(hash(value2)) from bucketmapjoin_tmp_result;
+

Added: hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark1.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark1.q.out?rev=1643976&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark1.q.out (added)
+++ hive/branches/spark/ql/src/test/results/clientpositive/bucket_map_join_spark1.q.out Tue Dec  9 02:23:08 2014
@@ -0,0 +1,854 @@
+PREHOOK: query: CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcbucket_mapjoin_part
+POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_part (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcbucket_mapjoin_part
+PREHOOK: query: load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin_part
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin_part
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin_part@ds=2008-04-08
+PREHOOK: query: CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcbucket_mapjoin_part_2
+POSTHOOK: query: CREATE TABLE srcbucket_mapjoin_part_2 (key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2
+PREHOOK: query: load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin_part_2
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket20.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+PREHOOK: query: load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket21.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+PREHOOK: query: load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket22.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+PREHOOK: query: load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+POSTHOOK: query: load data local inpath '../../data/files/srcbucket23.txt' INTO TABLE srcbucket_mapjoin_part_2 partition(ds='2008-04-08')
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@srcbucket_mapjoin_part_2@ds=2008-04-08
+PREHOOK: query: create table bucketmapjoin_hash_result_1 (key bigint , value1 bigint, value2 bigint)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucketmapjoin_hash_result_1
+POSTHOOK: query: create table bucketmapjoin_hash_result_1 (key bigint , value1 bigint, value2 bigint)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucketmapjoin_hash_result_1
+PREHOOK: query: create table bucketmapjoin_hash_result_2 (key bigint , value1 bigint, value2 bigint)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucketmapjoin_hash_result_2
+POSTHOOK: query: create table bucketmapjoin_hash_result_2 (key bigint , value1 bigint, value2 bigint)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucketmapjoin_hash_result_2
+PREHOOK: query: create table bucketmapjoin_tmp_result (key string , value1 string, value2 string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@bucketmapjoin_tmp_result
+POSTHOOK: query: create table bucketmapjoin_tmp_result (key string , value1 string, value2 string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@bucketmapjoin_tmp_result
+PREHOOK: query: explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08"
+PREHOOK: type: QUERY
+POSTHOOK: query: explain extended
+insert overwrite table bucketmapjoin_tmp_result
+select a.key, a.value, b.value
+from srcbucket_mapjoin_part a join srcbucket_mapjoin_part_2 b
+on a.key=b.key and b.ds="2008-04-08"
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  
+TOK_QUERY
+   TOK_FROM
+      TOK_JOIN
+         TOK_TABREF
+            TOK_TABNAME
+               srcbucket_mapjoin_part
+            a
+         TOK_TABREF
+            TOK_TABNAME
+               srcbucket_mapjoin_part_2
+            b
+         and
+            =
+               .
+                  TOK_TABLE_OR_COL
+                     a
+                  key
+               .
+                  TOK_TABLE_OR_COL
+                     b
+                  key
+            =
+               .
+                  TOK_TABLE_OR_COL
+                     b
+                  ds
+               "2008-04-08"
+   TOK_INSERT
+      TOK_DESTINATION
+         TOK_TAB
+            TOK_TABNAME
+               bucketmapjoin_tmp_result
+      TOK_SELECT
+         TOK_SELEXPR
+            .
+               TOK_TABLE_OR_COL
+                  a
+               key
+         TOK_SELEXPR
+            .
+               TOK_TABLE_OR_COL
+                  a
+               value
+         TOK_SELEXPR
+            .
+               TOK_TABLE_OR_COL
+                  b
+               value
+
+
+STAGE DEPENDENCIES:
+  Stage-5 is a root stage
+  Stage-4 depends on stages: Stage-5
+  Stage-0 depends on stages: Stage-4
+  Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-5
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        a 
+          Fetch Operator
+            limit: -1
+            Partition Description:
+                Partition
+                  base file name: ds=2008-04-08
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  partition values:
+                    ds 2008-04-08
+                  properties:
+                    COLUMN_STATS_ACCURATE true
+                    bucket_count 4
+                    bucket_field_name key
+                    columns key,value
+                    columns.comments 