Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A632010718 for ; Fri, 11 Jul 2014 05:43:29 +0000 (UTC) Received: (qmail 61880 invoked by uid 500); 11 Jul 2014 05:43:29 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 61843 invoked by uid 500); 11 Jul 2014 05:43:29 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 61833 invoked by uid 99); 11 Jul 2014 05:43:29 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Jul 2014 05:43:29 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4AB9A9AC906; Fri, 11 Jul 2014 05:43:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hyunsik@apache.org To: commits@tajo.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TAJO-925: Child ExecutionBlock of JOIN node has different number of shuffle keys. (Hyoungjun Kim via hyunsik) Date: Fri, 11 Jul 2014 05:43:29 +0000 (UTC) Repository: tajo Updated Branches: refs/heads/master 52c8600dd -> 438010f92 TAJO-925: Child ExecutionBlock of JOIN node has different number of shuffle keys. (Hyoungjun Kim via hyunsik) Closes #61 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/438010f9 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/438010f9 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/438010f9 Branch: refs/heads/master Commit: 438010f92bdbde50447d9fbc3438e57ddaff776f Parents: 52c8600 Author: Hyunsik Choi Authored: Fri Jul 11 14:42:18 2014 +0900 Committer: Hyunsik Choi Committed: Fri Jul 11 14:42:18 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../planner/global/ExecutionBlockCursor.java | 131 ++++++++++++++++++- .../tajo/engine/planner/global/MasterPlan.java | 11 ++ .../apache/tajo/master/querymaster/Query.java | 15 ++- .../tajo/master/querymaster/SubQuery.java | 22 ++-- .../apache/tajo/engine/query/TestJoinQuery.java | 48 +++++++ 6 files changed, 212 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 563d64e..e9e512d 100644 --- a/CHANGES +++ b/CHANGES @@ -90,6 +90,9 @@ Release 0.9.0 - unreleased TAJO-912: Tsql prints wrong version. (Mai Hai Thanh via hyunsik) + TAJO-925: Child ExecutionBlock of JOIN node has different number of + shuffle keys. (Hyoungjun Kim via hyunsik) + TAJO-902: Unicode delimiter does not work correctly. (jinho) TAJO-905: When to_date() parses some date without day, the result will be http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java index d4ab068..0372769 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java @@ -14,22 +14,35 @@ package org.apache.tajo.engine.planner.global; -import java.util.ArrayList; -import java.util.Stack; +import org.apache.tajo.ExecutionBlockId; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; /** * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks. * This class is a pointer to an ExecutionBlock that the query engine should execute. - * For each call of nextBlock(), it retrieves a next ExecutionBlock in a postfix order. */ public class ExecutionBlockCursor { private MasterPlan masterPlan; private ArrayList orderedBlocks = new ArrayList(); private int cursor = 0; + private List executionOrderedBlocks = new ArrayList(); + private List notOrderedSiblingBlocks = new ArrayList(); + private Map orderRequiredChildCountMap = new HashMap(); + public ExecutionBlockCursor(MasterPlan plan) { + this(plan, false); + } + + public ExecutionBlockCursor(MasterPlan plan, boolean siblingFirstOrder) { this.masterPlan = plan; - buildOrder(plan.getRoot()); + if (siblingFirstOrder) { + buildSiblingFirstOrder(plan.getRoot()); + } else { + buildDepthFirstOrder(plan.getRoot()); + } } public int size() { @@ -37,23 +50,127 @@ public class ExecutionBlockCursor { } // Add all execution blocks in a depth first and postfix order - private void buildOrder(ExecutionBlock current) { + private void buildDepthFirstOrder(ExecutionBlock current) { Stack stack = new Stack(); if (!masterPlan.isLeaf(current.getId())) { for (ExecutionBlock execBlock : masterPlan.getChilds(current)) { if (!masterPlan.isLeaf(execBlock)) { - buildOrder(execBlock); + buildDepthFirstOrder(execBlock); } else { stack.push(execBlock); } } for (ExecutionBlock execBlock : stack) { - buildOrder(execBlock); + buildDepthFirstOrder(execBlock); } } orderedBlocks.add(current); } + + private void buildSiblingFirstOrder(ExecutionBlock current) { + /* + |-eb_1404887024677_0004_000007 + |-eb_1404887024677_0004_000006 + |-eb_1404887024677_0004_000005 + |-eb_1404887024677_0004_000004 + |-eb_1404887024677_0004_000003 + |-eb_1404887024677_0004_000002 + |-eb_1404887024677_0004_000001 + + In the case of the upper plan, buildDepthFirstOrder() makes the following order in a depth first and postfix order. + [eb_1, eb_2, eb_3, eb_4, eb_5, eb_6, eb_7] + The eb_2 doesn't know eb_3's output bytes and uses a size of eb_4's all scan nodes. + + buildSiblingFirstOrder() makes the following order in a sibling order. + [eb_1, eb_3, eb_2, eb_4, eb_5, eb_6, eb_7] + In this order the eb_2 knows eb_3's output bytes and the eb_4 also knows eb_1's output bytes. + */ + preExecutionOrder(new BuildOrderItem(null, current)); + + for (BuildOrderItem eachItem: executionOrderedBlocks) { + if (masterPlan.isLeaf(eachItem.eb.getId())) { + orderedBlocks.add(eachItem.eb); + orderRequiredChildCountMap.get(eachItem.parentEB.getId()).decrementAndGet(); + } else { + if (eachItem.allSiblingsOrdered()) { + for (BuildOrderItem eachSiblingItem: notOrderedSiblingBlocks) { + orderedBlocks.add(eachSiblingItem.eb); + } + orderedBlocks.add(eachItem.eb); + notOrderedSiblingBlocks.clear(); + } else { + notOrderedSiblingBlocks.add(eachItem); + } + } + } + } + + private void preExecutionOrder(BuildOrderItem current) { + Stack stack = new Stack(); + if (!masterPlan.isLeaf(current.eb.getId())) { + List children = masterPlan.getChilds(current.eb); + orderRequiredChildCountMap.put(current.eb.getId(), new AtomicInteger(children.size())); + for (ExecutionBlock execBlock : children) { + BuildOrderItem item = new BuildOrderItem(current.eb, execBlock); + item.setSiblings(children); + if (!masterPlan.isLeaf(execBlock)) { + preExecutionOrder(item); + } else { + stack.push(item); + } + } + for (BuildOrderItem eachItem : stack) { + preExecutionOrder(eachItem); + } + } + executionOrderedBlocks.add(current); + } + + class BuildOrderItem { + ExecutionBlock eb; + ExecutionBlock parentEB; + List siblings = new ArrayList(); + + BuildOrderItem(ExecutionBlock parentEB, ExecutionBlock eb) { + this.parentEB = parentEB; + this.eb = eb; + } + + public void setSiblings(List siblings) { + for (ExecutionBlock eachEB: siblings) { + if (eachEB.getId().equals(eb.getId())) { + continue; + } + + this.siblings.add(eachEB.getId()); + } + } + + public boolean allSiblingsOrdered() { + for (ExecutionBlockId eachSibling: siblings) { + if (orderRequiredChildCountMap.get(eachSibling) != null && + orderRequiredChildCountMap.get(eachSibling).get() > 0) { + return false; + } + } + return true; + } + + @Override + public String toString() { + return eb.toString(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof BuildOrderItem)) { + return false; + } + return eb.equals(((BuildOrderItem) obj).eb); + } + } + public boolean hasNext() { return cursor < orderedBlocks.size(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java index a8593e5..9b99c5f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java @@ -212,6 +212,17 @@ public class MasterPlan { sb.append(execBlockGraph.toStringGraph(getRoot().getId())); sb.append("-------------------------------------------------------------------------------\n"); + ExecutionBlockCursor executionOrderCursor = new ExecutionBlockCursor(this, true); + sb.append("Order of Execution\n"); + sb.append("-------------------------------------------------------------------------------"); + int order = 1; + while (executionOrderCursor.hasNext()) { + ExecutionBlock currentEB = executionOrderCursor.nextBlock(); + sb.append("\n").append(order).append(": ").append(currentEB.getId()); + order++; + } + sb.append("\n-------------------------------------------------------------------------------\n"); + while(cursor.hasNext()) { ExecutionBlock block = cursor.nextBlock(); http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 0ce6d7e..31199ba 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -209,7 +209,19 @@ public class Query implements EventHandler { this.eventHandler = eventHandler; this.plan = plan; this.sm = context.getStorageManager(); - cursor = new ExecutionBlockCursor(plan); + this.cursor = new ExecutionBlockCursor(plan, true); + + StringBuilder sb = new StringBuilder("\n======================================================="); + sb.append("\nThe order of execution: \n"); + int order = 1; + while (cursor.hasNext()) { + ExecutionBlock currentEB = cursor.nextBlock(); + sb.append("\n").append(order).append(": ").append(currentEB.getId()); + order++; + } + sb.append("\n======================================================="); + LOG.info(sb); + cursor.reset(); ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); @@ -340,6 +352,7 @@ public class Query implements EventHandler { @Override public void transition(Query query, QueryEvent queryEvent) { + query.setStartTime(); SubQuery subQuery = new SubQuery(query.context, query.getPlan(), query.getExecutionBlockCursor().nextBlock(), query.sm); http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 0776722..94f8b32 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -746,35 +746,37 @@ public class SubQuery implements EventHandler { int totalMem = getClusterTotalMemory(subQuery); LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB"); int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1); - // determine the number of task taskNum = Math.min(taskNum, slots); + if (conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM) > 0) { taskNum = conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM); LOG.warn("!!!!! TESTCASE MODE !!!!!"); } - LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum); // The shuffle output numbers of join may be inconsistent by execution block order. // Thus, we need to compare the number with DataChannel output numbers. // If the number is right, the number and DataChannel output numbers will be consistent. - int outerShuffleOutptNum = 0, innerShuffleOutputNum = 0; + int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0; for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) { - outerShuffleOutptNum = Math.max(outerShuffleOutptNum, eachChannel.getShuffleOutputNum()); + outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum()); } - for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) { innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum()); } - - if (outerShuffleOutptNum != innerShuffleOutputNum - && taskNum != outerShuffleOutptNum + if (outerShuffleOutputNum != innerShuffleOutputNum + && taskNum != outerShuffleOutputNum && taskNum != innerShuffleOutputNum) { - taskNum = Math.max(outerShuffleOutptNum, innerShuffleOutputNum); + LOG.info(subQuery.getId() + ", Change determined number of join partitions cause difference of outputNum" + + ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) + + ", outerShuffleOutptNum=" + outerShuffleOutputNum + + ", innerShuffleOutputNum=" + innerShuffleOutputNum); + taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum); } - return taskNum; + LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum); + return taskNum; // Is this subquery the first step of group-by? } else if (grpNode != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java index 13a0b2b..ca1ece1 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java @@ -35,8 +35,10 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import java.sql.ResultSet; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import static org.junit.Assert.assertEquals; @@ -1051,4 +1053,50 @@ public class TestJoinQuery extends QueryTestCaseBase { cleanupQuery(res); } } + + @Test + public void testJoinWithDifferentShuffleKey() throws Exception { + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("name", Type.TEXT); + + List data = new ArrayList(); + + int bytes = 0; + for (int i = 0; i < 1000000; i++) { + String row = i + "|" + i + "name012345678901234567890123456789012345678901234567890"; + bytes += row.getBytes().length; + data.add(row); + if (bytes > 2 * 1024 * 1024) { + break; + } + } + TajoTestingCluster.createTable("large_table", schema, tableOptions, data.toArray(new String[]{})); + + int originConfValue = conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME); + testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME.varname, "1"); + ResultSet res = executeString( + "select count(b.id) " + + "from (select id, count(*) as cnt from large_table group by id) a " + + "left outer join (select id, count(*) as cnt from large_table where id < 200 group by id) b " + + "on a.id = b.id" + ); + + try { + String expected = + "?count\n" + + "-------------------------------\n" + + "200\n"; + + assertEquals(expected, resultSetToString(res)); + } finally { + testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME.varname, "" + originConfValue); + cleanupQuery(res); + executeString("DROP TABLE large_table PURGE").close(); + } + } }