tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [45/50] [abbrv] git commit: DAG-execplan: tmp
Date Sat, 28 Dec 2013 06:36:42 GMT
DAG-execplan: tmp


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5291ed49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5291ed49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5291ed49

Branch: refs/heads/DAG-execplan
Commit: 5291ed492c3ddfa6499a6893f1dd2fb1df30e8ac
Parents: 53e8464
Author: Jihoon Son <jihoonson@apache.org>
Authored: Fri Dec 27 17:06:39 2013 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Fri Dec 27 17:06:39 2013 +0900

----------------------------------------------------------------------
 .../engine/planner/PhysicalPlannerImpl.java     |   7 +-
 .../tajo/engine/planner/global/DataChannel.java |  43 ++--
 .../engine/planner/global/ExecutionBlock.java   |   4 +-
 .../planner/global/ExecutionBlockPID.java       |   8 +
 .../engine/planner/global/ExecutionPlan.java    | 230 ++++++++++++++-----
 .../planner/global/ExecutionPlanEdge.java       |  12 +-
 .../engine/planner/global/GlobalPlanner.java    | 170 ++++++++------
 .../tajo/engine/planner/global/MasterPlan.java  |   3 +-
 .../tajo/master/querymaster/SubQuery.java       |   4 +
 .../planner/global/TestExecutionPlan.java       |  33 ++-
 10 files changed, 332 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index c3cf697..19a6574 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -33,7 +33,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionPlan;
-import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
+import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.EdgeType;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.exception.InternalException;
@@ -141,8 +141,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         }
 
         plan.remove(node, root);
-        plan.add(node, storeTableNode, Tag.SINGLE);
-        plan.add(storeTableNode, root, Tag.SINGLE);
+        plan.add(node, storeTableNode, EdgeType.SINGLE);
+        plan.add(storeTableNode, root, EdgeType.SINGLE);
         channel.updateSrcPID(storeTableNode.getPID());
       }
     }
@@ -732,6 +732,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       for (DataChannel outChannel : ctx.getOutgoingChannels()) {
         if (outChannel.getSrcPID() == plan.getPID()) {
           channel = outChannel;
+          break;
         }
       }
       switch (channel.getPartitionType()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 034d71f..a8eb1c3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -22,13 +22,12 @@ import com.google.common.base.Preconditions;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.global.ExecutionPlan.LogicalNodeGroup;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 
 public class DataChannel {
-//  private ExecutionBlockId srcId;
-//  private ExecutionBlockId targetId;
   private ExecutionBlockPID srcId;
   private ExecutionBlockPID targetId;
   private TransmitType transmitType = TransmitType.PULL_TRANSMIT;
@@ -40,36 +39,27 @@ public class DataChannel {
 
   private StoreType storeType = StoreType.RAW;
 
-  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID) {
-//    this.srcId = srcId;
-//    this.targetId = targetId;
-    this.srcId = new ExecutionBlockPID(srcId, srcPID);
-    this.targetId = new ExecutionBlockPID(targetId, targetPID);
-//    this.srcPID = srcPID;
-//    this.targetPID = targetPID;
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
+    this.srcId = new ExecutionBlockPID(srcId);
+    this.targetId = new ExecutionBlockPID(targetId);
   }
 
-  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
-                     PartitionType partitionType) {
-    this(srcId, targetId, srcPID, targetPID);
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType) {
+    this(srcId, targetId);
     this.partitionType = partitionType;
   }
 
-  public DataChannel(ExecutionBlock src, ExecutionBlock target, Integer srcPID, Integer targetPID,
-                     PartitionType partitionType, int partNum, Schema schema) {
-    this(src.getId(), target.getId(), srcPID, targetPID, partitionType, partNum);
+  public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum, Schema schema) {
+    this(src.getId(), target.getId(), partitionType, partNum);
     setSchema(schema);
   }
 
-  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
-                     PartitionType partitionType, int partNum) {
-    this(srcId, targetId, srcPID, targetPID, partitionType);
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType, int partNum) {
+    this(srcId, targetId, partitionType);
     this.partitionNum = partNum;
   }
 
   public DataChannel(DataChannelProto proto) {
-//    this.srcId = new ExecutionBlockId(proto.getSrcId());
-//    this.targetId = new ExecutionBlockId(proto.getTargetId());
     this.transmitType = proto.getTransmitType();
     this.partitionType = proto.getPartitionType();
     if (proto.hasSchema()) {
@@ -86,12 +76,6 @@ public class DataChannel {
     if (proto.hasPartitionNum()) {
       this.partitionNum = proto.getPartitionNum();
     }
-//    if (proto.hasSrcPID()) {
-//      this.srcPID = proto.getSrcPID();
-//    }
-//    if (proto.hasTargetPID()) {
-//      this.targetPID = proto.getTargetPID();
-//    }
 
     Integer srcPID = proto.hasSrcPID() ? proto.getSrcPID() : null;
     Integer targetPID = proto.hasTargetPID() ? proto.getTargetPID() : null;
@@ -250,4 +234,11 @@ public class DataChannel {
   public Integer getTargetPID() {
     return targetId.getPid();
   }
+
+  public static DataChannel linkChannelAndLogicalNodeGroups(LogicalNodeGroup src, LogicalNodeGroup target,
+                                                            DataChannel channel) {
+    channel.srcId.setPID(src.getId());
+    channel.targetId.setPID(target.getId());
+    return channel;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 17e365a..378726b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -39,9 +39,9 @@ public class ExecutionBlock {
 
   private Set<String> broadcasted = new HashSet<String>();
 
-  public ExecutionBlock(ExecutionBlockId executionBlockId, PIDFactory pidFactory, LogicalRootNode rootNode) {
+  public ExecutionBlock(ExecutionBlockId executionBlockId, PIDFactory pidFactory) {
     this.executionBlockId = executionBlockId;
-    this.executionPlan = new ExecutionPlan(pidFactory, rootNode);
+    this.executionPlan = new ExecutionPlan(pidFactory);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
index d120057..990de91 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockPID.java
@@ -24,11 +24,19 @@ public class ExecutionBlockPID {
   private ExecutionBlockId executionBlockId;
   private Integer pid;
 
+  public ExecutionBlockPID(ExecutionBlockId executionBlockId) {
+    this.executionBlockId = executionBlockId;
+  }
+
   public ExecutionBlockPID(ExecutionBlockId executionBlockId, Integer pid) {
     this.executionBlockId = executionBlockId;
     this.pid = pid;
   }
 
+  public void setPID(int pid) {
+    this.pid = pid;
+  }
+
   public ExecutionBlockId getExecutionBlockId() {
     return executionBlockId;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
index 4a83614..614d128 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
@@ -18,15 +18,15 @@
 
 package org.apache.tajo.engine.planner.global;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
+import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.EdgeType;
 import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.json.GsonObject;
@@ -49,47 +49,146 @@ public class ExecutionPlan implements GsonObject {
   @Expose private SimpleDirectedGraph<Integer, ExecutionPlanEdge> graph
       = new SimpleDirectedGraph<Integer, ExecutionPlanEdge>();
 
-  @VisibleForTesting
-  public ExecutionPlan(PIDFactory pidFactory) {
-    this.pidFactory = pidFactory;
+  private NavigableMap<Integer, LogicalNodeGroup> logicalNodeGroups = Maps.newTreeMap();
+  private boolean built = false;
+
+  public static class LogicalNodeGroup {
+    private int rootPID;
+    private List<LogicalNode> nodes = Lists.newArrayList();  // order: root -> leaf
+
+    public LogicalNodeGroup(int rootPID) {
+      setId(rootPID);
+    }
+
+    public void setId(int rootPID) {
+      this.rootPID = rootPID;
+    }
+
+    public int getId() {
+      return rootPID;
+    }
+
+    public void addNodeAndDescendants(LogicalNode logicalNode) {
+      add(logicalNode);
+      if (logicalNode instanceof UnaryNode) {
+        add(((UnaryNode) logicalNode).getChild());
+      } else if (logicalNode instanceof BinaryNode) {
+        add(((BinaryNode) logicalNode).getLeftChild());
+        add(((BinaryNode) logicalNode).getRightChild());
+      } else if (logicalNode instanceof TableSubQueryNode) {
+        add(((TableSubQueryNode) logicalNode).getSubQuery());
+      }
+    }
+
+    public void add(LogicalNode logicalNode) {
+      nodes.add(logicalNode);
+    }
+
+    public LogicalNode toLinkedLogicalNode() {
+      LogicalNode[] nodes = this.nodes.toArray(new LogicalNode[this.nodes.size()]);
+
+      for (int i = 0; i < nodes.length; i++) {
+        if (nodes[i] instanceof UnaryNode) {
+          ((UnaryNode)nodes[i]).setChild(nodes[++i]);
+        } else if (nodes[i] instanceof BinaryNode) {
+          ((BinaryNode)nodes[i]).setLeftChild(nodes[++i]);
+          ((BinaryNode)nodes[i]).setRightChild(nodes[++i]);
+        } else if (nodes[i] instanceof TableSubQueryNode) {
+          ((TableSubQueryNode)nodes[i]).setSubQuery(nodes[++i]);
+        }
+      }
+      return nodes[0];
+    }
+
+    public LogicalNode getRootNode() {
+      return nodes.get(0);
+    }
+
+    public LogicalNode getLeafNode() {
+      return nodes.get(nodes.size()-1);
+    }
+
+    public void clear() {
+      nodes.clear();
+    }
   }
 
-  public ExecutionPlan(PIDFactory pidFactory, LogicalRootNode terminalNode) {
-    this(pidFactory);
+  private ExecutionPlan(PIDFactory pidFactory, LogicalRootNode terminalNode) {
+    this.pidFactory = pidFactory;
     this.terminalNode = terminalNode;
   }
 
+  public ExecutionPlan(PIDFactory pidFactory) {
+    this(pidFactory, new LogicalRootNode(pidFactory.newPID()));
+  }
+
   public void setPlan(LogicalNode plan) {
     this.clear();
     this.addPlan(plan);
   }
 
-  private void clear() {
+  public void clear() {
     for (ExecutionPlanEdge edge : graph.getEdgesAll()) {
       graph.removeEdge(edge.getChildId(), edge.getParentId());
     }
+    for (LogicalNodeGroup eachGroup : logicalNodeGroups.values()) {
+      eachGroup.clear();
+    }
+    logicalNodeGroups.clear();
     vertices.clear();
     this.inputContext = null;
     this.hasUnionPlan = false;
     this.hasJoinPlan = false;
+    this.built = false;
   }
 
   public void addPlan(LogicalNode plan) {
-    LogicalNode current = PlannerUtil.clone(pidFactory, plan);
-    if (current.getType() == NodeType.ROOT) {
-      terminalNode = (LogicalRootNode) current;
-    } else {
-      this.add(current, terminalNode, Tag.SINGLE);
-      terminalNode.setChild(current);
+//    Preconditions.checkState(built==false, "Execution plan is already built.");
+    built = false;
+
+    LogicalNode topNode = plan;
+    if (topNode.getType() == NodeType.ROOT) {
+      topNode = ((LogicalRootNode)topNode).getChild();
     }
+
+    // add group
+    LogicalNodeGroup nodeGroup = new LogicalNodeGroup(topNode.getPID());
+    nodeGroup.addNodeAndDescendants(topNode);
+    logicalNodeGroups.put(nodeGroup.rootPID, nodeGroup);
+  }
+
+  public LogicalNodeGroup getLogicalNodeGroupWithPID(int pid) {
+    return logicalNodeGroups.get(pid);
+  }
+
+  public LogicalNodeGroup getFirstLogicalNodeGroup() {
+    return logicalNodeGroups.firstEntry().getValue();
+  }
+
+  public void build() {
+//    Preconditions.checkState(built==false, "Execution plan is already built.");
+    if (built) {
+      return;
+    }
+
     ExecutionPlanBuilder builder = new ExecutionPlanBuilder(this);
-    builder.visit(terminalNode);
+
+    for (LogicalNodeGroup logicalNodeGroup : logicalNodeGroups.values()) {
+      LogicalNode topNode = logicalNodeGroup.nodes.iterator().next();
+      builder.visit(topNode);
+      this.add(topNode, terminalNode, EdgeType.SINGLE);
+    }
+    this.built = true;
   }
 
-  public void add(LogicalNode child, LogicalNode parent, Tag tag) {
+  public boolean isBuilt() {
+    return built;
+  }
+
+  public void add(LogicalNode child, LogicalNode parent, EdgeType edgeType) {
     vertices.put(child.getPID(), child);
     vertices.put(parent.getPID(), parent);
-    graph.addEdge(child.getPID(), parent.getPID(), new ExecutionPlanEdge(child, parent, tag));
+    graph.addEdge(child.getPID(), parent.getPID(), new ExecutionPlanEdge(child, parent, edgeType));
   }
 
   public void setInputContext(InputContext contexts) {
@@ -116,14 +215,14 @@ public class ExecutionPlan implements GsonObject {
     return graph.toStringGraph(terminalNode.getPID());
   }
 
-  public Tag getTag(LogicalNode child, LogicalNode parent) {
-    return graph.getEdge(child.getPID(), parent.getPID()).getTag();
+  public EdgeType getEdgeType(LogicalNode child, LogicalNode parent) {
+    return graph.getEdge(child.getPID(), parent.getPID()).getEdgeType();
   }
 
-  public LogicalNode getChild(LogicalNode parent, Tag tag) {
+  public LogicalNode getChild(LogicalNode parent, EdgeType edgeType) {
     List<ExecutionPlanEdge> incomingEdges = graph.getIncomingEdges(parent.getPID());
     for (ExecutionPlanEdge inEdge : incomingEdges) {
-      if (inEdge.getTag() == tag) {
+      if (inEdge.getEdgeType() == edgeType) {
         return vertices.get(inEdge.getChildId());
       }
     }
@@ -203,13 +302,13 @@ public class ExecutionPlan implements GsonObject {
     this.graph.removeEdge(child.getPID(), parent.getPID());
   }
 
-  private static class PIDAndTag {
+  private static class PIDAndEdgeType {
     @Expose int id;
-    @Expose Tag tag;
+    @Expose EdgeType edgeType;
 
-    public PIDAndTag(int id, Tag tag) {
+    public PIDAndEdgeType(int id, EdgeType edgeType) {
       this.id = id;
-      this.tag = tag;
+      this.edgeType = edgeType;
     }
   }
 
@@ -220,7 +319,7 @@ public class ExecutionPlan implements GsonObject {
     @Expose private final InputContext inputContext;
     @Expose private final LogicalRootNode terminalNode;
     @Expose Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
-    @Expose Map<Integer, List<PIDAndTag>> adjacentList = new HashMap<Integer, List<PIDAndTag>>();
+    @Expose Map<Integer, List<PIDAndEdgeType>> adjacentList = new HashMap<Integer, List<PIDAndEdgeType>>();
 
     public ExecutionPlanJsonHelper(ExecutionPlan plan) {
       this.pidFactory = plan.pidFactory;
@@ -231,7 +330,7 @@ public class ExecutionPlan implements GsonObject {
       this.vertices.putAll(plan.vertices);
       Collection<ExecutionPlanEdge> edges = plan.graph.getEdgesAll();
       int parentId, childId;
-      List<PIDAndTag> adjacents;
+      List<PIDAndEdgeType> adjacents;
 
       // convert the graph to an adjacent list
       for (ExecutionPlanEdge edge : edges) {
@@ -241,10 +340,10 @@ public class ExecutionPlan implements GsonObject {
         if (adjacentList.containsKey(childId)) {
           adjacents = adjacentList.get(childId);
         } else {
-          adjacents = new ArrayList<PIDAndTag>();
+          adjacents = new ArrayList<PIDAndEdgeType>();
           adjacentList.put(childId, adjacents);
         }
-        adjacents.add(new PIDAndTag(parentId, edge.getTag()));
+        adjacents.add(new PIDAndEdgeType(parentId, edge.getEdgeType()));
       }
     }
 
@@ -261,10 +360,10 @@ public class ExecutionPlan implements GsonObject {
       plan.setInputContext(this.inputContext);
       plan.vertices.putAll(this.vertices);
 
-      for (Entry<Integer, List<PIDAndTag>> e : this.adjacentList.entrySet()) {
+      for (Entry<Integer, List<PIDAndEdgeType>> e : this.adjacentList.entrySet()) {
         LogicalNode child = this.vertices.get(e.getKey());
-        for (PIDAndTag idAndTag : e.getValue()) {
-          plan.add(child, this.vertices.get(idAndTag.id), idAndTag.tag);
+        for (PIDAndEdgeType pidAndEdgeType : e.getValue()) {
+          plan.add(child, this.vertices.get(pidAndEdgeType.id), pidAndEdgeType.edgeType);
         }
       }
 
@@ -283,11 +382,19 @@ public class ExecutionPlan implements GsonObject {
     }
 
     public boolean compare() {
-      Stack<Integer> s1 = new Stack<Integer>();
-      Stack<Integer> s2 = new Stack<Integer>();
-      s1.push(plan1.getTopNode(0).getPID());
-      s2.push(plan2.getTopNode(0).getPID());
-      return recursiveCompare(s1, s2);
+      if (plan1.getChildCount(plan1.terminalNode)
+          == plan2.getChildCount(plan2.terminalNode)) {
+        Stack<Integer> s1 = new Stack<Integer>();
+        Stack<Integer> s2 = new Stack<Integer>();
+        int childCount = plan1.getChildCount(plan1.terminalNode);
+        for (int i = 0; i < childCount; i++) {
+          s1.push(plan1.getTopNode(i).getPID());
+          s2.push(plan2.getTopNode(i).getPID());
+        }
+        return recursiveCompare(s1, s2);
+      } else {
+        return false;
+      }
     }
 
     private boolean recursiveCompare(Stack<Integer> s1, Stack<Integer> s2) {
@@ -328,43 +435,42 @@ public class ExecutionPlan implements GsonObject {
     @Override
     public void visit(LogicalNode current) {
       try {
-        Preconditions.checkArgument(current instanceof UnaryNode, "The current node should be an unary node");
-        visit(current, Tag.SINGLE);
+        visit(current, EdgeType.SINGLE);
       } catch (PlanningException e) {
         throw new RuntimeException(e);
       }
     }
 
-    private void visit(LogicalNode current, Tag tag) throws PlanningException {
+    private void visit(LogicalNode current, EdgeType edgeType) throws PlanningException {
       if (current instanceof UnaryNode) {
-        visitUnary((UnaryNode) current, tag);
+        visitUnary((UnaryNode) current, edgeType);
       } else if (current instanceof BinaryNode) {
-        visitBinary((BinaryNode) current, tag);
+        visitBinary((BinaryNode) current, edgeType);
       } else if (current instanceof ScanNode) {
-        visitScan((ScanNode) current, tag);
+        visitScan((ScanNode) current, edgeType);
       } else if (current instanceof TableSubQueryNode) {
-        visitTableSubQuery((TableSubQueryNode) current, tag);
+        visitTableSubQuery((TableSubQueryNode) current, edgeType);
       }
     }
 
-    private void visitScan(ScanNode node, Tag tag) throws PlanningException {
+    private void visitScan(ScanNode node, EdgeType edgeType) throws PlanningException {
       if (plan.inputContext == null) {
         plan.inputContext = new InputContext();
       }
       plan.inputContext.addScanNode(node);
     }
 
-    private void visitUnary(UnaryNode node, Tag tag) throws PlanningException {
+    private void visitUnary(UnaryNode node, EdgeType edgeType) throws PlanningException {
       if (node.getChild() != null) {
-        LogicalNode child = PlannerUtil.clone(plan.pidFactory, node.getChild());
-        plan.add(child, node, tag);
+        LogicalNode child = node.getChild();
+        plan.add(child, node, edgeType);
         node.setChild(null);
-        visit(child, tag);
+        visit(child, edgeType);
       }
     }
 
-    private void visitBinary(BinaryNode node, Tag tag) throws PlanningException {
-      Preconditions.checkArgument(tag == Tag.SINGLE);
+    private void visitBinary(BinaryNode node, EdgeType edgeType) throws PlanningException {
+      Preconditions.checkArgument(edgeType == EdgeType.SINGLE);
 
       LogicalNode child;
       if (node.getType() == NodeType.JOIN) {
@@ -373,23 +479,23 @@ public class ExecutionPlan implements GsonObject {
         plan.hasUnionPlan = true;
       }
       if (node.getLeftChild() != null) {
-        child = PlannerUtil.clone(plan.pidFactory, node.getLeftChild());
-        plan.add(child, node, Tag.LEFT);
+        child = node.getLeftChild();
+        plan.add(child, node, EdgeType.LEFT);
         node.setLeftChild(null);
-        visit(child, Tag.LEFT);
+        visit(child, EdgeType.LEFT);
       }
       if (node.getRightChild() != null) {
-        child = PlannerUtil.clone(plan.pidFactory, node.getRightChild());
-        plan.add(child, node, Tag.RIGHT);
+        child = node.getRightChild();
+        plan.add(child, node, EdgeType.RIGHT);
         node.setRightChild(null);
-        visit(child, Tag.RIGHT);
+        visit(child, EdgeType.RIGHT);
       }
     }
 
-    private void visitTableSubQuery(TableSubQueryNode node, Tag tag) throws PlanningException {
-      LogicalNode child = PlannerUtil.clone(plan.pidFactory, node.getSubQuery());
-      plan.add(child, node, tag);
-      visit(child, tag);
+    private void visitTableSubQuery(TableSubQueryNode node, EdgeType edgeType) throws PlanningException {
+      LogicalNode child = node.getSubQuery();
+      plan.add(child, node, edgeType);
+      visit(child, edgeType);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlanEdge.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlanEdge.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlanEdge.java
index c8b1415..00669df 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlanEdge.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlanEdge.java
@@ -21,7 +21,7 @@ package org.apache.tajo.engine.planner.global;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 
 public class ExecutionPlanEdge {
-  public static enum Tag {
+  public static enum EdgeType {
     LEFT,
     RIGHT,
     SINGLE
@@ -29,12 +29,12 @@ public class ExecutionPlanEdge {
 
   private final Integer parentId;
   private final Integer childId;
-  private final Tag tag;
+  private final EdgeType edgeType;
 
-  public ExecutionPlanEdge(LogicalNode child, LogicalNode parent, Tag tag) {
+  public ExecutionPlanEdge(LogicalNode child, LogicalNode parent, EdgeType edgeType) {
     this.parentId = parent.getPID();
     this.childId = child.getPID();
-    this.tag = tag;
+    this.edgeType = edgeType;
   }
 
   public Integer getParentId() {
@@ -45,7 +45,7 @@ public class ExecutionPlanEdge {
     return childId;
   }
 
-  public Tag getTag() {
-    return tag;
+  public EdgeType getEdgeType() {
+    return edgeType;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 1cebc8e..efa4259 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -35,7 +35,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
+import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.EdgeType;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.storage.AbstractStorageManager;
 
@@ -84,6 +84,7 @@ public class GlobalPlanner {
     ExecutionBlock childExecBlock = globalPlanContext.execBlockMap.get(lastNode.getPID());
 
     if (childExecBlock.getPlan() != null) {
+      childExecBlock.getPlan().build();
       ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
       DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, lastNode.getPID(), -1, NONE_PARTITION, 1,
           lastNode.getOutSchema());
@@ -119,13 +120,22 @@ public class GlobalPlanner {
     int targetPid;
     if (leftTable) {
       childBlock = leftBlock;
+      if (!childBlock.getPlan().isBuilt()) {
+        childBlock.getPlan().build();
+      }
       targetPid = parent.getInputContext().getScanNodes()[0].getPID();
     } else {
       childBlock = rightBlock;
+      if (!childBlock.getPlan().isBuilt()) {
+        childBlock.getPlan().build();
+      }
       targetPid = parent.getInputContext().getScanNodes()[1].getPID();
     }
-    LogicalNode srcTopNode = childBlock.getPlan().getChild(childBlock.getPlan().getTerminalNode(), 0);
+    LogicalNode srcTopNode = childBlock.getPlan().getTopNode(0);
     int srcPid = srcTopNode.getPID();
+    if (!parent.getPlan().isBuilt()) {
+      parent.getPlan().build();
+    }
 
     DataChannel channel = new DataChannel(childBlock, parent, srcPid, targetPid, HASH_PARTITION, 32,
         srcTopNode.getOutSchema());
@@ -187,17 +197,23 @@ public class GlobalPlanner {
 
     // symmetric repartition join
     currentBlock = masterPlan.newExecutionBlock();
+    leftBlock.getPlan().build();
+    rightBlock.getPlan().build();
+    LogicalNode leftTopNode = leftBlock.getPlan().getTopNode(0);
+    LogicalNode rightTopNode = rightBlock.getPlan().getTopNode(0);
 
-    DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
-    DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
-
-    ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
-    ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
+    ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftTopNode.getOutSchema(),
+        leftBlock.getId(), currentBlock.getId(), storeType);
+    ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightTopNode.getOutSchema(),
+        rightBlock.getId(), currentBlock.getId(), storeType);
 
     joinNode.setLeftChild(leftScan);
     joinNode.setRightChild(rightScan);
     currentBlock.setPlan(joinNode);
 
+    DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
+    DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
+
     masterPlan.addConnect(leftChannel);
     masterPlan.addConnect(rightChannel);
 
@@ -245,6 +261,8 @@ public class GlobalPlanner {
     currentBlock.setPlan(groupbyNode);
 
     // setup channel
+    childBlock.getPlan().build();
+    currentBlock.getPlan().build();
     DataChannel channel;
     channel = new DataChannel(childBlock, currentBlock, topMostOfFirstPhase.getPID(), scanNode.getPID(), HASH_PARTITION,
         32, topMostOfFirstPhase.getOutSchema());
@@ -283,10 +301,10 @@ public class GlobalPlanner {
 
           ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
           ExecutionPlan subBlockPlan = subBlock.getPlan();
-          LogicalNode topNode = subBlockPlan.getChild(subBlockPlan.getTerminalNode(), 0);
+          LogicalNode topNodeOfSubBlock = subBlockPlan.getLogicalNodeGroupWithPID(dataChannel.getSrcPID())
+              .toLinkedLogicalNode();
           GroupbyNode g1 = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), firstPhaseGroupBy);
-          subBlockPlan.add(topNode, g1, Tag.SINGLE);
-//          g1.setChild(subBlock.getPlan());
+          g1.setChild(topNodeOfSubBlock);
           subBlock.setPlan(g1);
 
           GroupbyNode g2 = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), groupbyNode);
@@ -298,23 +316,28 @@ public class GlobalPlanner {
         childBlock.setPlan(firstPhaseGroupBy);
         currentBlock = masterPlan.newExecutionBlock();
 
+        ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), firstPhaseGroupBy.getOutSchema(),
+            childBlock.getId(), currentBlock.getId(), storeType);
+        groupbyNode.setChild(scanNode);
+        groupbyNode.setInSchema(scanNode.getOutSchema());
+        currentBlock.setPlan(groupbyNode);
+
         DataChannel channel;
+        childBlock.getPlan().build();
+        currentBlock.getPlan().build();
         int srcPID = childBlock.getPlan().getTopNodePid(0);
         int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
+        int partitionNum;
         if (firstPhaseGroupBy.isEmptyGrouping()) {
-          channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 1, firstPhaseGroupBy.getOutSchema());
-          channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
+          partitionNum = 1;
         } else {
-          channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32, firstPhaseGroupBy.getOutSchema());
-          channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
+          partitionNum = 32;
         }
-//        channel.setSchema(firstPhaseGroupBy.getOutSchema());
+        channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, partitionNum,
+            firstPhaseGroupBy.getOutSchema());
+        channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
         channel.setStoreType(storeType);
 
-        ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-        groupbyNode.setChild(scanNode);
-        groupbyNode.setInSchema(scanNode.getOutSchema());
-        currentBlock.setPlan(groupbyNode);
         masterPlan.addConnect(channel);
       }
     }
@@ -327,28 +350,32 @@ public class GlobalPlanner {
     ExecutionBlock currentBlock;
 
     SortNode firstSortNode = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), currentNode);
-//    LogicalNode childBlockPlan = childBlock.getPlan();
-    ExecutionPlan childBlockPlan = childBlock.getPlan();
-    childBlockPlan.add(childBlockPlan.getTopNode(0), firstSortNode, Tag.SINGLE);
-//    firstSortNode.setChild(childBlockPlan.getTopNode(0));
+    LogicalNode childBlockPlan = childBlock.getPlan().getFirstLogicalNodeGroup().toLinkedLogicalNode();
+    firstSortNode.setChild(childBlockPlan);
     // sort is a non-projectable operator. So, in/out schemas are the same to its child operator.
-    firstSortNode.setInSchema(childBlockPlan.getOutSchema(0));
-    firstSortNode.setOutSchema(childBlockPlan.getOutSchema(0));
-//    childBlock.setPlan(firstSortNode);
+    firstSortNode.setInSchema(childBlockPlan.getOutSchema());
+    firstSortNode.setOutSchema(childBlockPlan.getOutSchema());
+    childBlock.setPlan(firstSortNode);
 
     currentBlock = masterPlan.newExecutionBlock();
+    childBlock.getPlan().build();
     int srcPID = childBlock.getPlan().getTopNodePid(0);
     int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
+
+    ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), firstSortNode.getOutSchema(),
+        childBlock.getId(), currentBlock.getId(), storeType);
+    currentNode.setChild(secondScan);
+    currentNode.setInSchema(secondScan.getOutSchema());
+    currentBlock.setPlan(currentNode);
+
+    childBlock.getPlan().build();
+    currentBlock.getPlan().build();
+
     DataChannel channel = new DataChannel(childBlock, currentBlock, srcPID, targetPID, HASH_PARTITION, 32,
         firstSortNode.getOutSchema());
     channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
 //    channel.setSchema(firstSortNode.getOutSchema());
     channel.setStoreType(storeType);
-
-    ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-    currentNode.setChild(secondScan);
-    currentNode.setInSchema(secondScan.getOutSchema());
-    currentBlock.setPlan(currentNode);
     masterPlan.addConnect(channel);
 
     return currentBlock;
@@ -362,11 +389,10 @@ public class GlobalPlanner {
 
     // if result table is not a partitioned table, directly store it
     if(partitionDesc == null) {
-//      currentNode.setChild(childBlock.getPlan().getTopNode(0));
-      ExecutionPlan executionPlan = childBlock.getPlan();
-      executionPlan.add(executionPlan.getTopNode(0), currentNode, Tag.SINGLE);
-      currentNode.setInSchema(childBlock.getPlan().getOutSchema(0));
-//      childBlock.setPlan(currentNode);
+      LogicalNode topNodeOfBlock = childBlock.getPlan().getFirstLogicalNodeGroup().toLinkedLogicalNode();
+      currentNode.setChild(topNodeOfBlock);
+      currentNode.setInSchema(topNodeOfBlock.getOutSchema());
+      childBlock.setPlan(currentNode);
       return childBlock;
     }
 
@@ -376,11 +402,21 @@ public class GlobalPlanner {
     LogicalNode childNode = currentNode.getChild();
     childBlock.setPlan(childNode);
 
-    // 2. create a new execution block, pipeline 2 exec blocks through a DataChannel
+    // 2. create a ScanNode for scanning shuffle data
+    //    StoreTableNode as the root node of the new execution block
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
+    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), childNode.getOutSchema(),
+        childBlock.getId(), currentBlock.getId(), storeType);
+    currentNode.setChild(scanNode);
+    currentNode.setInSchema(scanNode.getOutSchema());
+    currentBlock.setPlan(currentNode);
+
+    // 3. pipeline 2 exec blocks through a DataChannel
     DataChannel channel = null;
     CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
+    childBlock.getPlan().build();
+    currentBlock.getPlan().build();
     int srcPID = childBlock.getPlan().getTopNodePid(0);
     int targetPID = currentBlock.getInputContext().getScanNodes()[0].getPID();
     if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
@@ -402,13 +438,6 @@ public class GlobalPlanner {
       // TODO
     }
 
-    // 3. create a ScanNode for scanning shuffle data
-    //    StoreTableNode as the root node of the new execution block
-    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-    currentNode.setChild(scanNode);
-    currentNode.setInSchema(scanNode.getOutSchema());
-    currentBlock.setPlan(currentNode);
-
     masterPlan.addConnect(channel);
 
     return currentBlock;
@@ -429,12 +458,11 @@ public class GlobalPlanner {
       LogicalNode child = super.visitProjection(context, plan, node, stack);
 
       ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
+      LogicalNode nodeOfExecBlock = execBlock.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
 
-//      node.setChild(execBlock.getPlan().getTopNode(0));
-      ExecutionPlan executionPlan = execBlock.getPlan();
-      executionPlan.add(executionPlan.getTopNode(0), node, Tag.SINGLE);
-      node.setInSchema(execBlock.getPlan().getOutSchema(0));
-//      execBlock.setPlan(node);
+      node.setChild(nodeOfExecBlock);
+      node.setInSchema(nodeOfExecBlock.getOutSchema());
+      execBlock.setPlan(node);
       context.execBlockMap.put(node.getPID(), execBlock);
       return node;
     }
@@ -445,28 +473,26 @@ public class GlobalPlanner {
       LogicalNode child = super.visitLimit(context, plan, node, stack);
 
       ExecutionBlock block;
+      LogicalNode topNodeOfBlock;
       block = context.execBlockMap.remove(child.getPID());
       if (child.getType() == NodeType.SORT) {
-//        node.setChild(block.getPlan());
-//        block.setPlan(node);
-        ExecutionPlan blockPlan = block.getPlan();
-        blockPlan.add(blockPlan.getTopNode(0), node, Tag.SINGLE);
+        topNodeOfBlock = block.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
+        node.setChild(topNodeOfBlock);
+        block.setPlan(node);
 
         ExecutionBlock childBlock = context.plan.getChild(block, 0);
         LimitNode childLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
-//        childLimit.setChild(childBlock.getPlan());
-//        childBlock.setPlan(childLimit);
-        ExecutionPlan childPlan = childBlock.getPlan();
-        childPlan.add(childPlan.getTopNode(0), childLimit, Tag.SINGLE);
+        topNodeOfBlock = childBlock.getPlan().getFirstLogicalNodeGroup().toLinkedLogicalNode();
+        childLimit.setChild(topNodeOfBlock);
+        childBlock.setPlan(childLimit);
 
         DataChannel channel = context.plan.getChannel(childBlock, block);
         channel.setPartitionNum(1);
         context.execBlockMap.put(node.getPID(), block);
       } else {
-//        node.setChild(block.getPlan());
-//        block.setPlan(node);
-        ExecutionPlan blockPlan = block.getPlan();
-        blockPlan.add(blockPlan.getTopNode(0), node, Tag.SINGLE);
+        topNodeOfBlock = block.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
+        node.setChild(topNodeOfBlock);
+        block.setPlan(node);
 
         ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
         ScanNode scanNode = buildInputExecutor(plan, node.getOutSchema(), block.getId(), newExecBlock.getId(), storeType);
@@ -474,6 +500,8 @@ public class GlobalPlanner {
         parentLimit.setChild(scanNode);
         newExecBlock.setPlan(parentLimit);
 
+        block.getPlan().build();
+        newExecBlock.getPlan().build();
         int srcPID = block.getPlan().getTopNodePid(0);
         int targetPID = newExecBlock.getInputContext().getScanNodes()[0].getPID();
         DataChannel newChannel = new DataChannel(block, newExecBlock, srcPID, targetPID, HASH_PARTITION, 1, node.getOutSchema());
@@ -520,11 +548,10 @@ public class GlobalPlanner {
       LogicalNode child = super.visitFilter(context, plan, node, stack);
 
       ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
-//      node.setChild(execBlock.getPlan());
-      node.setInSchema(execBlock.getPlan().getOutSchema(0));
-//      execBlock.setPlan(node);
-      ExecutionPlan executionPlan = execBlock.getPlan();
-      executionPlan.add(executionPlan.getTopNode(0), node, Tag.SINGLE);
+      LogicalNode nodeOfExecBlock = execBlock.getPlan().getLogicalNodeGroupWithPID(child.getPID()).toLinkedLogicalNode();
+      node.setChild(nodeOfExecBlock);
+      node.setInSchema(nodeOfExecBlock.getOutSchema());
+      execBlock.setPlan(node);
       context.execBlockMap.put(node.getPID(), execBlock);
 
       return node;
@@ -570,21 +597,26 @@ public class GlobalPlanner {
       }
 
       ExecutionBlock execBlock;
+      int targetPID;
       if (unionBlocks.size() == 0) {
         execBlock = context.plan.newExecutionBlock();
+        targetPID = -1;
       } else {
         execBlock = unionBlocks.get(0);
+        targetPID = execBlock.getPlan().getTopNodePid(0);
       }
 
       for (ExecutionBlock childBlocks : unionBlocks) {
         ExecutionPlan executionPlan = childBlocks.getPlan();
+        executionPlan.build();
         LogicalNode unionCandidate = executionPlan.getTopNode(0);
-        queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, Tag.LEFT).getPID()));
-        queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, Tag.RIGHT).getPID()));
+        queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, EdgeType.LEFT).getPID()));
+        queryBlockBlocks.add(context.execBlockMap.get(executionPlan.getChild(unionCandidate, EdgeType.RIGHT).getPID()));
       }
 
-      int targetPID = execBlock.getInputContext().getScanNodes()[0].getPID();
+      execBlock.getPlan().build();
       for (ExecutionBlock childBlocks : queryBlockBlocks) {
+        childBlocks.getPlan().build();
         LogicalNode topNode = childBlocks.getPlan().getTopNode(0);
         int srcPID = topNode.getPID();
         DataChannel channel = new DataChannel(childBlocks, execBlock, srcPID, targetPID, NONE_PARTITION, 1, topNode.getOutSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index bef025a..6d03642 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -93,8 +93,7 @@ public class MasterPlan {
   }
 
   public ExecutionBlock newExecutionBlock() {
-    ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId(), plan.getPidFactory(),
-        (LogicalRootNode) plan.getRootBlock().getRoot());
+    ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId(), plan.getPidFactory());
     execBlockMap.put(newExecBlock.getId(), newExecBlock);
     return newExecBlock;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 894da09..48c7f71 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -656,7 +656,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     private static void createTasks(SubQuery subQuery) throws IOException {
       MasterPlan masterPlan = subQuery.getMasterPlan();
       ExecutionBlock execBlock = subQuery.getBlock();
+//      execBlock.getPlan().build();
       QueryUnit [] tasks;
+      if (execBlock.getInputContext() == null) {
+        return;
+      }
       if (subQuery.getMasterPlan().isLeaf(execBlock.getId()) &&
           execBlock.getInputContext().size() == 1) {
         tasks = createLeafTasks(subQuery);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5291ed49/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
index e208d24..e12a94b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
@@ -42,14 +42,17 @@ public class TestExecutionPlan {
     schema.addColumn("name", Type.TEXT);
     schema.addColumn("age", Type.INT2);
 
-    GroupbyNode groupbyNode = new GroupbyNode(3, new Column[]{schema.getColumn(1), schema.getColumn(2)});
-    ScanNode scanNode = new ScanNode(0,
+    PIDFactory pidFactory = new PIDFactory();
+    GroupbyNode groupbyNode = new GroupbyNode(pidFactory.newPID(),
+        new Column[]{schema.getColumn(1), schema.getColumn(2)});
+    ScanNode scanNode = new ScanNode(pidFactory.newPID(),
         CatalogUtil.newTableDesc("in", schema, CatalogUtil.newTableMeta(StoreType.CSV), new Path("in")));
 
     groupbyNode.setChild(scanNode);
 
-    ExecutionPlan plan = new ExecutionPlan(new PIDFactory(), new LogicalRootNode(4));
+    ExecutionPlan plan = new ExecutionPlan(pidFactory);
     plan.addPlan(groupbyNode);
+    plan.build();
 
     String json = plan.toJson();
     ExecutionPlan fromJson = CoreGsonHelper.fromJson(json, ExecutionPlan.class);
@@ -62,36 +65,42 @@ public class TestExecutionPlan {
     schema.addColumn("id", Type.INT4);
     schema.addColumn("name", Type.TEXT);
     schema.addColumn("age", Type.INT2);
+    PIDFactory pidFactory = new PIDFactory();
 
-    LogicalRootNode root1 = new LogicalRootNode(10);
-    GroupbyNode groupbyNode = new GroupbyNode(3, new Column[]{schema.getColumn(1), schema.getColumn(2)});
-    ScanNode scanNode = new ScanNode(0,
+    LogicalRootNode root1 = new LogicalRootNode(pidFactory.newPID());
+    GroupbyNode groupbyNode = new GroupbyNode(pidFactory.newPID(),
+        new Column[]{schema.getColumn(1), schema.getColumn(2)});
+    ScanNode scanNode = new ScanNode(pidFactory.newPID(),
         CatalogUtil.newTableDesc("in", schema, CatalogUtil.newTableMeta(StoreType.CSV), new Path("in")));
     root1.setChild(groupbyNode);
     groupbyNode.setChild(scanNode);
 
-    LogicalRootNode root2 = new LogicalRootNode(11);
-    SortNode sortNode = new SortNode(2, new SortSpec[]{new SortSpec(schema.getColumn(2))});
+    LogicalRootNode root2 = new LogicalRootNode(pidFactory.newPID());
+    SortNode sortNode = new SortNode(pidFactory.newPID(),
+        new SortSpec[]{new SortSpec(schema.getColumn(2))});
     root2.setChild(sortNode);
     sortNode.setChild(scanNode);
 
-    LogicalRootNode root3 = new LogicalRootNode(12);
-    JoinNode joinNode = new JoinNode(4);
-    ScanNode scanNode2 = new ScanNode(1,
+    LogicalRootNode root3 = new LogicalRootNode(pidFactory.newPID());
+    JoinNode joinNode = new JoinNode(pidFactory.newPID());
+    ScanNode scanNode2 = new ScanNode(pidFactory.newPID(),
         CatalogUtil.newTableDesc("in2", schema, CatalogUtil.newTableMeta(StoreType.CSV), new Path("in2")));
     root3.setChild(joinNode);
     joinNode.setLeftChild(scanNode);
     joinNode.setRightChild(scanNode2);
 
-    ExecutionPlan plan = new ExecutionPlan(new PIDFactory(), new LogicalRootNode(5));
+    ExecutionPlan plan = new ExecutionPlan(pidFactory);
     plan.addPlan(root1);
     plan.addPlan(root2);
+    plan.build();
     assertEquals(1, plan.getInputContext().size());
     assertEquals(1, plan.getChildCount(groupbyNode));
     assertEquals(1, plan.getChildCount(sortNode));
     assertEquals(plan.getChild(groupbyNode, 0), plan.getChild(sortNode, 0));
 
+//    plan.clear();
     plan.addPlan(root3);
+    plan.build();
     assertEquals(2, plan.getInputContext().size());
     assertEquals(3, plan.getParentCount(scanNode));
 


Mime
View raw message