hive-commits mailing list archives

From: na...@apache.org
Subject: svn commit: r1475661 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/s...
Date: Thu, 25 Apr 2013 07:58:16 GMT
Author: namit
Date: Thu Apr 25 07:58:16 2013
New Revision: 1475661

URL: http://svn.apache.org/r1475661
Log:
HIVE-3952 merge map-job followed by map-reduce job
(Vinod Kumar Vavilapalli via namit)


Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q
    hive/trunk/ql/src/test/results/clientpositive/multiMapJoin1.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Apr 25 07:58:16 2013
@@ -494,6 +494,7 @@ public class HiveConf extends Configurat
     HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),
     HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD("hive.auto.convert.join.noconditionaltask.size",
         10000000L),
+    HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR("hive.optimize.mapjoin.mapreduce", false),
     HIVESKEWJOINKEY("hive.skewjoin.key", 100000),
     HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
     HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M

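The new flag is read through the standard HiveConf accessor, as the dispatcher changes
further down in this patch do. A minimal sketch, assuming only a Configuration handle
(the wrapper method name is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    // Returns false unless hive.optimize.mapjoin.mapreduce is set to true.
    static boolean mergeMapJoinFollowedByMR(Configuration conf) {
      return HiveConf.getBoolVar(conf,
          HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
    }
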
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Thu Apr 25 07:58:16 2013
@@ -842,6 +842,16 @@
 </property>
 
 <property>
+  <name>hive.optimize.mapjoin.mapreduce</name>
+  <value>false</value>
+  <description>If hive.auto.convert.join is off, this parameter does not take
+    effect. If it is on, and if there are map-join jobs followed by a map-reduce
+    job (for example, a group by), each map-only job is merged with the following
+    map-reduce job.
+  </description>
+</property>
+
+<property>
   <name>hive.script.auto.progress</name>
   <value>false</value>
  <description>Whether Hive Transform/Map/Reduce Clause should automatically send progress
  information to TaskTracker to avoid the task getting killed because of inactivity. Hive sends
  progress information when the script is outputting to stderr. This option removes the need
  of periodically producing stderr messages, but users should be cautious because this may
  prevent infinite loops in the scripts from being killed by TaskTracker.</description>

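From a Hive session the property is toggled like any other conf variable; per the
description above it only takes effect when join auto-conversion is already on.
A session sketch, mirroring the test changes below:

    set hive.auto.convert.join=true;
    set hive.auto.convert.join.noconditionaltask=true;
    set hive.optimize.mapjoin.mapreduce=true;
    -- map-only join jobs are now merged into the following map-reduce job
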
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Thu Apr 25 07:58:16 2013
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -761,6 +763,41 @@ public final class GenMapRedUtils {
   }
 
   /**
+   * Set the key and value description for all the tasks rooted at the given
+   * task. Loops over all the tasks recursively.
+   *
+   * @param task
+   */
+  public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task) {
+
+    if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+          .getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks) {
+        setKeyAndValueDescForTaskTree(tsk);
+      }
+    } else if (task instanceof ExecDriver) {
+      MapredWork work = (MapredWork) task.getWork();
+      work.deriveExplainAttributes();
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = work
+          .getAliasToWork();
+      if (opMap != null && !opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          setKeyAndValueDesc(work, op);
+        }
+      }
+    }
+
+    if (task.getChildTasks() == null) {
+      return;
+    }
+
+    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+      setKeyAndValueDescForTaskTree(childTask);
+    }
+  }
+
+  /**
    * create a new plan and return.
    *
    * @return the new plan

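The helper is meant to be applied to plan roots and recurses into conditional and
child tasks on its own. A usage sketch matching the SemanticAnalyzer call site
below (rootTasks is illustrative):

    // Derive key/value descriptors for every task tree in the plan.
    for (Task<? extends Serializable> rootTask : rootTasks) {
      GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
    }
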
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Thu Apr 25 07:58:16 2013
@@ -113,9 +113,11 @@ public class MapJoinProcessor implements
   }
 
   /**
-   * Generate the MapRed Local Work
+   * Generate the MapRed Local Work for the given map-join operator
+   *
    * @param newWork
    * @param mapJoinOp
+   *          map-join operator for which local work needs to be generated.
    * @param bigTablePos
    * @return
    * @throws SemanticException
@@ -225,6 +227,16 @@ public class MapJoinProcessor implements
     return bigTableAlias;
   }
 
+  /**
+   * Convert the join to a map-join and also generate any local work needed.
+   *
+   * @param newWork MapredWork in which the conversion is to happen
+   * @param op
+   *          The join operator that needs to be converted to map-join
+   * @param mapJoinPos
+   *          position of the big table in the join
+   * @return the alias to the big table
+   * @throws SemanticException
+   */
   public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
       throws SemanticException {
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =

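A sketch of the call pattern for the newly documented helper (joinOp and
bigTablePos are illustrative names; the position selects the big table of the join):

    // Convert the join to a map-join and generate its local work in one step.
    String bigTableAlias =
        MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, joinOp, bigTablePos);
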
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Thu Apr 25 07:58:16 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
@@ -239,6 +240,35 @@ public class CommonJoinTaskDispatcher ex
         oldChildTask.getParentTasks().add(task);
       }
     }
+
+    boolean convertToSingleJob = HiveConf.getBoolVar(conf,
+        HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
+    if (convertToSingleJob) {
+      copyReducerConf(task, childTask);
+    }
+  }
+
+  /**
+   * Copy reducer configuration if the childTask also has a reducer.
+   *
+   * @param task
+   * @param childTask
+   */
+  private void copyReducerConf(MapRedTask task, MapRedTask childTask) {
+    MapredWork childWork = childTask.getWork();
+    Operator childReducer = childWork.getReducer();
+    MapredWork work = task.getWork();
+    if (childReducer == null) {
+      return;
+    }
+    work.setReducer(childReducer);
+    work.setNumReduceTasks(childWork.getNumReduceTasks());
+    work.setJoinTree(childWork.getJoinTree());
+    work.setNeedsTagging(childWork.getNeedsTagging());
+
+    // Make sure the key configuration is correct: clear it and regenerate.
+    work.getTagToValueDesc().clear();
+    GenMapRedUtils.setKeyAndValueDescForTaskTree(task);
   }
 
   // create map join task and set big table as bigTablePosition
@@ -255,6 +285,125 @@ public class CommonJoinTaskDispatcher ex
     return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
   }
 
+  /*
+   * A task and its child task have been converted from join to map-join.
+   * See if the two tasks can be merged.
+   */
+  private void mergeMapJoinTaskWithMapReduceTask(MapRedTask mapJoinTask, Configuration conf) {
+    if (mapJoinTask.getChildTasks() == null
+        || mapJoinTask.getChildTasks().size() > 1) {
+      // Nothing to do: either there is no child task to merge, or there is
+      // more than one child task, in which case we leave the plan alone.
+      return;
+    }
+    Task<? extends Serializable> firstChildTask = mapJoinTask.getChildTasks().get(0);
+    if (!(firstChildTask instanceof MapRedTask)) {
+      // Nothing to do if it is not a mapreduce task.
+      return;
+    }
+    MapRedTask childTask = (MapRedTask) firstChildTask;
+    MapredWork mapJoinWork = mapJoinTask.getWork();
+    MapredWork childWork = childTask.getWork();
+    Operator childReducer = childWork.getReducer();
+    if (childReducer == null) {
+      // The child job has no reduce phase, so there is nothing to merge.
+      return;
+    }
+
+    // Can the two tasks be merged?
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapJoinWork.getAliasToWork();
+    if (aliasToWork.size() > 1) {
+      return;
+    }
+    Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
+    if (childPathToAliases.size() > 1) {
+      return;
+    }
+
+    // Locate the leaf operator of the map-join task by walking down from the
+    // root operator.
+    Operator<? extends OperatorDesc> mapJoinLeafOperator = aliasToWork.values().iterator().next();
+    while (mapJoinLeafOperator.getChildOperators() != null) {
+      // Don't perform this optimization for multi-table inserts
+      if (mapJoinLeafOperator.getChildOperators().size() > 1) {
+        return;
+      }
+      mapJoinLeafOperator = mapJoinLeafOperator.getChildOperators().get(0);
+    }
+
+    assert (mapJoinLeafOperator instanceof FileSinkOperator);
+    if (!(mapJoinLeafOperator instanceof FileSinkOperator)) {
+      // Sanity check, shouldn't happen.
+      return;
+    }
+
+    FileSinkOperator mapJoinTaskFileSinkOperator = (FileSinkOperator) mapJoinLeafOperator;
+
+    // Bail out if the file sink writes to a directory other than the one the
+    // child task reads from.
+    String workDir = mapJoinTaskFileSinkOperator.getConf().getDirName();
+    if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
+      return;
+    }
+
+    MapredLocalWork mapJoinLocalWork = mapJoinWork.getMapLocalWork();
+    MapredLocalWork childLocalWork = childWork.getMapLocalWork();
+
+    // Neither side may be a bucketed map-join.
+    if ((mapJoinLocalWork != null && mapJoinLocalWork.getBucketMapjoinContext() != null) ||
+        (childLocalWork != null && childLocalWork.getBucketMapjoinContext() != null)) {
+      return;
+    }
+
+    if (childWork.getAliasToWork().size() > 1) {
+      return;
+    }
+
+    Operator<? extends Serializable> childAliasOp =
+        childWork.getAliasToWork().values().iterator().next();
+    if (mapJoinTaskFileSinkOperator.getParentOperators().size() > 1) {
+      return;
+    }
+
+    // Merge the two operator trees: remove the FileSinkOperator from the first
+    // tree and attach the top of the second tree in its place.
+    Operator<? extends Serializable> parentFOp = mapJoinTaskFileSinkOperator
+        .getParentOperators().get(0);
+    parentFOp.getChildOperators().remove(mapJoinTaskFileSinkOperator);
+    parentFOp.getChildOperators().add(childAliasOp);
+    List<Operator<? extends OperatorDesc>> parentOps =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    parentOps.add(parentFOp);
+    childAliasOp.setParentOperators(parentOps);
+
+    mapJoinWork.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
+    for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
+        .entrySet()) {
+      if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
+        mapJoinWork.getPathToPartitionInfo()
+            .put(childWorkEntry.getKey(), childWorkEntry.getValue());
+      }
+    }
+
+    // Merge the child task's local work into the map-join task's local work.
+    if (mapJoinLocalWork != null && childLocalWork != null) {
+      mapJoinLocalWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
+      mapJoinLocalWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
+    }
+
+    // remove the child task
+    List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
+    mapJoinTask.setChildTasks(oldChildTasks);
+    if (oldChildTasks != null) {
+      for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
+        oldChildTask.getParentTasks().remove(childTask);
+        oldChildTask.getParentTasks().add(mapJoinTask);
+      }
+    }
+
+    // Copy the reducer conf.
+    copyReducerConf(mapJoinTask, childTask);
+  }
+
   @Override
   public Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
       ConditionalTask conditionalTask, Context context)
@@ -365,11 +514,21 @@ public class CommonJoinTaskDispatcher ex
 
         // Can this task be merged with the child task. This can happen if a big table is being
         // joined with multiple small tables on different keys
-        // Further optimizations are possible here, a join which has been converted to a mapjoin
-        // followed by a mapjoin can be performed in a single MR job.
-        if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)
-            && (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP)) {
-          mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
+        if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)) {
+          if (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP) {
+            // Merging two map-join tasks
+            mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
+          }
+
+          // Converted the join operator into a map-join. Now see if it can
+          // be merged into the following map-reduce job.
+          boolean convertToSingleJob = HiveConf.getBoolVar(conf,
+              HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
+          if (convertToSingleJob) {
+            // Try merging a map-join task with a mapreduce job to have a
+            // single job.
+            mergeMapJoinTaskWithMapReduceTask(newTask, conf);
+          }
         }
 
         return newTask;

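The plan shape this dispatcher change targets is a converted map-join whose only
child is a map-reduce job with a reducer, such as a group by. A hedged HiveQL
sketch, patterned after the test queries below (table and column names are
illustrative):

    set hive.optimize.mapjoin.mapreduce=true;
    -- The map-only join job and the group-by's map-reduce job should now be
    -- merged and submitted as a single MR job.
    explain
    select smallTbl.key, count(*)
    from bigTbl join smallTbl on (bigTbl.key = smallTbl.key)
    group by smallTbl.key;
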
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Apr 25 07:58:16 2013
@@ -8373,7 +8373,7 @@ public class SemanticAnalyzer extends Ba
 
     // For each task, set the key descriptor for the reducer
     for (Task<? extends Serializable> rootTask : rootTasks) {
-      setKeyDescTaskTree(rootTask);
+      GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
     }
 
     // If a task contains an operator which instructs bucketizedhiveinputformat
@@ -8599,36 +8599,6 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
-  // loop over all the tasks recursviely
-  private void setKeyDescTaskTree(Task<? extends Serializable> task) {
-
-    if (task instanceof ExecDriver) {
-      MapredWork work = (MapredWork) task.getWork();
-      work.deriveExplainAttributes();
-      HashMap<String, Operator<? extends OperatorDesc>> opMap = work
-          .getAliasToWork();
-      if (!opMap.isEmpty()) {
-        for (Operator<? extends OperatorDesc> op : opMap.values()) {
-          GenMapRedUtils.setKeyAndValueDesc(work, op);
-        }
-      }
-    } else if (task instanceof ConditionalTask) {
-      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
-          .getListTasks();
-      for (Task<? extends Serializable> tsk : listTasks) {
-        setKeyDescTaskTree(tsk);
-      }
-    }
-
-    if (task.getChildTasks() == null) {
-      return;
-    }
-
-    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-      setKeyDescTaskTree(childTask);
-    }
-  }
-
   @SuppressWarnings("nls")
   public Phase1Ctx initPhase1Ctx() {
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Thu Apr 25 07:58:16 2013
@@ -243,6 +243,12 @@ public class MapredWork extends Abstract
     return keyDesc;
   }
 
+  /**
+   * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc
+   * pointing to the keySerializeInfo of the ReduceSink.
+   *
+   * @param keyDesc
+   */
   public void setKeyDesc(final TableDesc keyDesc) {
     this.keyDesc = keyDesc;
   }

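For context on the documented contract: the TableDesc stored here typically comes
from the reduce-sink's key serialization info. A hedged sketch (work and rsDesc
are illustrative names; ReduceSinkDesc.getKeySerializeInfo() is assumed from the
Hive codebase of this vintage):

    // Point the plan's key descriptor at the reduce-sink's key TableDesc.
    work.setKeyDesc(rsDesc.getKeySerializeInfo());
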
Modified: hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q Thu Apr 25 07:58:16 2013
@@ -52,6 +52,8 @@ smallTbl2 on (firstjoin.value1 = smallTb
 set hive.auto.convert.join.noconditionaltask=true;
 set hive.auto.convert.join.noconditionaltask.size=10000;
 
+-- Now run a query with a two-way join, which should be converted into a
+-- map-join followed by a group-by: two MR jobs overall.
 explain
 select count(*) FROM
 (select bigTbl.key as key, bigTbl.value as value1,
@@ -69,6 +71,32 @@ select count(*) FROM
 JOIN                                                                  
 smallTbl2 on (firstjoin.value1 = smallTbl2.value);
 
+set hive.optimize.mapjoin.mapreduce=true;
+
+-- Now run a query with a two-way join, which should first be converted into a
+-- map-join followed by a group-by, and then merged into a single MR job.
+
+explain insert overwrite directory '${system:test.tmp.dir}/multiJoin1.output'
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1 
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN                                                                  
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key;
+
+insert overwrite directory '${system:test.tmp.dir}/multiJoin1.output'
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1 
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN                                                                  
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key;
+set hive.optimize.mapjoin.mapreduce=false;
+
 create table smallTbl3(key string, value string);
 insert overwrite table smallTbl3 select * from src where key < 10;
 
@@ -101,6 +129,21 @@ select * from
 
 set hive.auto.convert.join.noconditionaltask=false;
 
+explain
+select count(*) FROM
+ (
+   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+          firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2, 
+            bigTbl.value as value1, bigTbl.value as value2 
+     FROM bigTbl JOIN smallTbl1 
+     on (bigTbl.key1 = smallTbl1.key)
+    ) firstjoin
+    JOIN                                                                  
+    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
 select count(*) FROM
  (
    SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
@@ -118,7 +161,8 @@ select count(*) FROM
 set hive.auto.convert.join.noconditionaltask=true;
 set hive.auto.convert.join.noconditionaltask.size=10000;
 
--- join with 4 tables on different keys is also executed as a single MR job
+-- join with 4 tables on different keys is also executed as a single MR job,
+-- so there are two jobs overall: one for the multi-way join and one for the count(*).
 explain
 select count(*) FROM
  (
@@ -147,3 +191,37 @@ select count(*) FROM
     smallTbl2 on (firstjoin.value1 = smallTbl2.value)
  ) secondjoin
  JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
+set hive.optimize.mapjoin.mapreduce=true;
+-- Now run the above query with the M-MR optimization enabled.
+-- This should be a single MR job end-to-end.
+explain
+select count(*) FROM
+ (
+   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+          firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2, 
+            bigTbl.value as value1, bigTbl.value as value2 
+     FROM bigTbl JOIN smallTbl1 
+     on (bigTbl.key1 = smallTbl1.key)
+    ) firstjoin
+    JOIN                                                                  
+    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
+select count(*) FROM
+ (
+   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+          firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2, 
+            bigTbl.value as value1, bigTbl.value as value2 
+     FROM bigTbl JOIN smallTbl1 
+     on (bigTbl.key1 = smallTbl1.key)
+    ) firstjoin
+    JOIN                                                                  
+    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+ ) secondjoin
+ JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
+set hive.optimize.mapjoin.mapreduce=false;


