tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject tajo git commit: TAJO-1809: Change default value of several configurations.
Date Fri, 04 Sep 2015 08:52:35 GMT
Repository: tajo
Updated Branches:
  refs/heads/master d794c1d7b -> 7e0a4a1e4


TAJO-1809: Change default value of several configurations.

Closes #718


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7e0a4a1e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7e0a4a1e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7e0a4a1e

Branch: refs/heads/master
Commit: 7e0a4a1e4686314feed22c7a352c9c70ef8a0e40
Parents: d794c1d
Author: Jihoon Son <jihoonson@apache.org>
Authored: Fri Sep 4 17:52:22 2015 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Fri Sep 4 17:52:22 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../main/java/org/apache/tajo/SessionVars.java  |  8 ++--
 .../java/org/apache/tajo/conf/TajoConf.java     | 33 ++++++++--------
 .../TooLargeInputForCrossJoinException.java     |  2 +-
 .../planner/physical/TestHashJoinExec.java      | 23 +++++++----
 .../apache/tajo/engine/query/TestJoinQuery.java | 14 +++----
 .../TestTajoCli/testHelpSessionVars.result      |  4 +-
 .../engine/planner/PhysicalPlannerImpl.java     | 30 ++++++++------
 .../rewriter/rules/BroadcastJoinRule.java       |  7 +++-
 .../java/org/apache/tajo/querymaster/Stage.java | 41 ++++++++++++++++++--
 .../plan/verifier/PostLogicalPlanVerifier.java  |  5 ++-
 11 files changed, 112 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e93e4e8..6e55fb4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -482,6 +482,8 @@ Release 0.11.0 - unreleased
   
   TASKS
 
+    TAJO-1809: Change default value of several configurations. (jihoon)
+
     TAJO-1803: Use in-memory derby as the default catalog for unit tests. 
     (jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 7d47c23..36234e0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -90,14 +90,16 @@ public enum SessionVars implements ConfigKey {
 
   // for distributed query strategies
   BROADCAST_NON_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD,
-      "restriction for the total bytes of broadcasted table for non-cross join", DEFAULT,
Long.class,
+      "restriction for the total size of broadcasted table for non-cross join (kb)", DEFAULT,
Long.class,
       Validators.min("0")),
   BROADCAST_CROSS_JOIN_THRESHOLD(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD,
-      "restriction for the total bytes of broadcasted table for cross join", DEFAULT, Long.class,
Validators.min("0")),
+      "restriction for the total size of broadcasted table for cross join (kb)", DEFAULT,
Long.class,
+      Validators.min("0")),
 
   JOIN_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_JOIN_TASK_VOLUME, "join task input size (mb) ", DEFAULT,
       Integer.class, Validators.min("1")),
-  SORT_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_SORT_TASK_VOLUME, "sort task input size (mb)",
DEFAULT),
+  SORT_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_SORT_TASK_VOLUME, "sort task input size (mb)",
DEFAULT,
+      Integer.class, Validators.min("1")),
   GROUPBY_TASK_INPUT_SIZE(ConfVars.$DIST_QUERY_GROUPBY_TASK_VOLUME, "group by task input size (mb)", DEFAULT),
 
   JOIN_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_JOIN_PARTITION_VOLUME, "shuffle output size for join (mb)", DEFAULT,

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 2a30995..909f266 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -199,7 +199,7 @@ public class TajoConf extends Configuration {
 
     // Query Configuration
     QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")),
-    QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 1024, Validators.min("0")),
+    QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 0, Validators.min("0")),
 
     // Shuffle Configuration --------------------------------------------------
     PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
@@ -308,14 +308,14 @@ public class TajoConf extends Configuration {
     // Query and Optimization ---------------------------------------------------
 
     // for distributed query strategies
-    $DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-bytes",
-        (long)5 * 1048576), // 5 MB
-    $DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-bytes",
-        (long)1 * 1048576), // 1 MB
-
-    $DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 128),
-    $DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 128),
-    $DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 128),
+    $DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.non-cross-join.threshold-kb",
5 * 1024l,
+        Validators.min("0")), // 5 MB
+    $DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD("tajo.dist-query.broadcast.cross-join.threshold-kb",
1 * 1024l,
+        Validators.min("0")), // 1 MB
+
+    $DIST_QUERY_JOIN_TASK_VOLUME("tajo.dist-query.join.task-volume-mb", 64),
+    $DIST_QUERY_SORT_TASK_VOLUME("tajo.dist-query.sort.task-volume-mb", 64),
+    $DIST_QUERY_GROUPBY_TASK_VOLUME("tajo.dist-query.groupby.task-volume-mb", 64),
     $DIST_QUERY_JOIN_PARTITION_VOLUME("tajo.dist-query.join.partition-volume-mb", 128, Validators.min("1")),
     $DIST_QUERY_GROUPBY_PARTITION_VOLUME("tajo.dist-query.groupby.partition-volume-mb", 256,
Validators.min("1")),
     $DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb",
256, Validators.min("1")),
@@ -326,14 +326,13 @@ public class TajoConf extends Configuration {
 
     // for physical Executors
     $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
-    $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",
-        (long)256 * 1048576),
-    $EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-bytes",
-        (long)256 * 1048576),
-    $EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-bytes",
-        (long)256 * 1048576),
-    $EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes",
-        (long)256 * 1048576),
+    $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-mb",
64l, Validators.min("0")),
+    $EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-mb",
64l,
+        Validators.min("0")),
+    $EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-mb",
64l,
+        Validators.min("0")),
+    $EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-mb",
64l,
+        Validators.min("0")),
     $MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite
     $CODEGEN("tajo.executor.codegen.enabled", false), // Runtime code generation (todo this
is broken)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
index 32cd44e..55d5f46 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
@@ -33,6 +33,6 @@ public class TooLargeInputForCrossJoinException extends TajoException {
   }
 
   public TooLargeInputForCrossJoinException(String[] relations, long currentBroadcastThreshold)
{
-    super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), "" + currentBroadcastThreshold);
+    super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold
+ " MB");
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 4550db9..6f665ea 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -42,6 +42,7 @@ import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -205,7 +206,7 @@ public class TestHashJoinExec {
         LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 100l);
+    ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 1); // set
hash join limit as 1 MB
     PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
@@ -224,9 +225,9 @@ public class TestHashJoinExec {
    * we use some boolean variable <code>leftSmaller</code> to indicate which
side is small.
    */
   private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx,
-                                                       PhysicalPlannerImpl phyPlanner,
-                                                       JoinNode joinNode, BinaryPhysicalExec
joinExec) throws
-      IOException {
+                                                              PhysicalPlannerImpl phyPlanner,
+                                                              JoinNode joinNode, BinaryPhysicalExec
joinExec)
+      throws IOException {
 
     String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild());
     String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild());
@@ -268,12 +269,18 @@ public class TestHashJoinExec {
       assertEquals("default.p", right[0]);
     }
 
+    // To test the behaviour of PhysicalPlannerImpl.checkIfInMemoryInnerJoinIsPossible(),
+    // use a fake value for table volumes.
     if (leftSmaller) {
-      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(),
true));
-      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(),
false));
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
+          PlannerUtil.getRelationLineage(joinNode.getLeftChild()), 1 * StorageUnit.MB, true));
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
+          PlannerUtil.getRelationLineage(joinNode.getRightChild()), 5 * StorageUnit.MB, false));
     } else {
-      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(),
true));
-      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(),
false));
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
+          PlannerUtil.getRelationLineage(joinNode.getLeftChild()), 5 * StorageUnit.MB, true));
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx,
+          PlannerUtil.getRelationLineage(joinNode.getRightChild()), 1 * StorageUnit.MB, false));
     }
 
     return leftSmaller;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 706f201..4dcf562 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -60,9 +60,9 @@ public class TestJoinQuery extends QueryTestCaseBase {
 
     testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname,
"true");
     testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_NON_CROSS_JOIN_THRESHOLD.varname,
-        "" + (5 * 1024));
+        "" + 5);
     testingCluster.setAllTajoDaemonConfValue(ConfVars.$DIST_QUERY_BROADCAST_CROSS_JOIN_THRESHOLD.varname,
-        "" + (2 * 1024));
+        "" + 2);
 
     testingCluster.setAllTajoDaemonConfValue(
         ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
@@ -79,19 +79,19 @@ public class TestJoinQuery extends QueryTestCaseBase {
 
     if (joinOption.indexOf("Hash") >= 0) {
       testingCluster.setAllTajoDaemonConfValue(
-          ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256 * 1048576));
+          ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(256));
       testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
-          String.valueOf(256 * 1048576));
+          String.valueOf(256));
       testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
-          String.valueOf(256 * 1048576));
+          String.valueOf(256));
     }
     if (joinOption.indexOf("Sort") >= 0) {
       testingCluster.setAllTajoDaemonConfValue(
           ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname, String.valueOf(1));
       testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD.varname,
-          String.valueOf(1));
+          String.valueOf(0));
       testingCluster.setAllTajoDaemonConfValue(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
-          String.valueOf(1));
+          String.valueOf(0));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 33443da..51fb61f 100644
--- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -18,8 +18,8 @@ Available Session Variables:
 \set LC_MONETARY [text value] - Formatting of currency amounts
 \set LC_NUMERIC [text value] - Formatting of numbers
 \set LC_TIME [text value] - Formatting of dates and times
-\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of
broadcasted table for non-cross join
-\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted
table for cross join
+\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total size of
broadcasted table for non-cross join (kb)
+\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total size of broadcasted
table for cross join (kb)
 \set JOIN_TASK_INPUT_SIZE [int value] - join task input size (mb) 
 \set SORT_TASK_INPUT_SIZE [int value] - sort task input size (mb)
 \set GROUPBY_TASK_INPUT_SIZE [int value] - group by task input size (mb)

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 9847ff6..36d80da 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -53,6 +53,7 @@ import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.TUtil;
@@ -261,26 +262,33 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return size;
   }
 
-  @VisibleForTesting
-  public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode
node, boolean left)
+  private boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode
node, boolean left)
       throws IOException {
     String [] lineage = PlannerUtil.getRelationLineage(node);
     long volume = estimateSizeRecursive(context, lineage);
-    boolean inMemoryInnerJoinFlag = false;
+    return checkIfInMemoryInnerJoinIsPossible(context, lineage, volume, left);
+  }
+
+  @VisibleForTesting
+  public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, String []
lineage, long tableVolume,
+                                                    boolean left) throws IOException {
+    boolean inMemoryInnerJoinFlag;
 
     QueryContext queryContext = context.getQueryContext();
 
     if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) {
-      inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT);
+      inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)
+          * StorageUnit.MB;
     } else {
-      inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
+      inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)
+          * StorageUnit.MB;
     }
 
     LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.",
         context.getTaskId().toString(),
         (left ? "Left" : "Right"),
         StringUtils.join(lineage),
-        FileUtil.humanReadableByteCount(volume, false),
+        FileUtil.humanReadableByteCount(tableVolume, false),
         (inMemoryInnerJoinFlag ? "" : "not ")));
     return inMemoryInnerJoinFlag;
   }
@@ -484,9 +492,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     QueryContext queryContext = context.getQueryContext();
 
     if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
-      hashJoin = rightTableVolume <  queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
+      hashJoin = rightTableVolume <=  queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)
* StorageUnit.MB;
     } else {
-      hashJoin = rightTableVolume <  queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
+      hashJoin = rightTableVolume <=  queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)
* StorageUnit.MB;
     }
 
     if (hashJoin) {
@@ -512,9 +520,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     QueryContext queryContext = context.getQueryContext();
 
     if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
-      hashJoin = leftTableVolume <  queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
+      hashJoin = leftTableVolume <=  queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)
* StorageUnit.MB;
     } else {
-      hashJoin = leftTableVolume <  queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
+      hashJoin = leftTableVolume <=  queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)*
StorageUnit.MB;
     }
 
     if (hashJoin){
@@ -1015,7 +1023,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
     long estimatedSize = estimateSizeRecursive(context, outerLineage);
-    final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT);
+    final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT)
* StorageUnit.MB;
 
     // if the relation size is less than the threshold,
     // the hash aggregation will be used.

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
index dbb92e1..b320a81 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
@@ -32,6 +32,7 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.graph.DirectedGraphVisitor;
 
@@ -80,8 +81,10 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
 
   @Override
   public boolean isEligible(OverridableConf queryContext, MasterPlan plan) {
-    long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD);
-    long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD);
+    long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD)
*
+        StorageUnit.KB;
+    long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD)
*
+        StorageUnit.KB;
     boolean broadcastJoinEnabled = queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED);
     if (broadcastJoinEnabled &&
         (thresholdForNonCrossJoin > 0 || thresholdForCrossJoin > 0)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 2ded786..0276cc2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -1048,14 +1048,47 @@ public class Stage implements EventHandler<StageEvent> {
      * @return
      */
     public static int getNonLeafTaskNum(Stage stage) {
+      // This method is assumed to be called only for aggregation or sort.
+      LogicalNode plan = stage.getBlock().getPlan();
+      LogicalNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
+      LogicalNode groupbyNode = PlannerUtil.findTopNode(plan, NodeType.GROUP_BY);
+
+      // Task volume is assumed to be 64 MB by default.
+      long taskVolume = 64;
+
+      if (groupbyNode != null && sortNode == null) {
+        // aggregation plan
+        taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE);
+      } else if (sortNode != null && groupbyNode == null) {
+        // sort plan
+        taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE);
+      } else if (sortNode != null /* && groupbyNode != null */) {
+        // NOTE: when the plan includes both aggregation and sort, usually aggregation is
executed first.
+        // If not, we need to check the query plan is valid.
+        LogicalNode aggChildOfSort = PlannerUtil.findTopNode(sortNode, NodeType.GROUP_BY);
+        boolean aggFirst = aggChildOfSort != null && aggChildOfSort.equals(groupbyNode);
+        // Set task volume according to the operator which will be executed first.
+        if (aggFirst) {
+          // choose aggregation task volume
+          taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.GROUPBY_TASK_INPUT_SIZE);
+        } else {
+          // choose sort task volume
+          LOG.warn("Sort is executed before aggregation.");
+          taskVolume = stage.getContext().getQueryContext().getLong(SessionVars.SORT_TASK_INPUT_SIZE);
+        }
+      } else {
+        LOG.warn("Task volume is chosen as " + taskVolume + " in unexpected case.");
+      }
+
       // Getting intermediate data size
       long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock());
 
-      int mb = (int) Math.ceil((double)volume / 1048576);
+      int mb = (int) Math.ceil((double)volume / (double)StorageUnit.MB);
       LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
-      // determine the number of task per 64MB
-      int minTaskNum = Math.max(1, stage.getContext().getQueryMasterContext().getConf().getInt(ConfVars.$TEST_MIN_TASK_NUM.varname,
1));
-      int maxTaskNum = Math.max(minTaskNum, (int) Math.ceil((double)mb / 64));
+      // determine the number of task
+      int minTaskNum = Math.max(1, stage.getContext().getQueryMasterContext().getConf().
+          getInt(ConfVars.$TEST_MIN_TASK_NUM.varname, 1));
+      int maxTaskNum = Math.max(minTaskNum, (int) Math.ceil((double)mb / taskVolume));
       LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
       return maxTaskNum;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7e0a4a1e/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
index a46d66e..7442561 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
@@ -26,6 +26,7 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.verifier.PostLogicalPlanVerifier.Context;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.TUtil;
 
 import java.util.List;
@@ -89,7 +90,7 @@ public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor<Context,
Ob
         List<String> largeRelationNames = TUtil.newList();
 
         if (isSimpleRelationNode(node.getLeftChild())) {
-          if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin)
{
+          if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin
* StorageUnit.KB) {
             crossJoinAllowed = true;
           } else {
             largeRelationNames.add(((ScanNode) node.getLeftChild()).getCanonicalName());
@@ -97,7 +98,7 @@ public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor<Context,
Ob
         }
 
         if (isSimpleRelationNode(node.getRightChild())) {
-          if (getTableVolume((ScanNode) node.getRightChild()) <= context.bcastLimitForCrossJoin)
{
+          if (getTableVolume((ScanNode) node.getRightChild()) <= context.bcastLimitForCrossJoin
* StorageUnit.KB) {
             crossJoinAllowed = true;
           } else {
             largeRelationNames.add(((ScanNode) node.getRightChild()).getCanonicalName());


Mime
View raw message