Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 03C381720A for ; Fri, 4 Sep 2015 08:52:36 +0000 (UTC) Received: (qmail 37446 invoked by uid 500); 4 Sep 2015 08:52:35 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 37410 invoked by uid 500); 4 Sep 2015 08:52:35 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 37401 invoked by uid 99); 4 Sep 2015 08:52:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Sep 2015 08:52:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B7BE5DFBAB; Fri, 4 Sep 2015 08:52:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jihoonson@apache.org To: commits@tajo.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tajo git commit: TAJO-1809: Change default value of several configurations. Date: Fri, 4 Sep 2015 08:52:35 +0000 (UTC) Repository: tajo Updated Branches: refs/heads/master d794c1d7b -> 7e0a4a1e4 TAJO-1809: Change default value of several configurations. Closes #718 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7e0a4a1e Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7e0a4a1e Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7e0a4a1e Branch: refs/heads/master Commit: 7e0a4a1e4686314feed22c7a352c9c70ef8a0e40 Parents: d794c1d Author: Jihoon Son Authored: Fri Sep 4 17:52:22 2015 +0900 Committer: Jihoon Son Committed: Fri Sep 4 17:52:22 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../main/java/org/apache/tajo/SessionVars.java | 8 ++-- .../java/org/apache/tajo/conf/TajoConf.java | 33 ++++++++-------- .../TooLargeInputForCrossJoinException.java | 2 +- .../planner/physical/TestHashJoinExec.java | 23 +++++++---- .../apache/tajo/engine/query/TestJoinQuery.java | 14 +++---- .../TestTajoCli/testHelpSessionVars.result | 4 +- .../engine/planner/PhysicalPlannerImpl.java | 30 ++++++++------ .../rewriter/rules/BroadcastJoinRule.java | 7 +++- .../java/org/apache/tajo/querymaster/Stage.java | 41 ++++++++++++++++++-- .../plan/verifier/PostLogicalPlanVerifier.java | 5 ++- 11 files changed, 112 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e93e4e8..6e55fb4 100644 --- a/CHANGES +++ b/CHANGES @@ -482,6 +482,8 @@ Release 0.11.0 - unreleased TASKS + TAJO-1809: Change default value of several configurations. (jihoon) + TAJO-1803: Use in-memory derby as the default catalog for unit tests. (jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index 7d47c23..36234e0 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -90,14 +90,16 @@ public enum SessionVars implements ConfigKey { // for distributed query strategies BROADCAST_NON_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD, - "restriction for the total bytes of broadcasted table for non-cross join", DEFAULT, Long.class, + "restriction for the total size of broadcasted table for non-cross join (kb)", DEFAULT, Long.class, Validators.min("0")), BROADCAST_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD, - "restriction for the total bytes of broadcasted table for cross join", DEFAULT, Long.class, Validators.min("0")), + "restriction for the total size of broadcasted table for cross join (kb)", DEFAULT, Long.class, + Validators.min("0")), JOIN_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_JOIN_TASK_VOLUME, "join task input size (mb) ", DEFAULT, Integer.class, Validators.min("1")), - SORT_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_SORT_TASK_VOLUME, "sort task input size (mb)", DEFAULT), + SORT_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_SORT_TASK_VOLUME, "sort task input size (mb)", DEFAULT, + Integer.class, Validators.min("1")), GROUPBY_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_GROUPBY_TASK_VOLUME, "group by task input size (mb)", DEFAULT), JOIN_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_JOIN_PARTITION_VOLUME, "shuffle output size for join (mb)", DEFAULT, http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 2a30995..909f266 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -199,7 +199,7 @@ public class TajoConf extends Configuration { // Query Configuration QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")), - QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 1024, Validators.min("0")), + QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 0, Validators.min("0")), // Shuffle Configuration -------------------------------------------------- PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")), @@ -308,14 +308,14 @@ public class TajoConf extends Configuration { // Query and Optimization --------------------------------------------------- // for distributed query strategies - $DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-bytes", - (long)5 * 1048576), // 5 MB - $DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-bytes", - (long)1 * 1048576), // 1 MB - - $DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128), - $DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 128), - $DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 128), + $DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-kb", 5 * 1024l, + Validators.min("0")), // 5 MB + $DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-kb", 1 * 1024l, + Validators.min("0")), // 1 MB + + $DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 64), + $DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 64), + $DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 64), $DIST_QUERY_JOIN_PARTITION_VOLUME("tajo.dist-query.join.partition-volume-mb", 128, Validators.min("1")), $DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256, Validators.min("1")), $DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256, Validators.min("1")), @@ -326,14 +326,13 @@ public class TajoConf extends Configuration { // for physical Executors $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L), - $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes", - (long)256 * 1048576), - $EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-bytes", - (long)256 * 1048576), - $EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-bytes", - (long)256 * 1048576), - $EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes", - (long)256 * 1048576), + $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-mb", 64l, Validators.min("0")), + $EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-mb", 64l, + Validators.min("0")), + $EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-mb", 64l, + Validators.min("0")), + $EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-mb", 64l, + Validators.min("0")), $MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite $CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code generation (todo this is broken) http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java index 32cd44e..55d5f46 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java @@ -33,6 +33,6 @@ public class TooLargeInputForCrossJoinException extends TajoException { } public TooLargeInputForCrossJoinException(String[] relations, long currentBroadcastThreshold) { - super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), "" + currentBroadcastThreshold); + super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " MB"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java index 4550db9..6f665ea 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java @@ -42,6 +42,7 @@ import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; @@ -205,7 +206,7 @@ public class TestHashJoinExec { LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 100l); + ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 1); // set hash join limit as 1 MB PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); @@ -224,9 +225,9 @@ public class TestHashJoinExec { * we use some boolean variable leftSmaller to indicate which side is small. */ private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx, - PhysicalPlannerImpl phyPlanner, - JoinNode joinNode, BinaryPhysicalExec joinExec) throws - IOException { + PhysicalPlannerImpl phyPlanner, + JoinNode joinNode, BinaryPhysicalExec joinExec) + throws IOException { String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild()); String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild()); @@ -268,12 +269,18 @@ public class TestHashJoinExec { assertEquals("default.p", right[0]); } + // To test the behaviour of PhysicalPlannerImpl.checkIfInMemoryInnerJoinIsPossible(), + // use a fake value for table volumes. if (leftSmaller) { - assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true)); - assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false)); + assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, + PlannerUtil.getRelationLineage(joinNode.getLeftChild()), 1 * StorageUnit.MB, true)); + assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, + PlannerUtil.getRelationLineage(joinNode.getRightChild()), 5 * StorageUnit.MB, false)); } else { - assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true)); - assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false)); + assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, + PlannerUtil.getRelationLineage(joinNode.getLeftChild()), 5 * StorageUnit.MB, true)); + assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, + PlannerUtil.getRelationLineage(joinNode.getRightChild()), 1 * StorageUnit.MB, false)); } return leftSmaller; http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java index 706f201..4dcf562 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java @@ -60,9 +60,9 @@ public class TestJoinQuery extends QueryTestCaseBase { testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "true"); testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.varname, - "" + (5 * 1024)); + "" + 5); testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname, - "" + (2 * 1024)); + "" + 2); testingCluster.setAllTajoDaemonConfValue( ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, @@ -79,19 +79,19 @@ public class TestJoinQuery extends QueryTestCaseBase { if (joinOption.indexOf("Hash") >= 0) { testingCluster.setAllTajoDaemonConfValue( - ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256 * 1048576)); + ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256)); testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, - String.valueOf(256 * 1048576)); + String.valueOf(256)); testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname, - String.valueOf(256 * 1048576)); + String.valueOf(256)); } if (joinOption.indexOf("Sort") >= 0) { testingCluster.setAllTajoDaemonConfValue( ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(1)); testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, - String.valueOf(1)); + String.valueOf(0)); testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname, - String.valueOf(1)); + String.valueOf(0)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index 33443da..51fb61f 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -18,8 +18,8 @@ Available Session Variables: \set LC_MONETARY [text value] - Formatting of currency amounts \set LC_NUMERIC [text value] - Formatting of numbers \set LC_TIME [text value] - Formatting of dates and times -\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for non-cross join -\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for cross join +\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total size of broadcasted table for non-cross join (kb) +\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total size of broadcasted table for cross join (kb) \set JOIN_TASK_INPUT_SIZE [int value] - join task input size (mb) \set SORT_TASK_INPUT_SIZE [int value] - sort task input size (mb) \set GROUPBY_TASK_INPUT_SIZE [int value] - group by task input size (mb) http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 9847ff6..36d80da 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -53,6 +53,7 @@ import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.TUtil; @@ -261,26 +262,33 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { return size; } - @VisibleForTesting - public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left) + private boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left) throws IOException { String [] lineage = PlannerUtil.getRelationLineage(node); long volume = estimateSizeRecursive(context, lineage); - boolean inMemoryInnerJoinFlag = false; + return checkIfInMemoryInnerJoinIsPossible(context, lineage, volume, left); + } + + @VisibleForTesting + public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, String [] lineage, long tableVolume, + boolean left) throws IOException { + boolean inMemoryInnerJoinFlag; QueryContext queryContext = context.getQueryContext(); if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) { - inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT); + inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT) + * StorageUnit.MB; } else { - inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT); + inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT) + * StorageUnit.MB; } LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.", context.getTaskId().toString(), (left ? "Left" : "Right"), StringUtils.join(lineage), - FileUtil.humanReadableByteCount(volume, false), + FileUtil.humanReadableByteCount(tableVolume, false), (inMemoryInnerJoinFlag ? "" : "not "))); return inMemoryInnerJoinFlag; } @@ -484,9 +492,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { QueryContext queryContext = context.getQueryContext(); if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) { - hashJoin = rightTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT); + hashJoin = rightTableVolume <= queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB; } else { - hashJoin = rightTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT); + hashJoin = rightTableVolume <= queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB; } if (hashJoin) { @@ -512,9 +520,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { QueryContext queryContext = context.getQueryContext(); if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) { - hashJoin = leftTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT); + hashJoin = leftTableVolume <= queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB; } else { - hashJoin = leftTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT); + hashJoin = leftTableVolume <= queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)* StorageUnit.MB; } if (hashJoin){ @@ -1015,7 +1023,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild()); long estimatedSize = estimateSizeRecursive(context, outerLineage); - final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT); + final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT) * StorageUnit.MB; // if the relation size is less than the threshold, // the hash aggregation will be used. http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index dbb92e1..b320a81 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -32,6 +32,7 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.graph.DirectedGraphVisitor; @@ -80,8 +81,10 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { @Override public boolean isEligible(OverridableConf queryContext, MasterPlan plan) { - long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD); - long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD); + long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD) * + StorageUnit.KB; + long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD) * + StorageUnit.KB; boolean broadcastJoinEnabled = queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED); if (broadcastJoinEnabled && (thresholdForNonCrossJoin > 0 || thresholdForCrossJoin > 0)) { http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 2ded786..0276cc2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -1048,14 +1048,47 @@ public class Stage implements EventHandler { * @return */ public static int getNonLeafTaskNum(Stage stage) { + // This method is assumed to be called only for aggregation or sort. + LogicalNode plan = stage.getBlock().getPlan(); + LogicalNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT); + LogicalNode groupbyNode = PlannerUtil.findTopNode(plan, NodeType.GROUP_BY); + + // Task volume is assumed to be 64 MB by default. + long taskVolume = 64; + + if (groupbyNode != null && sortNode == null) { + // aggregation plan + taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE); + } else if (sortNode != null && groupbyNode == null) { + // sort plan + taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE); + } else if (sortNode != null /* && groupbyNode != null */) { + // NOTE: when the plan includes both aggregation and sort, usually aggregation is executed first. + // If not, we need to check the query plan is valid. + LogicalNode aggChildOfSort = PlannerUtil.findTopNode(sortNode, NodeType.GROUP_BY); + boolean aggFirst = aggChildOfSort != null && aggChildOfSort.equals(groupbyNode); + // Set task volume according to the operator which will be executed first. + if (aggFirst) { + // choose aggregation task volume + taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE); + } else { + // choose sort task volume + LOG.warn("Sort is executed before aggregation."); + taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE); + } + } else { + LOG.warn("Task volume is chosen as " + taskVolume + " in unexpected case."); + } + // Getting intermediate data size long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock()); - int mb = (int) Math.ceil((double)volume / 1048576); + int mb = (int) Math.ceil((double)volume / (double)StorageUnit.MB); LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB"); - // determine the number of task per 64MB - int minTaskNum = Math.max(1, stage.getContext().getQueryMasterContext().getConf().getInt(ConfVars.$TEST_MIN_TASK_NUM.varname, 1)); - int maxTaskNum = Math.max(minTaskNum, (int) Math.ceil((double)mb / 64)); + // determine the number of task + int minTaskNum = Math.max(1, stage.getContext().getQueryMasterContext().getConf(). + getInt(ConfVars.$TEST_MIN_TASK_NUM.varname, 1)); + int maxTaskNum = Math.max(minTaskNum, (int) Math.ceil((double)mb / taskVolume)); LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum); return maxTaskNum; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java index a46d66e..7442561 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java @@ -26,6 +26,7 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.verifier.PostLogicalPlanVerifier.Context; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.TUtil; import java.util.List; @@ -89,7 +90,7 @@ public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor largeRelationNames = TUtil.newList(); if (isSimpleRelationNode(node.getLeftChild())) { - if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin) { + if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin * StorageUnit.KB) { crossJoinAllowed = true; } else { largeRelationNames.add(((ScanNode) node.getLeftChild()).getCanonicalName()); @@ -97,7 +98,7 @@ public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor