tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject git commit: TAJO-925: Child ExecutionBlock of JOIN node has different number of shuffle keys. (Hyoungjun Kim via hyunsik)
Date Fri, 11 Jul 2014 05:43:29 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 52c8600dd -> 438010f92


TAJO-925: Child ExecutionBlock of JOIN node has different number of shuffle keys. (Hyoungjun
Kim via hyunsik)

Closes #61


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/438010f9
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/438010f9
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/438010f9

Branch: refs/heads/master
Commit: 438010f92bdbde50447d9fbc3438e57ddaff776f
Parents: 52c8600
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Fri Jul 11 14:42:18 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Fri Jul 11 14:42:18 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../planner/global/ExecutionBlockCursor.java    | 131 ++++++++++++++++++-
 .../tajo/engine/planner/global/MasterPlan.java  |  11 ++
 .../apache/tajo/master/querymaster/Query.java   |  15 ++-
 .../tajo/master/querymaster/SubQuery.java       |  22 ++--
 .../apache/tajo/engine/query/TestJoinQuery.java |  48 +++++++
 6 files changed, 212 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 563d64e..e9e512d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -90,6 +90,9 @@ Release 0.9.0 - unreleased
 
     TAJO-912: Tsql prints wrong version. (Mai Hai Thanh via hyunsik)
 
+    TAJO-925: Child ExecutionBlock of JOIN node has different number of 
+    shuffle keys. (Hyoungjun Kim via hyunsik)
+
     TAJO-902: Unicode delimiter does not work correctly. (jinho)
 
     TAJO-905: When to_date() parses some date without day, the result will be 

http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
index d4ab068..0372769 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
@@ -14,22 +14,35 @@
 
 package org.apache.tajo.engine.planner.global;
 
-import java.util.ArrayList;
-import java.util.Stack;
+import org.apache.tajo.ExecutionBlockId;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
  * This class is a pointer to an ExecutionBlock that the query engine should execute.
- * For each call of nextBlock(), it retrieves a next ExecutionBlock in a postfix order.
  */
 public class ExecutionBlockCursor {
   private MasterPlan masterPlan;
   private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
   private int cursor = 0;
 
+  private List<BuildOrderItem> executionOrderedBlocks = new ArrayList<BuildOrderItem>();
+  private List<BuildOrderItem> notOrderedSiblingBlocks = new ArrayList<BuildOrderItem>();
+  private Map<ExecutionBlockId, AtomicInteger> orderRequiredChildCountMap = new HashMap<ExecutionBlockId, AtomicInteger>();
+
   public ExecutionBlockCursor(MasterPlan plan) {
+    this(plan, false);
+  }
+
+  public ExecutionBlockCursor(MasterPlan plan, boolean siblingFirstOrder) {
     this.masterPlan = plan;
-    buildOrder(plan.getRoot());
+    if (siblingFirstOrder) {
+      buildSiblingFirstOrder(plan.getRoot());
+    } else {
+      buildDepthFirstOrder(plan.getRoot());
+    }
   }
 
   public int size() {
@@ -37,23 +50,127 @@ public class ExecutionBlockCursor {
   }
 
   // Add all execution blocks in a depth first and postfix order
-  private void buildOrder(ExecutionBlock current) {
+  private void buildDepthFirstOrder(ExecutionBlock current) {
     Stack<ExecutionBlock> stack = new Stack<ExecutionBlock>();
     if (!masterPlan.isLeaf(current.getId())) {
       for (ExecutionBlock execBlock : masterPlan.getChilds(current)) {
         if (!masterPlan.isLeaf(execBlock)) {
-          buildOrder(execBlock);
+          buildDepthFirstOrder(execBlock);
         } else {
           stack.push(execBlock);
         }
       }
       for (ExecutionBlock execBlock : stack) {
-        buildOrder(execBlock);
+        buildDepthFirstOrder(execBlock);
       }
     }
     orderedBlocks.add(current);
   }
 
+
+  private void buildSiblingFirstOrder(ExecutionBlock current) {
+    /*
+     |-eb_1404887024677_0004_000007
+       |-eb_1404887024677_0004_000006
+          |-eb_1404887024677_0004_000005
+             |-eb_1404887024677_0004_000004
+                |-eb_1404887024677_0004_000003
+             |-eb_1404887024677_0004_000002
+                |-eb_1404887024677_0004_000001
+
+     In the case of the upper plan, buildDepthFirstOrder() makes the following order in a depth first and postfix order.
+       [eb_1, eb_2, eb_3, eb_4, eb_5, eb_6, eb_7]
+     The eb_2 doesn't know eb_3's output bytes and uses a size of eb_4's all scan nodes.
+
+     buildSiblingFirstOrder() makes the following order in a sibling order.
+       [eb_1, eb_3, eb_2, eb_4, eb_5, eb_6, eb_7]
+     In this order the eb_2 knows eb_3's output bytes and the eb_4 also knows eb_1's output bytes.
+     */
+    preExecutionOrder(new BuildOrderItem(null, current));
+
+    for (BuildOrderItem eachItem: executionOrderedBlocks) {
+      if (masterPlan.isLeaf(eachItem.eb.getId())) {
+        orderedBlocks.add(eachItem.eb);
+        orderRequiredChildCountMap.get(eachItem.parentEB.getId()).decrementAndGet();
+      } else {
+        if (eachItem.allSiblingsOrdered()) {
+          for (BuildOrderItem eachSiblingItem: notOrderedSiblingBlocks) {
+            orderedBlocks.add(eachSiblingItem.eb);
+          }
+          orderedBlocks.add(eachItem.eb);
+          notOrderedSiblingBlocks.clear();
+        } else {
+          notOrderedSiblingBlocks.add(eachItem);
+        }
+      }
+    }
+  }
+
+  private void preExecutionOrder(BuildOrderItem current) {
+    Stack<BuildOrderItem> stack = new Stack<BuildOrderItem>();
+    if (!masterPlan.isLeaf(current.eb.getId())) {
+      List<ExecutionBlock> children = masterPlan.getChilds(current.eb);
+      orderRequiredChildCountMap.put(current.eb.getId(), new AtomicInteger(children.size()));
+      for (ExecutionBlock execBlock : children) {
+        BuildOrderItem item = new BuildOrderItem(current.eb, execBlock);
+        item.setSiblings(children);
+        if (!masterPlan.isLeaf(execBlock)) {
+          preExecutionOrder(item);
+        } else {
+          stack.push(item);
+        }
+      }
+      for (BuildOrderItem eachItem : stack) {
+        preExecutionOrder(eachItem);
+      }
+    }
+    executionOrderedBlocks.add(current);
+  }
+
+  class BuildOrderItem {
+    ExecutionBlock eb;
+    ExecutionBlock parentEB;
+    List<ExecutionBlockId> siblings = new ArrayList<ExecutionBlockId>();
+
+    BuildOrderItem(ExecutionBlock parentEB, ExecutionBlock eb) {
+      this.parentEB = parentEB;
+      this.eb = eb;
+    }
+
+    public void setSiblings(List<ExecutionBlock> siblings) {
+      for (ExecutionBlock eachEB: siblings) {
+        if (eachEB.getId().equals(eb.getId())) {
+          continue;
+        }
+
+        this.siblings.add(eachEB.getId());
+      }
+    }
+
+    public boolean allSiblingsOrdered() {
+      for (ExecutionBlockId eachSibling: siblings) {
+        if (orderRequiredChildCountMap.get(eachSibling) != null &&
+            orderRequiredChildCountMap.get(eachSibling).get() > 0) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return eb.toString();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof BuildOrderItem)) {
+        return false;
+      }
+      return eb.equals(((BuildOrderItem) obj).eb);
+    }
+  }
+
   public boolean hasNext() {
     return cursor < orderedBlocks.size();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index a8593e5..9b99c5f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -212,6 +212,17 @@ public class MasterPlan {
     sb.append(execBlockGraph.toStringGraph(getRoot().getId()));
     sb.append("-------------------------------------------------------------------------------\n");
 
+    ExecutionBlockCursor executionOrderCursor = new ExecutionBlockCursor(this, true);
+    sb.append("Order of Execution\n");
+    sb.append("-------------------------------------------------------------------------------");
+    int order = 1;
+    while (executionOrderCursor.hasNext()) {
+      ExecutionBlock currentEB = executionOrderCursor.nextBlock();
+      sb.append("\n").append(order).append(": ").append(currentEB.getId());
+      order++;
+    }
+    sb.append("\n-------------------------------------------------------------------------------\n");
+
     while(cursor.hasNext()) {
       ExecutionBlock block = cursor.nextBlock();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 0ce6d7e..31199ba 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -209,7 +209,19 @@ public class Query implements EventHandler<QueryEvent> {
     this.eventHandler = eventHandler;
     this.plan = plan;
     this.sm = context.getStorageManager();
-    cursor = new ExecutionBlockCursor(plan);
+    this.cursor = new ExecutionBlockCursor(plan, true);
+
+    StringBuilder sb = new StringBuilder("\n=======================================================");
+    sb.append("\nThe order of execution: \n");
+    int order = 1;
+    while (cursor.hasNext()) {
+      ExecutionBlock currentEB = cursor.nextBlock();
+      sb.append("\n").append(order).append(": ").append(currentEB.getId());
+      order++;
+    }
+    sb.append("\n=======================================================");
+    LOG.info(sb);
+    cursor.reset();
 
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
@@ -340,6 +352,7 @@ public class Query implements EventHandler<QueryEvent> {
 
     @Override
     public void transition(Query query, QueryEvent queryEvent) {
+
       query.setStartTime();
       SubQuery subQuery = new SubQuery(query.context, query.getPlan(),
           query.getExecutionBlockCursor().nextBlock(), query.sm);

http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 0776722..94f8b32 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -746,35 +746,37 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         int totalMem = getClusterTotalMemory(subQuery);
         LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
         int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
-
         // determine the number of task
         taskNum = Math.min(taskNum, slots);
+
         if (conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM) > 0) {
           taskNum = conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM);
           LOG.warn("!!!!! TESTCASE MODE !!!!!");
         }
-        LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);
 
         // The shuffle output numbers of join may be inconsistent by execution block order.
         // Thus, we need to compare the number with DataChannel output numbers.
         // If the number is right, the number and DataChannel output numbers will be consistent.
-        int outerShuffleOutptNum = 0, innerShuffleOutputNum = 0;
+        int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0;
         for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) {
-          outerShuffleOutptNum = Math.max(outerShuffleOutptNum, eachChannel.getShuffleOutputNum());
+          outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum());
         }
-
         for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) {
           innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum());
         }
-
-        if (outerShuffleOutptNum != innerShuffleOutputNum
-            && taskNum != outerShuffleOutptNum
+        if (outerShuffleOutputNum != innerShuffleOutputNum
+            && taskNum != outerShuffleOutputNum
             && taskNum != innerShuffleOutputNum) {
-          taskNum = Math.max(outerShuffleOutptNum, innerShuffleOutputNum);
+          LOG.info(subQuery.getId() + ", Change determined number of join partitions cause difference of outputNum" +
+                  ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) +
+                  ", outerShuffleOutptNum=" + outerShuffleOutputNum +
+                  ", innerShuffleOutputNum=" + innerShuffleOutputNum);
+          taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum);
         }
 
-        return taskNum;
+        LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);
 
+        return taskNum;
         // Is this subquery the first step of group-by?
       } else if (grpNode != null) {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/438010f9/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 13a0b2b..ca1ece1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -35,8 +35,10 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
@@ -1051,4 +1053,50 @@ public class TestJoinQuery extends QueryTestCaseBase {
       cleanupQuery(res);
     }
   }
+
+  @Test
+  public void testJoinWithDifferentShuffleKey() throws Exception {
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+
+    List<String> data = new ArrayList<String>();
+
+    int bytes = 0;
+    for (int i = 0; i < 1000000; i++) {
+      String row = i + "|" + i + "name012345678901234567890123456789012345678901234567890";
+      bytes += row.getBytes().length;
+      data.add(row);
+      if (bytes > 2 * 1024 * 1024) {
+        break;
+      }
+    }
+    TajoTestingCluster.createTable("large_table", schema, tableOptions, data.toArray(new String[]{}));
+
+    int originConfValue = conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME.varname, "1");
+    ResultSet res = executeString(
+       "select count(b.id) " +
+           "from (select id, count(*) as cnt from large_table group by id) a " +
+           "left outer join (select id, count(*) as cnt from large_table where id < 200 group by id) b " +
+           "on a.id = b.id"
+    );
+
+    try {
+      String expected =
+          "?count\n" +
+              "-------------------------------\n" +
+              "200\n";
+
+      assertEquals(expected, resultSetToString(res));
+    } finally {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME.varname, "" + originConfValue);
+      cleanupQuery(res);
+      executeString("DROP TABLE large_table PURGE").close();
+    }
+  }
 }


Mime
View raw message