From: prasanthj@apache.org
To: commits@hive.apache.org
Subject: hive git commit: HIVE-16503: LLAP: Oversubscribe memory for noconditional task size
Date: Wed, 26 Apr 2017 17:31:08 +0000 (UTC)

Repository: hive
Updated Branches:
  refs/heads/master b271bcb7c -> 17b1110fa


HIVE-16503: LLAP: Oversubscribe memory for noconditional task size


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/17b1110f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/17b1110f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/17b1110f

Branch: refs/heads/master
Commit: 17b1110fa8c01f2e9283546ea205d9ae2bdc13bc
Parents: b271bcb
Author: Prasanth Jayachandran
Authored: Wed Apr 26 10:30:54 2017 -0700
Committer: Prasanth Jayachandran
Committed: Wed Apr 26 10:30:54 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   9 +
 ql/pom.xml                                      |   7 +
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |  74 +++-
 .../physical/LlapClusterStateForCompile.java    |  11 +-
 .../hadoop/hive/ql/exec/TestOperators.java      |  63 +++
 .../dynamic_semijoin_user_level.q               |   1 +
 .../test/queries/clientpositive/explainuser_4.q |   1 +
 .../test/queries/clientpositive/tez_smb_main.q  |  13 +-
 .../tez_vector_dynpart_hashjoin_1.q             |   1 +
 .../clientpositive/llap/tez_smb_main.q.out      | 426 +++++++++++++++++++
 10 files changed, 589 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8e5a9aa..d3ea824 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3143,6 +3143,15 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
       "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
       "executed in parallel.", "llap.daemon.num.executors"),
+    LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR("hive.llap.mapjoin.memory.oversubscribe.factor", 0.2f,
+      "Fraction of memory from hive.auto.convert.join.noconditionaltask.size that can be oversubscribed\n" +
+      "by queries running in LLAP mode. This factor has to be from 0.0 to 1.0. Default is 20% oversubscription.\n"),
+    LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY("hive.llap.memory.oversubscription.max.executors.per.query", 3,
+      "Used along with hive.llap.mapjoin.memory.oversubscribe.factor to limit the number of executors from\n" +
+      "which memory for mapjoin can be borrowed. Default 3 (hive.llap.mapjoin.memory.oversubscribe.factor\n" +
+      "amount of memory can be borrowed from each of 3 other executors, based on which the mapjoin\n" +
+      "conversion decision will be made). This is only an upper bound. The lower bound is determined by the\n" +
+      "number of executors and the configured max concurrency."),
     LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4,
       "Maximum number of threads to be used for AM reporter. If this is lower than number of\n" +
       "executors in llap daemon, it would be set to number of executors at runtime.",

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index e5d063f..40a216b 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -229,6 +229,13 @@
     </dependency>
     <dependency>
      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-registry</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <version>${hadoop.version}</version>
       <optional>true</optional>

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 637bc54..ad77e87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
 import org.apache.hadoop.hive.ql.parse.GenTezUtils;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
@@ -68,6 +69,8 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
  * (aka shuffle join) with a map join (aka broadcast or fragment replicate
@@ -95,15 +98,18 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
     JoinOperator joinOp = (JoinOperator) nd;
     long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+    // adjust noconditional task size threshold for LLAP
+    maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf);
+
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
 
     if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
       if (retval == null) {
         return retval;
       } else {
-        fallbackToReduceSideJoin(joinOp, context);
+        fallbackToReduceSideJoin(joinOp, context, maxSize);
         return null;
       }
     }
@@ -120,13 +126,13 @@
     LOG.info("Estimated number of buckets " + numBuckets);
     int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true);
     if (mapJoinConversionPos < 0) {
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
       if (retval == null) {
         return retval;
       } else {
         // only case is full outer join with SMB enabled which is not possible. Convert to regular
         // join.
-        fallbackToReduceSideJoin(joinOp, context);
+        fallbackToReduceSideJoin(joinOp, context, maxSize);
         return null;
       }
     }
@@ -147,7 +153,7 @@
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
@@ -164,15 +170,54 @@
     return null;
   }
 
+  @VisibleForTesting
+  public long getNoConditionalTaskSizeForLlap(final long maxSize, final HiveConf conf) {
+    if ("llap".equalsIgnoreCase(conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
+      LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf);
+      llapInfo.initClusterInfo();
+      final int executorsPerNode;
+      if (!llapInfo.hasClusterInfo()) {
+        LOG.warn("LLAP cluster information not available. Falling back to getting #executors from hiveconf..");
+        executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+      } else {
+        final int numExecutorsPerNodeFromCluster = llapInfo.getNumExecutorsPerNode();
+        if (numExecutorsPerNodeFromCluster == -1) {
+          LOG.warn("Cannot determine executor count from LLAP cluster information. Falling back to getting #executors" +
+            " from hiveconf..");
+          executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+        } else {
+          executorsPerNode = numExecutorsPerNodeFromCluster;
+        }
+      }
+      final int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+      if (numSessions > 0) {
+        final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
+        final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+        final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
+        final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
+        final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
+        LOG.info("No conditional task size adjusted for LLAP. executorsPerNode: {}, numSessions: {}, " +
+          "availableSlotsPerQuery: {}, overSubscriptionFactor: {}, maxSlotsPerQuery: {}, slotsPerQuery: {}, " +
+          "noconditionalTaskSize: {}, adjustedNoconditionalTaskSize: {}", executorsPerNode, numSessions,
+          availableSlotsPerQuery, overSubscriptionFactor, maxSlotsPerQuery, slotsPerQuery, maxSize, llapMaxSize);
+        return Math.max(maxSize, llapMaxSize);
+      } else {
+        LOG.warn(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname + " returned value {}. Returning {}" +
+          " as no conditional task size for LLAP.", numSessions, maxSize);
+      }
+    }
+    return maxSize;
+  }
+
   @SuppressWarnings("unchecked")
   private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+      TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException {
     // we cannot convert to bucket map join, we cannot convert to
     // map join either based on the size. Check if we can convert to SMB join.
     if ((HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
       || ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
         && joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
     Class bigTableMatcherClass = null;
@@ -201,7 +246,7 @@
       // contains aliases from sub-query
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
@@ -211,7 +256,7 @@
     } else {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
     }
     return null;
   }
@@ -928,15 +973,14 @@
     return numBuckets;
   }
 
-  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      final long maxSize)
       throws SemanticException {
     // Attempt dynamic partitioned hash join
     // Since we don't have big table index yet, must start with estimate of numReducers
     int numReducers = estimateNumBuckets(joinOp, false);
     LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
-    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
-        context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
-        false);
+    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxSize, false);
     if (bigTablePos >= 0) {
       // Now that we have the big table index, get real numReducers value based on big table RS
       ReduceSinkOperator bigTableParentRS =
@@ -971,11 +1015,11 @@
     return false;
   }
 
-  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context, final long maxSize)
       throws SemanticException {
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
         context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
-      if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
+      if (convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize)) {
         return;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index b2e8614..a5ed308 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -41,6 +41,7 @@ public class LlapClusterStateForCompile {
 
   private static final long CLUSTER_UPDATE_INTERVAL_NS = 120 * 1000000000L; // 2 minutes.
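   // Cached cluster state: initClusterInfo() checks lastClusterUpdateNs and skips
   // a refresh within CLUSTER_UPDATE_INTERVAL_NS, so compile-time callers do not
   // hit the LLAP registry on every query.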
   private Long lastClusterUpdateNs;
   private Integer noConfigNodeCount, executorCount;
+  private int numExecutorsPerNode = -1;
   private LlapRegistryService svc;
   private final Configuration conf;
 
@@ -82,6 +83,10 @@ public class LlapClusterStateForCompile {
     return noConfigNodeCount;
   }
 
+  public int getNumExecutorsPerNode() {
+    return numExecutorsPerNode;
+  }
+
   public synchronized void initClusterInfo() {
     if (lastClusterUpdateNs != null) {
       long elapsed = System.nanoTime() - lastClusterUpdateNs;
@@ -111,7 +116,11 @@ public class LlapClusterStateForCompile {
         continue;
       }
       try {
-        executorsLocal += Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+        int numExecutors = Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+        executorsLocal += numExecutors;
+        if (numExecutorsPerNode == -1) {
+          numExecutorsPerNode = numExecutors;
+        }
       } catch (NumberFormatException e) {
         ++noConfigNodesLocal;
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 57e573a..b569549 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOContextMap;
+import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -435,4 +436,66 @@ public class TestOperators extends TestCase {
     assertEquals(20, result.size());
     driver.close();
   }
+
+  @Test
+  public void testNoConditionalTaskSizeForLlap() {
+    ConvertJoinMapJoin convertJoinMapJoin = new ConvertJoinMapJoin();
+    long defaultNoConditionalTaskSize = 1024L * 1024L * 1024L;
+    HiveConf hiveConf = new HiveConf();
+
+    // execution mode not set, default is returned
+    long gotSize = convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf);
+    assertEquals(defaultNoConditionalTaskSize, gotSize);
+    hiveConf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
+
+    // default executors is 4, max slots is 3. So 3 * 20% of noconditional task size will be oversubscribed
+    hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR.varname, "0.2");
+    double fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+    int maxSlots = 3;
+    long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // num executors is less than max executors per query (which is not the expected case), default executors will be
+    // chosen. 4 * 20% of noconditional task size will be oversubscribed
+    int chosenSlots = hiveConf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+    hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5");
+    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * chosenSlots));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 2 concurrent sessions, 4 executors. 2 * 20% of noconditional task size will be oversubscribed
+    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2");
+    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 4 concurrent sessions, 4 executors. 1 * 20% of noconditional task size will be oversubscribed
+    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "4");
+    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 1));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 8 concurrent sessions, 4 executors. default noconditional task size will be used (no oversubscription)
+    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "8");
+    assertEquals(defaultNoConditionalTaskSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 2 * 120% of noconditional task size will be oversubscribed
+    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2");
+    hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR.varname, "1.2");
+    fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 0 value for number of sessions
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "0");
+    assertEquals(defaultNoConditionalTaskSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
index 88ab46e..11bd17a 100644
--- a/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
@@ -11,6 +11,7 @@ set hive.stats.autogather=true;
 set hive.tez.bigtable.minsize.semijoin.reduction=1;
 set hive.tez.min.bloom.filter.entries=1;
set hive.stats.fetch.column.stats=true; +set hive.llap.memory.oversubscription.max.executors.per.query=0; -- Create Tables create table alltypesorc_int ( cint int, cstring string ) stored as ORC; http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/queries/clientpositive/explainuser_4.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/explainuser_4.q b/ql/src/test/queries/clientpositive/explainuser_4.q index f58afa8..8f92140 100644 --- a/ql/src/test/queries/clientpositive/explainuser_4.q +++ b/ql/src/test/queries/clientpositive/explainuser_4.q @@ -1,4 +1,5 @@ set hive.mapred.mode=nonstrict; +set hive.llap.memory.oversubscription.max.executors.per.query=0; set hive.explain.user=true; set hive.auto.convert.join=false; http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/queries/clientpositive/tez_smb_main.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/tez_smb_main.q b/ql/src/test/queries/clientpositive/tez_smb_main.q index ee24691..e4ab75a 100644 --- a/ql/src/test/queries/clientpositive/tez_smb_main.q +++ b/ql/src/test/queries/clientpositive/tez_smb_main.q @@ -70,9 +70,15 @@ from tab a join tab_part b on a.key = b.key; set hive.auto.convert.join.noconditionaltask.size=500; set hive.mapjoin.hybridgrace.minwbsize=125; set hive.mapjoin.hybridgrace.minnumpartitions=4; +set hive.llap.memory.oversubscription.max.executors.per.query=0; +explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value; +set hive.llap.memory.oversubscription.max.executors.per.query=3; explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value; select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value; +set hive.llap.memory.oversubscription.max.executors.per.query=0; +explain select count(*) from tab a join tab_part b on a.value = b.value; +set hive.llap.memory.oversubscription.max.executors.per.query=3; explain select count(*) from tab a join tab_part b on a.value = b.value; select count(*) from tab a join tab_part b on a.value = b.value; @@ -84,10 +90,15 @@ select s2.key as key, s2.value as value from tab s2 set hive.auto.convert.join.noconditionaltask.size=10000; +set hive.llap.memory.oversubscription.max.executors.per.query=0; +explain select count(*) from tab a join tab_part b on a.value = b.value; +set hive.llap.memory.oversubscription.max.executors.per.query=2; explain select count(*) from tab a join tab_part b on a.value = b.value; select count(*) from tab a join tab_part b on a.value = b.value; - +set hive.llap.memory.oversubscription.max.executors.per.query=0; +explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value; +set hive.llap.memory.oversubscription.max.executors.per.query=2; explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value; select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value; set hive.stats.fetch.column.stats=true; http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q index 7dd3003..04683d2 
100644 --- a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q +++ b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q @@ -3,6 +3,7 @@ set hive.mapred.mode=nonstrict; set hive.explain.user=false; set hive.auto.convert.join=false; set hive.optimize.dynamic.partition.hashjoin=false; +set hive.llap.memory.oversubscription.max.executors.per.query=0; -- First try with regular mergejoin explain http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out b/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out index b583bff..4f9c95a 100644 --- a/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out +++ b/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out @@ -679,6 +679,126 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value +PREHOOK: type: QUERY +POSTHOOK: query: explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 2 <- Map 1 (CUSTOM_EDGE), Map 4 (BROADCAST_EDGE) + Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and value is not null) (type: boolean) + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Execution mode: llap + LLAP IO: no inputs + Map 2 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col1 + input vertices: + 0 Map 1 + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: string) + 1 _col0 (type: string) + input vertices: + 1 Map 4 + Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: no inputs + Map 4 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: value is not null (type: boolean) + Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: value (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: no inputs + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value PREHOOK: type: QUERY PREHOOK: Input: default@src1 @@ -792,6 +912,102 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: explain select count(*) from tab a join tab_part b on a.value = b.value +PREHOOK: type: QUERY +POSTHOOK: query: explain select count(*) from tab a join tab_part b on a.value = b.value +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: value is not null (type: boolean) + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: value (type: string) + outputColumnNames: _col1 + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: + + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: no inputs + Map 4 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: value is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: value (type: string) + outputColumnNames: _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: + + Map-reduce partition columns: _col1 (type: string) + Statistics: Num 
rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: string) + 1 _col1 (type: string) + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: select count(*) from tab a join tab_part b on a.value = b.value PREHOOK: type: QUERY PREHOOK: Input: default@tab @@ -1042,6 +1258,96 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: explain select count(*) from tab a join tab_part b on a.value = b.value +PREHOOK: type: QUERY +POSTHOOK: query: explain select count(*) from tab a join tab_part b on a.value = b.value +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 2 <- Map 1 (BROADCAST_EDGE) + Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: value is not null (type: boolean) + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: value (type: string) + outputColumnNames: _col1 + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: string) + sort order: + + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: no inputs + Map 2 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: value is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: value (type: string) + outputColumnNames: _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: string) + 1 _col1 (type: string) + input vertices: + 0 Map 1 + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: 
Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: no inputs + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: select count(*) from tab a join tab_part b on a.value = b.value PREHOOK: type: QUERY PREHOOK: Input: default@tab @@ -1177,6 +1483,126 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value +PREHOOK: type: QUERY +POSTHOOK: query: explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 2 <- Map 1 (CUSTOM_EDGE), Map 4 (BROADCAST_EDGE) + Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and value is not null) (type: boolean) + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Execution mode: llap + LLAP IO: no inputs + Map 2 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col1 + input vertices: + 0 Map 1 + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: string) + 1 _col0 (type: string) + input vertices: + 1 Map 4 + Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 
8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: no inputs + Map 4 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: value is not null (type: boolean) + Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: value (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: no inputs + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value PREHOOK: type: QUERY PREHOOK: Input: default@src1
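
----------------------------------------------------------------------

For reference, the sizing rule this commit introduces can be worked through by hand:

  adjusted = max(maxSize, maxSize * (1 + factor * min(maxExecutorsPerQuery, executorsPerNode / numSessions)))

The sketch below mirrors that arithmetic from getNoConditionalTaskSizeForLlap as a standalone program. It is an illustration only, not code from the commit: the class name and main() harness are invented, and it assumes executorsPerNode, numSessions, the oversubscribe factor, and the per-query cap are already known rather than read from HiveConf or the LLAP registry.

public class OversubscriptionMath {

  // Mirrors the commit's formula: the slots a query may borrow memory from are
  // the executors it can expect per node, capped by the configured maximum.
  static long adjustedNoConditionalTaskSize(long maxSize, int executorsPerNode,
      int numSessions, double factor, int maxSlotsPerQuery) {
    if (numSessions <= 0) {
      // matches the guard in the commit: without session info, leave the threshold alone
      return maxSize;
    }
    int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
    int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
    long llapMaxSize = (long) (maxSize + (maxSize * factor * slotsPerQuery));
    return Math.max(maxSize, llapMaxSize);
  }

  public static void main(String[] args) {
    long oneGb = 1024L * 1024L * 1024L;
    // 4 executors/node, 1 session, factor 0.2, cap 3 -> 3 slots -> ~1.6 GB
    System.out.println(adjustedNoConditionalTaskSize(oneGb, 4, 1, 0.2, 3));
    // 4 executors/node, 2 sessions -> 2 slots -> ~1.4 GB
    System.out.println(adjustedNoConditionalTaskSize(oneGb, 4, 2, 0.2, 3));
    // 4 executors/node, 8 sessions -> 0 slots -> unchanged at 1 GB
    System.out.println(adjustedNoConditionalTaskSize(oneGb, 4, 8, 0.2, 3));
  }
}

These are a subset of the scenarios that the new TestOperators#testNoConditionalTaskSizeForLlap asserts, with the HiveConf plumbing stripped away.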