From: namit@apache.org
To: commits@hive.apache.org
Subject: svn commit: r1475661 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/s...
Date: Thu, 25 Apr 2013 07:58:16 -0000
Message-Id: <20130425075817.698CD2388A33@eris.apache.org>

Author: namit
Date: Thu Apr 25 07:58:16 2013
New Revision: 1475661

URL: http://svn.apache.org/r1475661
Log:
HIVE-3952 merge map-job followed by map-reduce job
(Vinod Kumar Vavilapalli via namit)

Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q
    hive/trunk/ql/src/test/results/clientpositive/multiMapJoin1.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Apr 25 07:58:16 2013
@@ -494,6 +494,7 @@ public class HiveConf extends Configurat
     HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),
     HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD("hive.auto.convert.join.noconditionaltask.size", 10000000L),
+    HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR("hive.optimize.mapjoin.mapreduce", false),
     HIVESKEWJOINKEY("hive.skewjoin.key", 100000),
     HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
     HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M
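[Illustrative note, not part of this commit.] The new ConfVars entry above is read at the call sites added later in this patch via HiveConf.getBoolVar. A minimal sketch of reading and setting the flag programmatically; the class name is hypothetical, and Hive/Hadoop jars are assumed on the classpath:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class MapJoinMergeFlagSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Defaults to false, matching the ConfVars entry above.
        boolean convertToSingleJob = HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
        System.out.println("hive.optimize.mapjoin.mapreduce = " + convertToSingleJob);
        // Turn the optimization on for this configuration object.
        conf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR, true);
      }
    }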
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Thu Apr 25 07:58:16 2013
@@ -842,6 +842,16 @@
 
+<property>
+  <name>hive.optimize.mapjoin.mapreduce</name>
+  <value>false</value>
+  <description>If hive.auto.convert.join is off, this parameter does not take
+  effect. If it is on, and if there are map-join jobs followed by a map-reduce
+  job (e.g., a group by), each map-only job is merged with the following
+  map-reduce job.
+  </description>
+</property>
+
 <property>
   <name>hive.script.auto.progress</name>
   <value>false</value>
   <description>Whether Hive Transform/Map/Reduce Clause should automatically send progress information to TaskTracker to avoid the task getting killed because of inactivity. Hive sends progress information when the script is outputting to stderr. This option removes the need of periodically producing stderr messages, but users should be cautious because this may prevent infinite loops in the scripts to be killed by TaskTracker.</description>
 </property>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Thu Apr 25 07:58:16 2013
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -761,6 +763,41 @@ public final class GenMapRedUtils {
   }
 
   /**
+   * Set the key and value description for all the tasks rooted at the given
+   * task. Loops over all the tasks recursively.
+   *
+   * @param task
+   */
+  public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task) {
+
+    if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+          .getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks) {
+        setKeyAndValueDescForTaskTree(tsk);
+      }
+    } else if (task instanceof ExecDriver) {
+      MapredWork work = (MapredWork) task.getWork();
+      work.deriveExplainAttributes();
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = work
+          .getAliasToWork();
+      if (opMap != null && !opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          setKeyAndValueDesc(work, op);
+        }
+      }
+    }
+
+    if (task.getChildTasks() == null) {
+      return;
+    }
+
+    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+      setKeyAndValueDescForTaskTree(childTask);
+    }
+  }
+
+  /**
   * create a new plan and return.
   *
   * @return the new plan
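[Illustrative note, not part of this commit.] The control flow of the new helper is easy to miss in diff form: conditional tasks recurse into their branch list, ExecDriver tasks get their key/value descriptors derived, and recursion then continues into child tasks in either case. A toy, self-contained sketch of the same traversal shape; ToyTask and its fields are hypothetical stand-ins, not Hive classes:

    import java.util.ArrayList;
    import java.util.List;

    class ToyTask {
      final String name;
      List<ToyTask> branchTasks = new ArrayList<ToyTask>(); // like ConditionalTask.getListTasks()
      List<ToyTask> childTasks = new ArrayList<ToyTask>();  // like Task.getChildTasks()

      ToyTask(String name) {
        this.name = name;
      }

      // Same shape as setKeyAndValueDescForTaskTree: handle the branch list of
      // a conditional task, do the per-task work, then recurse into children.
      static void visitTree(ToyTask task) {
        if (!task.branchTasks.isEmpty()) {
          for (ToyTask branch : task.branchTasks) {
            visitTree(branch);
          }
        } else {
          System.out.println("derive key/value desc for " + task.name);
        }
        for (ToyTask child : task.childTasks) {
          visitTree(child);
        }
      }

      public static void main(String[] args) {
        ToyTask root = new ToyTask("map-join task");
        root.childTasks.add(new ToyTask("map-reduce task"));
        visitTree(root);
      }
    }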

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Thu Apr 25 07:58:16 2013
@@ -113,9 +113,11 @@ public class MapJoinProcessor implements
   }
 
   /**
-   * Generate the MapRed Local Work
+   * Generate the MapRed Local Work for the given map-join operator
+   *
    * @param newWork
    * @param mapJoinOp
+   *          map-join operator for which local work needs to be generated.
    * @param bigTablePos
    * @return
    * @throws SemanticException
@@ -225,6 +227,16 @@ public class MapJoinProcessor implements
     return bigTableAlias;
   }
 
+  /**
+   * Convert the join to a map-join and also generate any local work needed.
+   *
+   * @param newWork MapredWork in which the conversion is to happen
+   * @param op
+   *          The join operator that needs to be converted to a map-join
+   * @param mapJoinPos
+   * @return the alias to the big table
+   * @throws SemanticException
+   */
   public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
       throws SemanticException {
     LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Thu Apr 25 07:58:16 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
@@ -239,6 +240,35 @@ public class CommonJoinTaskDispatcher ex
         oldChildTask.getParentTasks().add(task);
       }
     }
+
+    boolean convertToSingleJob = HiveConf.getBoolVar(conf,
+        HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
+    if (convertToSingleJob) {
+      copyReducerConf(task, childTask);
+    }
+  }
+
+  /**
+   * Copy reducer configuration if the childTask also has a reducer.
+   *
+   * @param task
+   * @param childTask
+   */
+  private void copyReducerConf(MapRedTask task, MapRedTask childTask) {
+    MapredWork childWork = childTask.getWork();
+    Operator<? extends OperatorDesc> childReducer = childWork.getReducer();
+    MapredWork work = task.getWork();
+    if (childReducer == null) {
+      return;
+    }
+    work.setReducer(childReducer);
+    work.setNumReduceTasks(childWork.getNumReduceTasks());
+    work.setJoinTree(childWork.getJoinTree());
+    work.setNeedsTagging(childWork.getNeedsTagging());
+
+    // Make sure the key configuration is correct: clear and regenerate.
+    work.getTagToValueDesc().clear();
+    GenMapRedUtils.setKeyAndValueDescForTaskTree(task);
   }
 
   // create map join task and set big table as bigTablePosition
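[Illustrative note, not part of this commit.] The design point behind copyReducerConf: when a map-only map-join job absorbs the map side of the job that follows it, it must also take over that job's reduce side (the reducer operator, reduce-task count, join tree, and tagging), then regenerate the key/value descriptors. A toy sketch with hypothetical POJOs, not Hive classes:

    // Hypothetical stand-ins showing which reduce-side settings must travel
    // when a map-only job absorbs the job that follows it.
    class ToyWork {
      Object reducer;       // like MapredWork.getReducer()
      int numReduceTasks;   // like MapredWork.getNumReduceTasks()
      boolean needsTagging; // like MapredWork.getNeedsTagging()
    }

    public class CopyReducerConfSketch {
      // Mirrors copyReducerConf: a no-op when the child has no reduce side.
      static void copyReducerConf(ToyWork work, ToyWork childWork) {
        if (childWork.reducer == null) {
          return; // child is map-only; nothing to merge
        }
        work.reducer = childWork.reducer;
        work.numReduceTasks = childWork.numReduceTasks;
        work.needsTagging = childWork.needsTagging;
        // In the real code, tagToValueDesc is then cleared and regenerated via
        // GenMapRedUtils.setKeyAndValueDescForTaskTree(task).
      }

      public static void main(String[] args) {
        ToyWork mapJoin = new ToyWork(); // map-only map-join job
        ToyWork groupBy = new ToyWork(); // following MR job
        groupBy.reducer = new Object();
        groupBy.numReduceTasks = 4;
        copyReducerConf(mapJoin, groupBy);
        System.out.println("merged numReduceTasks = " + mapJoin.numReduceTasks);
      }
    }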
@@ -255,6 +285,125 @@ public class CommonJoinTaskDispatcher ex
     return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
   }
 
+  /*
+   * A task and its child task have been converted from join to map-join.
+   * See if the two tasks can be merged.
+   */
+  private void mergeMapJoinTaskWithMapReduceTask(MapRedTask mapJoinTask, Configuration conf) {
+    if (mapJoinTask.getChildTasks() == null
+        || mapJoinTask.getChildTasks().size() > 1) {
+      // No child task to merge, or more than one child task, in which case
+      // we don't want to do anything.
+      return;
+    }
+    Task<? extends Serializable> firstChildTask = mapJoinTask.getChildTasks().get(0);
+    if (!(firstChildTask instanceof MapRedTask)) {
+      // Nothing to do if it is not a mapreduce task.
+      return;
+    }
+    MapRedTask childTask = (MapRedTask) firstChildTask;
+    MapredWork mapJoinWork = mapJoinTask.getWork();
+    MapredWork childWork = childTask.getWork();
+    Operator<? extends OperatorDesc> childReducer = childWork.getReducer();
+    if (childReducer == null) {
+      // Not a MR job, nothing to merge.
+      return;
+    }
+
+    // Can this be merged?
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapJoinWork.getAliasToWork();
+    if (aliasToWork.size() > 1) {
+      return;
+    }
+    Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
+    if (childPathToAliases.size() > 1) {
+      return;
+    }
+
+    // Locate the leaf operator of the map-join task. Start by initializing
+    // the leaf operator to be the root operator.
+    Operator<? extends OperatorDesc> mapJoinLeafOperator = aliasToWork.values().iterator().next();
+    while (mapJoinLeafOperator.getChildOperators() != null) {
+      // Don't perform this optimization for multi-table inserts
+      if (mapJoinLeafOperator.getChildOperators().size() > 1) {
+        return;
+      }
+      mapJoinLeafOperator = mapJoinLeafOperator.getChildOperators().get(0);
+    }
+
+    assert (mapJoinLeafOperator instanceof FileSinkOperator);
+    if (!(mapJoinLeafOperator instanceof FileSinkOperator)) {
+      // Sanity check, shouldn't happen.
+      return;
+    }
+
+    FileSinkOperator mapJoinTaskFileSinkOperator = (FileSinkOperator) mapJoinLeafOperator;
+
+    // Bail out if the filesink writes to a directory other than the one the
+    // child task reads from.
+    String workDir = mapJoinTaskFileSinkOperator.getConf().getDirName();
+    if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
+      return;
+    }
+
+    MapredLocalWork mapJoinLocalWork = mapJoinWork.getMapLocalWork();
+    MapredLocalWork childLocalWork = childWork.getMapLocalWork();
+
+    // Neither of them should be bucketed
+    if ((mapJoinLocalWork != null && mapJoinLocalWork.getBucketMapjoinContext() != null) ||
+        (childLocalWork != null && childLocalWork.getBucketMapjoinContext() != null)) {
+      return;
+    }
+
+    if (childWork.getAliasToWork().size() > 1) {
+      return;
+    }
+
+    Operator<? extends OperatorDesc> childAliasOp =
+        childWork.getAliasToWork().values().iterator().next();
+    if (mapJoinTaskFileSinkOperator.getParentOperators().size() > 1) {
+      return;
+    }
+
+    // Merge the two trees - remove the FileSinkOperator from the first tree
+    // and attach the root of the second tree in its place
+    Operator<? extends OperatorDesc> parentFOp = mapJoinTaskFileSinkOperator
+        .getParentOperators().get(0);
+    parentFOp.getChildOperators().remove(mapJoinTaskFileSinkOperator);
+    parentFOp.getChildOperators().add(childAliasOp);
+    List<Operator<? extends OperatorDesc>> parentOps =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    parentOps.add(parentFOp);
+    childAliasOp.setParentOperators(parentOps);
+
+    mapJoinWork.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
+    for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
+        .entrySet()) {
+      if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
+        mapJoinWork.getPathToPartitionInfo()
+            .put(childWorkEntry.getKey(), childWorkEntry.getValue());
+      }
+    }
+
+    // Fill up stuff in local work
+    if (mapJoinLocalWork != null && childLocalWork != null) {
+      mapJoinLocalWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
+      mapJoinLocalWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
+    }
+
+    // remove the child task
+    List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
+    mapJoinTask.setChildTasks(oldChildTasks);
+    if (oldChildTasks != null) {
+      for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
+        oldChildTask.getParentTasks().remove(childTask);
+        oldChildTask.getParentTasks().add(mapJoinTask);
+      }
+    }
+
+    // Copy the reducer conf.
+    copyReducerConf(mapJoinTask, childTask);
+  }
+
   @Override
   public Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
       ConditionalTask conditionalTask, Context context)
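[Illustrative note, not part of this commit.] The core move in mergeMapJoinTaskWithMapReduceTask is an operator-tree splice: the map-join plan's FileSink leaf is cut out and the child plan's map-side root is attached in its place, so both plans run in one map phase. A toy sketch; ToyOp and all names are hypothetical, not Hive's Operator class:

    import java.util.ArrayList;
    import java.util.List;

    class ToyOp {
      final String name;
      List<ToyOp> parents = new ArrayList<ToyOp>();
      List<ToyOp> children = new ArrayList<ToyOp>();

      ToyOp(String name) {
        this.name = name;
      }
    }

    public class OperatorSpliceSketch {
      public static void main(String[] args) {
        // Map-join task plan: MAPJOIN -> FS (FS is the leaf file sink).
        ToyOp mapJoin = new ToyOp("MAPJOIN");
        ToyOp fileSink = new ToyOp("FS");
        mapJoin.children.add(fileSink);
        fileSink.parents.add(mapJoin);

        // The child MR task's map-side root, e.g. a table scan feeding a
        // reduce sink for the group-by.
        ToyOp childRoot = new ToyOp("TS(child)");

        // The splice: drop the file sink, hang the child plan off its parent.
        ToyOp parentFOp = fileSink.parents.get(0);
        parentFOp.children.remove(fileSink);
        parentFOp.children.add(childRoot);
        childRoot.parents.clear();
        childRoot.parents.add(parentFOp);

        System.out.println("MAPJOIN now feeds: " + parentFOp.children.get(0).name);
      }
    }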
@@ -365,11 +514,21 @@ public class CommonJoinTaskDispatcher ex
     // Can this task be merged with the child task. This can happen if a big table is being
     // joined with multiple small tables on different keys
-    // Further optimizations are possible here, a join which has been converted to a mapjoin
-    // followed by a mapjoin can be performed in a single MR job.
-    if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)
-        && (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP)) {
-      mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
+    if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)) {
+      if (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP) {
+        // Merging two map-join tasks
+        mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
+      }
+
+      // Converted the join operator into a map-join. Now see if it can
+      // be merged into the following map-reduce job.
+      boolean convertToSingleJob = HiveConf.getBoolVar(conf,
+          HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
+      if (convertToSingleJob) {
+        // Try merging the map-join task with the following mapreduce job
+        // to form a single job.
+        mergeMapJoinTaskWithMapReduceTask(newTask, conf);
+      }
     }
 
     return newTask;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Apr 25 07:58:16 2013
@@ -8373,7 +8373,7 @@ public class SemanticAnalyzer extends Ba
 
     // For each task, set the key descriptor for the reducer
     for (Task<? extends Serializable> rootTask : rootTasks) {
-      setKeyDescTaskTree(rootTask);
+      GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
     }
 
     // If a task contains an operator which instructs bucketizedhiveinputformat
@@ -8599,36 +8599,6 @@
     }
   }
 
-  // loop over all the tasks recursviely
-  private void setKeyDescTaskTree(Task<? extends Serializable> task) {
-
-    if (task instanceof ExecDriver) {
-      MapredWork work = (MapredWork) task.getWork();
-      work.deriveExplainAttributes();
-      HashMap<String, Operator<? extends OperatorDesc>> opMap = work
-          .getAliasToWork();
-      if (!opMap.isEmpty()) {
-        for (Operator<? extends OperatorDesc> op : opMap.values()) {
-          GenMapRedUtils.setKeyAndValueDesc(work, op);
-        }
-      }
-    } else if (task instanceof ConditionalTask) {
-      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
-          .getListTasks();
-      for (Task<? extends Serializable> tsk : listTasks) {
-        setKeyDescTaskTree(tsk);
-      }
-    }
-
-    if (task.getChildTasks() == null) {
-      return;
-    }
-
-    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-      setKeyDescTaskTree(childTask);
-    }
-  }
-
   @SuppressWarnings("nls")
   public Phase1Ctx initPhase1Ctx() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Thu Apr 25 07:58:16 2013
@@ -243,6 +243,12 @@ public class MapredWork extends Abstract
     return keyDesc;
   }
 
+  /**
+   * If the plan has a reducer and, correspondingly, a reduce-sink, then store
+   * the TableDesc pointing to the keySerializeInfo of the ReduceSink.
+   *
+   * @param keyDesc
+   */
   public void setKeyDesc(final TableDesc keyDesc) {
     this.keyDesc = keyDesc;
   }
Modified: hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q?rev=1475661&r1=1475660&r2=1475661&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q Thu Apr 25 07:58:16 2013
@@ -52,6 +52,8 @@ smallTbl2 on (firstjoin.value1 = smallTb
 set hive.auto.convert.join.noconditionaltask=true;
 set hive.auto.convert.join.noconditionaltask.size=10000;
 
+-- Now run a query with a two-way join, which should be converted into a
+-- map-join followed by a group-by - two MR jobs overall
 explain select
 count(*) FROM
 (select bigTbl.key as key, bigTbl.value as value1,
  bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
  on (bigTbl.key = smallTbl1.key)
@@ -69,6 +71,32 @@ select count(*) FROM
 JOIN
 smallTbl2 on (firstjoin.value1 = smallTbl2.value);
 
+set hive.optimize.mapjoin.mapreduce=true;
+
+-- Now run a query with a two-way join, which should first be converted into a
+-- map-join followed by a group-by, and then finally into a single MR job.
+
+explain insert overwrite directory '${system:test.tmp.dir}/multiJoin1.output'
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key;
+
+insert overwrite directory '${system:test.tmp.dir}/multiJoin1.output'
+select count(*) FROM
+(select bigTbl.key as key, bigTbl.value as value1,
+ bigTbl.value as value2 FROM bigTbl JOIN smallTbl1
+ on (bigTbl.key = smallTbl1.key)
+) firstjoin
+JOIN
+smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+group by smallTbl2.key;
+
+set hive.optimize.mapjoin.mapreduce=false;
+
 create table smallTbl3(key string, value string);
 insert overwrite table smallTbl3 select * from src where key < 10;
 
@@ -101,6 +129,21 @@ select * from
 
 set hive.auto.convert.join.noconditionaltask=false;
 
+explain
+select count(*) FROM
+  (
+  SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+         firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+            bigTbl.value as value1, bigTbl.value as value2
+     FROM bigTbl JOIN smallTbl1
+     on (bigTbl.key1 = smallTbl1.key)
+    ) firstjoin
+    JOIN
+    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+  ) secondjoin
+  JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
 select count(*) FROM
   (
   SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
         firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
            bigTbl.value as value1, bigTbl.value as value2
     FROM bigTbl JOIN smallTbl1
     on (bigTbl.key1 = smallTbl1.key)
    ) firstjoin
    JOIN
@@ -118,7 +161,8 @@ select count(*) FROM
 set hive.auto.convert.join.noconditionaltask=true;
 set hive.auto.convert.join.noconditionaltask.size=10000;
 
--- join with 4 tables on different keys is also executed as a single MR job
+-- join with 4 tables on different keys is also executed as a single MR job,
+-- so overall two jobs - one for the multi-way join and one for count(*)
 explain
 select count(*) FROM
   (
@@ -147,3 +191,37 @@ select count(*) FROM
   smallTbl2 on (firstjoin.value1 = smallTbl2.value)
   ) secondjoin
   JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
+set hive.optimize.mapjoin.mapreduce=true;
+-- Now run the above query with the M-MR optimization.
+-- This should be a single MR job end-to-end.
+explain
+select count(*) FROM
+  (
+  SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+         firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+            bigTbl.value as value1, bigTbl.value as value2
+     FROM bigTbl JOIN smallTbl1
+     on (bigTbl.key1 = smallTbl1.key)
+    ) firstjoin
+    JOIN
+    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+  ) secondjoin
+  JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
+select count(*) FROM
+  (
+  SELECT firstjoin.key1 as key1, firstjoin.key2 as key2, smallTbl2.key as key3,
+         firstjoin.value1 as value1, firstjoin.value2 as value2 FROM
+    (SELECT bigTbl.key1 as key1, bigTbl.key2 as key2,
+            bigTbl.value as value1, bigTbl.value as value2
+     FROM bigTbl JOIN smallTbl1
+     on (bigTbl.key1 = smallTbl1.key)
+    ) firstjoin
+    JOIN
+    smallTbl2 on (firstjoin.value1 = smallTbl2.value)
+  ) secondjoin
+  JOIN smallTbl3 on (secondjoin.key2 = smallTbl3.key);
+
+set hive.optimize.mapjoin.mapreduce=false;
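[Illustrative note, not part of this commit.] A rough programmatic equivalent of the "set" statements in the test above, sketched under the assumption of an embedded metastore, a configured warehouse, and the tables from the test already existing; the class name is hypothetical, and the Driver/SessionState usage is from memory of the 0.11-era API rather than taken from this patch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class SingleJobMergeDemo {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK, true);
        conf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR, true);
        SessionState.start(conf);
        Driver driver = new Driver(conf);
        // With both flags on, the map-join feeding this group-by should be
        // merged into the group-by's MR job, so EXPLAIN shows a single job.
        driver.run("explain select count(*) from bigTbl join smallTbl1"
            + " on (bigTbl.key = smallTbl1.key) group by bigTbl.value");
      }
    }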