Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1B805200B43 for ; Tue, 19 Jul 2016 15:40:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 19FC9160A8B; Tue, 19 Jul 2016 13:40:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1743B160A5C for ; Tue, 19 Jul 2016 15:40:55 +0200 (CEST) Received: (qmail 1933 invoked by uid 500); 19 Jul 2016 13:40:55 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 1923 invoked by uid 99); 19 Jul 2016 13:40:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Jul 2016 13:40:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1CE39E058E; Tue, 19 Jul 2016 13:40:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jihoonson@apache.org To: commits@tajo.apache.org Date: Tue, 19 Jul 2016 13:40:55 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] tajo git commit: TAJO-2177: In BroadcastJoinRule, the total volume of broadcast tables should be checked before stages are merged. 
archived-at: Tue, 19 Jul 2016 13:40:57 -0000 Repository: tajo Updated Branches: refs/heads/branch-0.11.4 f02f0e393 -> 27a709c25 http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java index 02e6609..4534235 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java @@ -39,7 +39,7 @@ public class ExecutionBlock { // Actual ScanNode's ExecutionBlockId -> Delegated ScanNode's ExecutionBlockId. private Map unionScanMap = new HashMap(); - private Map broadcastRelations = TUtil.newHashMap(); + private Map broadcastRelations = TUtil.newHashMap(); // map of table name and corresponding scan node private PlanContext planContext; http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index aff5038..a9bedb6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.global.rewriter.rules; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.OverridableConf; import org.apache.tajo.SessionVars; +import 
org.apache.tajo.TajoConstants; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.GlobalPlanner; @@ -109,7 +110,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { @Override public int compare(ScanNode o1, ScanNode o2) { - long compare = GlobalPlanRewriteUtil.getTableVolume(o1) - GlobalPlanRewriteUtil.getTableVolume(o2); + long compare = PlannerUtil.getTableVolume(o1) - PlannerUtil.getTableVolume(o2); if (compare == 0) { return 0; } else if (compare > 0) { @@ -157,7 +158,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { private final long thresholdForCrossJoin; private final boolean broadcastForNonCrossJoinEnabled; private final GlobalPlanRewriteUtil.ParentFinder parentFinder; - private final Map estimatedEbOutputSize = TUtil.newHashMap(); + private final Map estimatedEbOutputSize = new HashMap<>(); // map of table name and its volume public BroadcastJoinPlanBuilder(MasterPlan plan, RelationSizeComparator relationSizeComparator, GlobalPlanRewriteUtil.ParentFinder parentFinder, @@ -192,9 +193,9 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { if (!current.isPreservedRow()) { long totalVolume = 0; for (ScanNode scanNode : current.getScanNodes()) { - totalVolume += GlobalPlanRewriteUtil.getTableVolume(scanNode); + totalVolume += PlannerUtil.getTableVolume(scanNode); } - estimatedEbOutputSize.put(current.getId(), totalVolume); + estimatedEbOutputSize.put(current.getId().toString(), totalVolume); } } @@ -223,21 +224,51 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { for (ExecutionBlock child : childs) { if (!child.isPreservedRow()) { updateBroadcastableRelForChildEb(child, joinType); + // Mark the scan node for the child eb as broadcastable to determine whether the current and child ebs can be merged.
updateInputBasedOnChildEb(child, current); } } if (current.hasBroadcastRelation()) { + long broadcastThreshold = joinType.equals(JoinType.CROSS) ? + thresholdForCrossJoin : thresholdForNonCrossJoin; + // Try to merge the current execution block with each of its children. for (ExecutionBlock child : childs) { addUnionNodeIfNecessary(unionScanMap, plan, child, current); - mergeTwoPhaseJoinIfPossible(plan, child, current); - } - checkTotalSizeOfBroadcastableRelations(current); + // First check whether the two stages can be merged. + // If the total volume of broadcast candidates of the merged stage exceeds the threshold, + // these stages cannot be merged. + // + // Note: this is a greedy approach, and there may be a better way to find a more optimized broadcast + // join plan. For example, it would be better to split the merged stage by marking the largest broadcast + // candidate as not broadcast, because that can reduce the network cost a little. + // However, the benefit does not appear large (every broadcast candidate is very small), so the simple greedy + // solution is used here. + if (getTotalVolumeOfBroadcastableRelations(current) + + getTotalVolumeOfBroadcastableRelations(child) + > broadcastThreshold) { + // If a scan node for the child eb was marked as a broadcast candidate, revert that mark so it is not + // broadcast.
+ List notBroadcastable = new ArrayList<>(); + for (ScanNode eachScan : current.getBroadcastRelations()) { + if (eachScan.getTableName().equals(child.getId().toString())) { + notBroadcastable.add(eachScan); + } + } + + for (ScanNode eachScan : notBroadcastable) { + current.removeBroadcastRelation(eachScan); + } + } else { + mergeTwoPhaseJoinIfPossible(plan, child, current); + } + + } long outputVolume = estimateOutputVolume(current); - estimatedEbOutputSize.put(current.getId(), outputVolume); + estimatedEbOutputSize.put(current.getId().toString(), outputVolume); } } else { List relations = TUtil.newList(current.getBroadcastRelations()); @@ -265,7 +296,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { private void updateBroadcastableRelForChildEb(ExecutionBlock child, JoinType joinType) { long threshold = joinType == JoinType.CROSS ? thresholdForCrossJoin : thresholdForNonCrossJoin; for (ScanNode scanNode : child.getScanNodes()) { - long volume = GlobalPlanRewriteUtil.getTableVolume(scanNode); + long volume = PlannerUtil.getTableVolume(scanNode); if (volume >= 0 && volume <= threshold) { // If the child eb is already visited, the below line may update its broadcast relations. // Furthermore, this operation might mark the preserved-row relation as the broadcast relation with outer join. 
@@ -392,7 +423,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { thresholdForCrossJoin : thresholdForNonCrossJoin; int i; for (i = 0; i < broadcastCandidates.size(); i++) { - long volumeOfCandidate = GlobalPlanRewriteUtil.getTableVolume(broadcastCandidates.get(i)); + long volumeOfCandidate = PlannerUtil.getTableVolume(broadcastCandidates.get(i)); if (totalBroadcastVolume + volumeOfCandidate > largeThreshold) { break; } @@ -405,6 +436,20 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { } } + private long getTotalVolumeOfBroadcastableRelations(ExecutionBlock block) { + long sum = 0; + + for (ScanNode eachScan : block.getBroadcastRelations()) { + if (!estimatedEbOutputSize.containsKey(eachScan.getTableName())) { + long volume = PlannerUtil.getTableVolume(eachScan); + sum += volume == TajoConstants.UNKNOWN_LENGTH ? + Integer.MAX_VALUE : volume; // Use Integer.MAX to prevent overflow + } + } + + return sum; + } + private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current, ExecutionBlock parent) { if (parent != null && !plan.isTerminal(parent)) { ScanNode scanForCurrent = findScanForChildEb(current, parent); http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java index b13cb0f..fd9d022 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java @@ -90,27 +90,6 @@ public class GlobalPlanRewriteUtil { } /** - * Get a volume of a table of a 
partitioned table - * @param scanNode ScanNode corresponding to a table - * @return table volume (bytes) - */ - public static long getTableVolume(ScanNode scanNode) { - if (scanNode.getTableDesc().hasStats()) { - long scanBytes = scanNode.getTableDesc().getStats().getNumBytes(); - if (scanNode.getType() == NodeType.PARTITIONS_SCAN) { - PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode; - if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { - scanBytes = 0L; - } - } - - return scanBytes; - } else { - return -1; - } - } - - /** * It calculates the total volume of all descendent relation nodes. */ public static long computeDescendentVolume(LogicalNode node) { http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index cb3f9b8..6979a64 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -21,6 +21,7 @@ package org.apache.tajo.plan.util; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.tajo.TajoConstants; import org.apache.tajo.algebra.*; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; @@ -977,4 +978,25 @@ public class PlannerUtil { } return false; } + + /** + * Get the volume of a table, or 0 for a partition scan with no input paths. + * @param scanNode ScanNode corresponding to a table + * @return table volume (bytes), or TajoConstants.UNKNOWN_LENGTH if the table has no stats + */ + public static long getTableVolume(ScanNode scanNode) { + if (scanNode.getTableDesc().hasStats()) { + long scanBytes = scanNode.getTableDesc().getStats().getNumBytes(); + if (scanNode.getType() ==
NodeType.PARTITIONS_SCAN) { + PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) scanNode; + if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) { + scanBytes = 0L; + } + } + + return scanBytes; + } else { + return TajoConstants.UNKNOWN_LENGTH; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/27a709c2/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java index 7442561..eb4530d 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PostLogicalPlanVerifier.java @@ -24,6 +24,7 @@ import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TooLargeInputForCrossJoinException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.PostLogicalPlanVerifier.Context; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.unit.StorageUnit; @@ -90,7 +91,7 @@ public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor largeRelationNames = TUtil.newList(); if (isSimpleRelationNode(node.getLeftChild())) { - if (getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin * StorageUnit.KB) { + if (PlannerUtil.getTableVolume((ScanNode) node.getLeftChild()) <= context.bcastLimitForCrossJoin * StorageUnit.KB) { crossJoinAllowed = true; } else { largeRelationNames.add(((ScanNode) node.getLeftChild()).getCanonicalName()); @@ -98,7 +99,7 @@ public class PostLogicalPlanVerifier extends BasicLogicalPlanVisitor