tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/2] tajo git commit: TAJO-2177: In BroadcastJoinRule, the total volume of broadcast tables should be checked before stages are merged.
Date Tue, 19 Jul 2016 13:40:55 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.4 f02f0e393 -> 27a709c25


http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 02e6609..4534235 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -39,7 +39,7 @@ public class ExecutionBlock {
   // Actual ScanNode's ExecutionBlockId -> Delegated ScanNode's ExecutionBlockId.
   private Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = new HashMap<ExecutionBlockId,
ExecutionBlockId>();
 
-  private Map<String, ScanNode> broadcastRelations = TUtil.newHashMap();
+  private Map<String, ScanNode> broadcastRelations = TUtil.newHashMap(); // map of
table name and corresponding scan node
 
   private PlanContext planContext;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
index aff5038..a9bedb6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.global.rewriter.rules;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -109,7 +110,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
 
     @Override
     public int compare(ScanNode o1, ScanNode o2) {
-      long compare = GlobalPlanRewriteUtil.getTableVolume(o1) - GlobalPlanRewriteUtil.getTableVolume(o2);
+      long compare = PlannerUtil.getTableVolume(o1) - PlannerUtil.getTableVolume(o2);
       if (compare == 0) {
         return 0;
       } else if (compare > 0) {
@@ -157,7 +158,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
     private final long thresholdForCrossJoin;
     private final boolean broadcastForNonCrossJoinEnabled;
     private final GlobalPlanRewriteUtil.ParentFinder parentFinder;
-    private final Map<ExecutionBlockId, Long> estimatedEbOutputSize = TUtil.newHashMap();
+    private final Map<String, Long> estimatedEbOutputSize = new HashMap<>();
// map of table name and its volume
 
     public BroadcastJoinPlanBuilder(MasterPlan plan, RelationSizeComparator relationSizeComparator,
                                     GlobalPlanRewriteUtil.ParentFinder parentFinder,
@@ -192,9 +193,9 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
       if (!current.isPreservedRow()) {
         long totalVolume = 0;
         for (ScanNode scanNode : current.getScanNodes()) {
-          totalVolume += GlobalPlanRewriteUtil.getTableVolume(scanNode);
+          totalVolume += PlannerUtil.getTableVolume(scanNode);
         }
-        estimatedEbOutputSize.put(current.getId(), totalVolume);
+        estimatedEbOutputSize.put(current.getId().toString(), totalVolume);
       }
     }
 
@@ -223,21 +224,51 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
           for (ExecutionBlock child : childs) {
             if (!child.isPreservedRow()) {
               updateBroadcastableRelForChildEb(child, joinType);
+              // Mark the scan node for the child eb as broadcastable to figure out the current
and child ebs can be merged.
               updateInputBasedOnChildEb(child, current);
             }
           }
 
           if (current.hasBroadcastRelation()) {
+            long broadcastThreshold = joinType.equals(JoinType.CROSS) ?
+                thresholdForCrossJoin : thresholdForNonCrossJoin;
+
             // The current execution block and its every child are able to be merged.
             for (ExecutionBlock child : childs) {
               addUnionNodeIfNecessary(unionScanMap, plan, child, current);
-              mergeTwoPhaseJoinIfPossible(plan, child, current);
-            }
 
-            checkTotalSizeOfBroadcastableRelations(current);
+              // First check that two stages can be merged.
+              // If the total volume of broadcast candidates of the merged stage exceeds
the threshold,
+              // these stages cannot be merged.
+              //
+              // Note: this is a greedy approach, and there may be a better solution to find
more optimized broadcast
+              // join plan. For example, it would be better to split the merged stage by
marking the largest broadcast
+              // candidate as not being broadcasted because it can reduce the network cost
a little bit.
+              // However, the benefit looks not large (every broadcast candidates are very
small), so the simple greedy
+              // solution is used here.
+              if (getTotalVolumeOfBroadcastableRelations(current) +
+                  getTotalVolumeOfBroadcastableRelations(child)
+                  > broadcastThreshold) {
+                // If a scan node for the child eb is marked as a broadcast candidate, mark
it as not being broadcasted
+                // again.
+                List<ScanNode> notBroadcastable = new ArrayList<>();
+                for (ScanNode eachScan : current.getBroadcastRelations()) {
+                  if (eachScan.getTableName().equals(child.getId().toString())) {
+                    notBroadcastable.add(eachScan);
+                  }
+                }
+
+                for (ScanNode eachScan : notBroadcastable) {
+                  current.removeBroadcastRelation(eachScan);
+                }
+              } else {
+                mergeTwoPhaseJoinIfPossible(plan, child, current);
+              }
+
+            }
 
             long outputVolume = estimateOutputVolume(current);
-            estimatedEbOutputSize.put(current.getId(), outputVolume);
+            estimatedEbOutputSize.put(current.getId().toString(), outputVolume);
           }
         } else {
           List<ScanNode> relations = TUtil.newList(current.getBroadcastRelations());
@@ -265,7 +296,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
     private void updateBroadcastableRelForChildEb(ExecutionBlock child, JoinType joinType)
{
       long threshold = joinType == JoinType.CROSS ? thresholdForCrossJoin : thresholdForNonCrossJoin;
       for (ScanNode scanNode : child.getScanNodes()) {
-        long volume = GlobalPlanRewriteUtil.getTableVolume(scanNode);
+        long volume = PlannerUtil.getTableVolume(scanNode);
         if (volume >= 0 && volume <= threshold) {
           // If the child eb is already visited, the below line may update its broadcast
relations.
           // Furthermore, this operation might mark the preserved-row relation as the broadcast
relation with outer join.
@@ -392,7 +423,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
           thresholdForCrossJoin : thresholdForNonCrossJoin;
       int i;
       for (i = 0; i < broadcastCandidates.size(); i++) {
-        long volumeOfCandidate = GlobalPlanRewriteUtil.getTableVolume(broadcastCandidates.get(i));
+        long volumeOfCandidate = PlannerUtil.getTableVolume(broadcastCandidates.get(i));
         if (totalBroadcastVolume + volumeOfCandidate > largeThreshold) {
           break;
         }
@@ -405,6 +436,20 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
       }
     }
 
+    private long getTotalVolumeOfBroadcastableRelations(ExecutionBlock block) {
+      long sum = 0;
+      // Sum broadcast-candidate volumes, skipping scans whose table name is a key of estimatedEbOutputSize (EB outputs).
+      for (ScanNode eachScan : block.getBroadcastRelations()) {
+        if (!estimatedEbOutputSize.containsKey(eachScan.getTableName())) {
+          long volume = PlannerUtil.getTableVolume(eachScan);
+          sum += volume == TajoConstants.UNKNOWN_LENGTH ?
+              Integer.MAX_VALUE : volume; // unknown size -> Integer.MAX_VALUE (not Long.MAX_VALUE) so the long sum won't overflow
+        }
+      }
+
+      return sum;
+    }
+
     private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current,
ExecutionBlock parent) {
       if (parent != null && !plan.isTerminal(parent)) {
         ScanNode scanForCurrent = findScanForChildEb(current, parent);

http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
index b13cb0f..fd9d022 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
@@ -90,27 +90,6 @@ public class GlobalPlanRewriteUtil {
   }
 
   /**
-   * Get a volume of a table of a partitioned table
-   * @param scanNode ScanNode corresponding to a table
-   * @return table volume (bytes)
-   */
-  public static long getTableVolume(ScanNode scanNode) {
-    if (scanNode.getTableDesc().hasStats()) {
-      long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
-      if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
-        PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode;
-        if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
-          scanBytes = 0L;
-        }
-      }
-
-      return scanBytes;
-    } else {
-      return -1;
-    }
-  }
-
-  /**
    * It calculates the total volume of all descendent relation nodes.
    */
   public static long computeDescendentVolume(LogicalNode node) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index cb3f9b8..6979a64 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -21,6 +21,7 @@ package org.apache.tajo.plan.util;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
@@ -977,4 +978,25 @@ public class PlannerUtil {
     }
     return false;
   }
+
+  /**
+   * Returns the volume of a table or a partitioned table (0 if a partitions scan has no input paths).
+   * @param scanNode ScanNode corresponding to a table
+   * @return table volume in bytes, or {@link TajoConstants#UNKNOWN_LENGTH} if the table has no stats
+   */
+  public static long getTableVolume(ScanNode scanNode) {
+    if (scanNode.getTableDesc().hasStats()) {
+      long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
+      if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
+        PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode;
+        if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
+          scanBytes = 0L;
+        }
+      }
+
+      return scanBytes;
+    } else {
+      return TajoConstants.UNKNOWN_LENGTH;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
index 7442561..eb4530d 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java
@@ -24,6 +24,7 @@ import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TooLargeInputForCrossJoinException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.verifier.PostLogicalPlanVerifier.Context;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 import org.apache.tajo.unit.StorageUnit;
@@ -90,7 +91,7 @@ public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor<Context,
Ob
         List<String> largeRelationNames = TUtil.newList();
 
         if (isSimpleRelationNode(node.getLeftChild())) {
-          if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin
* StorageUnit.KB) {
+          if (PlannerUtil.getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin
* StorageUnit.KB) {
             crossJoinAllowed = true;
           } else {
             largeRelationNames.add(((ScanNode) node.getLeftChild()).getCanonicalName());
@@ -98,7 +99,7 @@ public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor<Context,
Ob
         }
 
         if (isSimpleRelationNode(node.getRightChild())) {
-          if (getTableVolume((ScanNode) node.getRightChild()) <= context.bcastLimitForCrossJoin
* StorageUnit.KB) {
+          if (PlannerUtil.getTableVolume((ScanNode) node.getRightChild()) <= context.bcastLimitForCrossJoin
* StorageUnit.KB) {
             crossJoinAllowed = true;
           } else {
             largeRelationNames.add(((ScanNode) node.getRightChild()).getCanonicalName());
@@ -125,25 +126,4 @@ public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor<Context,
Ob
       return false;
     }
   }
-
-  /**
-   * Get a volume of a table of a partitioned table
-   * @param scanNode ScanNode corresponding to a table
-   * @return table volume (bytes)
-   */
-  private static long getTableVolume(ScanNode scanNode) {
-    if (scanNode.getTableDesc().hasStats()) {
-      long scanBytes = scanNode.getTableDesc().getStats().getNumBytes();
-      if (scanNode.getType() == NodeType.PARTITIONS_SCAN) {
-        PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode;
-        if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
-          scanBytes = 0L;
-        }
-      }
-
-      return scanBytes;
-    } else {
-      return -1;
-    }
-  }
 }


Mime
View raw message