tajo-commits mailing list archives

From: hyun...@apache.org
Subject: [3/7] TAJO-184: Refactor GlobalPlanner and global plan data structure. (hyunsik)
Date: Mon, 16 Sep 2013 11:33:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index cc5f848..bc2b3eb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -18,27 +18,33 @@
 
 package org.apache.tajo.master.querymaster;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.DataChannel;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
 import org.apache.tajo.engine.planner.UniformRangePartition;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.master.ExecutionBlock;
-import org.apache.tajo.master.ExecutionBlock.PartitionType;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -47,6 +53,10 @@ import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
 
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.HASH_PARTITION;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.RANGE_PARTITION;
+
 /**
  * Repartitioner creates non-leaf tasks and shuffles intermediate data.
  * It supports two repartition methods, such as hash and range repartition.
@@ -58,53 +68,68 @@ public class Repartitioner {
 
   public static QueryUnit[] createJoinTasks(SubQuery subQuery)
       throws IOException {
+    MasterPlan masterPlan = subQuery.getMasterPlan();
     ExecutionBlock execBlock = subQuery.getBlock();
-    //CatalogService catalog = subQuery.getContext().getCatalog();
+    QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
+    AbstractStorageManager storageManager = subQuery.getStorageManager();
 
     ScanNode[] scans = execBlock.getScanNodes();
+    ExecutionBlock [] childBlocks = new ExecutionBlock[2];
+    childBlocks[0] = masterPlan.getChild(execBlock.getId(), 0);
+    childBlocks[1] = masterPlan.getChild(execBlock.getId(), 1);
+
     Path tablePath;
     Fragment [] fragments = new Fragment[2];
     TableStat [] stats = new TableStat[2];
 
     // initialize variables from the child operators
     for (int i =0; i < 2; i++) {
-      // TODO - temporarily tables should be stored in temporarily catalog for each query
-      TableDesc tableDesc = subQuery.getContext().getTableDescMap().get(scans[i].getFromTable().getTableName());
-      if (scans[i].getTableId().startsWith(ExecutionBlockId.EB_ID_PREFIX)) {
-        tablePath = subQuery.getStorageManager().getTablePath(scans[i].getTableId());
-        stats[i] = subQuery.getChildQuery(scans[i]).getTableStat();
+      TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getFromTable().getTableName());
+      if (tableDesc == null) { // if it is a real table stored on storage
+        // TODO - to be fixed (wrong directory)
+        tablePath = storageManager.getTablePath(scans[i].getTableName());
+        stats[i] = masterContext.getSubQuery(childBlocks[i].getId()).getTableStat();
+        fragments[i] = new Fragment(scans[i].getTableName(), tablePath,
+            CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV), 0, 0);
       } else {
         tablePath = tableDesc.getPath();
         stats[i] = tableDesc.getMeta().getStat();
-      }
-
-      if (scans[i].isLocal()) { // it only requires a dummy fragment.
-        fragments[i] = new Fragment(scans[i].getTableId(), tablePath,
-            CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV), 0, 0);
-      } else {
-        fragments[i] = subQuery.getStorageManager().getSplits(scans[i].getTableId(),
-                tableDesc.getMeta(), tablePath).get(0);
+        fragments[i] = storageManager.getSplits(scans[i].getTableName(),
+            tableDesc.getMeta(), tablePath).get(0);
       }
     }
 
     // Assigning either fragments or fetch urls to query units
     QueryUnit [] tasks;
-    if (scans[0].isBroadcast() || scans[1].isBroadcast()) {
+    boolean leftSmall = execBlock.isBroadcastTable(scans[0].getCanonicalName());
+    boolean rightSmall = execBlock.isBroadcastTable(scans[1].getCanonicalName());
+
+    if (leftSmall && rightSmall) {
+      LOG.info("[Distributed Join Strategy] : Immediate Two Way Join on Single Machine");
       tasks = new QueryUnit[1];
       tasks[0] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
           false, subQuery.getEventHandler());
       tasks[0].setLogicalPlan(execBlock.getPlan());
-      tasks[0].setFragment(scans[0].getTableId(), fragments[0]);
-      tasks[0].setFragment(scans[1].getTableId(), fragments[1]);
+      tasks[0].setFragment(scans[0].getTableName(), fragments[0]);
+      tasks[0].setFragment(scans[1].getTableName(), fragments[1]);
+    } else if (leftSmall ^ rightSmall) {
+      LOG.info("[Distributed Join Strategy] : Broadcast Join");
+      int broadcastIdx = leftSmall ? 0 : 1;
+      int baseScanIdx = leftSmall ? 1 : 0;
+
+      LOG.info("Broadcasting Table Volume: " + stats[broadcastIdx].getNumBytes());
+      LOG.info("Base Table Volume: " + stats[baseScanIdx].getNumBytes());
+
+      tasks = createLeafTasksWithBroadcastTable(subQuery, baseScanIdx, fragments[broadcastIdx]);
     } else {
+      LOG.info("[Distributed Join Strategy] : Repartition Join");
       // The hash map is modeling as follows:
       // <Partition Id, <Table Name, Intermediate Data>>
-      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
-          new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
+      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries = new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
 
       // Grouping IntermediateData by a partition key and a table name
       for (ScanNode scan : scans) {
-        SubQuery childSubQuery = subQuery.getChildQuery(scan);
+        SubQuery childSubQuery = masterContext.getSubQuery(TajoIdUtils.createExecutionBlockId(scan.getTableName()));
         for (QueryUnit task : childSubQuery.getQueryUnits()) {
           if (task.getIntermediateData() != null) {
             for (IntermediateEntry intermEntry : task.getIntermediateData()) {
@@ -112,15 +137,15 @@ public class Repartitioner {
                 Map<String, List<IntermediateEntry>> tbNameToInterm =
                     hashEntries.get(intermEntry.getPartitionId());
 
-                if (tbNameToInterm.containsKey(scan.getTableId())) {
-                  tbNameToInterm.get(scan.getTableId()).add(intermEntry);
+                if (tbNameToInterm.containsKey(scan.getTableName())) {
+                  tbNameToInterm.get(scan.getTableName()).add(intermEntry);
                 } else {
-                  tbNameToInterm.put(scan.getTableId(), TUtil.newList(intermEntry));
+                  tbNameToInterm.put(scan.getTableName(), TUtil.newList(intermEntry));
                 }
               } else {
                 Map<String, List<IntermediateEntry>> tbNameToInterm =
                     new HashMap<String, List<IntermediateEntry>>();
-                tbNameToInterm.put(scan.getTableId(), TUtil.newList(intermEntry));
+                tbNameToInterm.put(scan.getTableName(), TUtil.newList(intermEntry));
                 hashEntries.put(intermEntry.getPartitionId(), tbNameToInterm);
               }
             }
@@ -174,6 +199,41 @@ public class Repartitioner {
     return tasks;
   }
 
+  private static QueryUnit [] createLeafTasksWithBroadcastTable(SubQuery subQuery, int baseScanId, Fragment broadcasted) throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    ScanNode[] scans = execBlock.getScanNodes();
+    Preconditions.checkArgument(scans.length == 2, "Must be Join Query");
+    TableMeta meta;
+    Path inputPath;
+    ScanNode scan = scans[baseScanId];
+    TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getTableName());
+    inputPath = desc.getPath();
+    meta = desc.getMeta();
+
+    FileSystem fs = inputPath.getFileSystem(subQuery.getContext().getConf());
+    List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getTableName(), meta, inputPath);
+    QueryUnit queryUnit;
+    List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
+
+    int i = 0;
+    for (Fragment fragment : fragments) {
+      queryUnit = newQueryUnit(subQuery, i++, fragment);
+      queryUnit.setFragment2(broadcasted);
+      queryUnits.add(queryUnit);
+    }
+    return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
+  }
+
+  private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    QueryUnit unit = new QueryUnit(
+        QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
+        subQuery.getEventHandler());
+    unit.setLogicalPlan(execBlock.getPlan());
+    unit.setFragment2(fragment);
+    return unit;
+  }
+
   private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, Fragment [] fragments, int taskNum) {
     ExecutionBlock execBlock = subQuery.getBlock();
     QueryUnit [] tasks = new QueryUnit[taskNum];
@@ -193,22 +253,22 @@ public class Repartitioner {
   private static void addJoinPartition(QueryUnit task, SubQuery subQuery, int partitionId,
                                        Map<String, List<IntermediateEntry>> grouppedPartitions) {
 
-    for (ScanNode scanNode : subQuery.getBlock().getScanNodes()) {
+    for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) {
       Map<String, List<IntermediateEntry>> requests;
-      if (grouppedPartitions.containsKey(scanNode.getTableId())) {
-          requests = mergeHashPartitionRequest(grouppedPartitions.get(scanNode.getTableId()));
+      if (grouppedPartitions.containsKey(execBlock.getId().toString())) {
+          requests = mergeHashPartitionRequest(grouppedPartitions.get(execBlock.getId().toString()));
       } else {
         return;
       }
       Set<URI> fetchURIs = TUtil.newHashSet();
       for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
         Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
-            subQuery.getChildQuery(scanNode).getId(),
-            partitionId, PartitionType.HASH,
+            execBlock.getId(),
+            partitionId, HASH_PARTITION,
             requestPerNode.getValue());
         fetchURIs.addAll(uris);
       }
-      task.addFetches(scanNode.getTableId(), fetchURIs);
+      task.addFetches(execBlock.getId().toString(), fetchURIs);
     }
   }
 
@@ -233,23 +293,20 @@ public class Repartitioner {
     return mergedPartitions;
   }
 
-  public static QueryUnit [] createNonLeafTask(SubQuery subQuery,
-                                               SubQuery childSubQuery,
-                                               int maxNum)
+  public static QueryUnit [] createNonLeafTask(MasterPlan masterPlan, SubQuery subQuery, SubQuery childSubQuery,
+                                               DataChannel channel, int maxNum)
       throws InternalException {
-    ExecutionBlock childExecBlock = childSubQuery.getBlock();
-    if (childExecBlock.getPartitionType() == PartitionType.HASH) {
-      return createHashPartitionedTasks(subQuery, childSubQuery, maxNum);
-    } else if (childExecBlock.getPartitionType() == PartitionType.RANGE) {
-      return createRangePartitionedTasks(subQuery, childSubQuery, maxNum);
+    if (channel.getPartitionType() == HASH_PARTITION) {
+      return createHashPartitionedTasks(masterPlan, subQuery, childSubQuery, channel, maxNum);
+    } else if (channel.getPartitionType() == RANGE_PARTITION) {
+      return createRangePartitionedTasks(subQuery, childSubQuery, channel, maxNum);
     } else {
       throw new InternalException("Cannot support partition type");
     }
   }
 
   public static QueryUnit [] createRangePartitionedTasks(SubQuery subQuery,
-                                                         SubQuery childSubQuery,
-                                                         int maxNum)
+                                                         SubQuery childSubQuery, DataChannel channel, int maxNum)
       throws InternalException {
     ExecutionBlock execBlock = subQuery.getBlock();
     TableStat stat = childSubQuery.getTableStat();
@@ -259,19 +316,15 @@ public class Repartitioner {
 
     ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
-    tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
+    tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
 
-    StoreTableNode store = (StoreTableNode) childSubQuery.getBlock().getPlan();
-    SortNode sort = (SortNode) store.getChild();
-    SortSpec[] sortSpecs = sort.getSortKeys();
-    Schema sortSchema = PlannerUtil.sortSpecsToSchema(sort.getSortKeys());
+    SortNode sortNode = PlannerUtil.findTopNode(childSubQuery.getBlock().getPlan(), NodeType.SORT);
+    SortSpec [] sortSpecs = sortNode.getSortKeys();
+    Schema sortSchema = new Schema(channel.getPartitionKey());
 
     // calculate the number of maximum query ranges
-    TupleRange mergedRange =
-        TupleUtil.columnStatToRange(sort.getOutSchema(),
-            sortSchema, stat.getColumnStats());
-    RangePartitionAlgorithm partitioner =
-        new UniformRangePartition(sortSchema, mergedRange);
+    TupleRange mergedRange = TupleUtil.columnStatToRange(channel.getSchema(), sortSchema, stat.getColumnStats());
+    RangePartitionAlgorithm partitioner = new UniformRangePartition(sortSchema, mergedRange);
     BigDecimal card = partitioner.getTotalCardinality();
 
     // if the number of the range cardinality is less than the desired number of tasks,
@@ -289,14 +342,12 @@ public class Repartitioner {
         " sub ranges (total units: " + determinedTaskNum + ")");
     TupleRange [] ranges = partitioner.partition(determinedTaskNum);
 
-    Fragment dummyFragment = new Fragment(scan.getTableId(), tablePath,
+    Fragment dummyFragment = new Fragment(scan.getTableName(), tablePath,
         CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),0, 0);
 
     List<String> basicFetchURIs = new ArrayList<String>();
 
-    SubQuery child = childSubQuery.getContext().getSubQuery(
-        subQuery.getBlock().getChildBlock(scan).getId());
-    for (QueryUnit qu : child.getQueryUnits()) {
+    for (QueryUnit qu : childSubQuery.getQueryUnits()) {
       for (IntermediateEntry p : qu.getIntermediateData()) {
         String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(),
             childSubQuery.getId(), p.taskId, p.attemptId);
@@ -330,7 +381,7 @@ public class Repartitioner {
     }
 
     QueryUnit [] tasks = createEmptyNonLeafTasks(subQuery, determinedTaskNum, dummyFragment);
-    assignPartitionByRoundRobin(map, scan.getTableId(), tasks);
+    assignPartitionByRoundRobin(map, scan.getTableName(), tasks);
     return tasks;
   }
 
@@ -367,50 +418,58 @@ public class Repartitioner {
     return sb.toString();
   }
 
-  public static QueryUnit [] createHashPartitionedTasks(SubQuery subQuery,
-                                                 SubQuery childSubQuery,
-                                                 int maxNum) {
+  public static QueryUnit [] createHashPartitionedTasks(MasterPlan masterPlan, SubQuery subQuery,
+                                                 SubQuery childSubQuery, DataChannel channel, int maxNum) {
     ExecutionBlock execBlock = subQuery.getBlock();
-    TableStat stat = childSubQuery.getTableStat();
-    if (stat.getNumRows() == 0) {
+
+    List<TableStat> tableStats = new ArrayList<TableStat>();
+    List<ExecutionBlock> childBlocks = masterPlan.getChilds(subQuery.getId());
+    for (ExecutionBlock childBlock : childBlocks) {
+      SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
+      tableStats.add(childExecSM.getTableStat());
+    }
+    TableStat totalStat = StatisticsUtil.computeStatFromUnionBlock(tableStats);
+
+    if (totalStat.getNumRows() == 0) {
       return new QueryUnit[0];
     }
 
     ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
-    tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
+    tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
 
-    List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-    for (QueryUnit tasks : childSubQuery.getQueryUnits()) {
-      if (tasks.getIntermediateData() != null) {
-        partitions.addAll(tasks.getIntermediateData());
-      }
-    }
 
-    Fragment frag = new Fragment(scan.getTableId(), tablePath,
+    Fragment frag = new Fragment(scan.getTableName(), tablePath,
         CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV), 0, 0);
 
-    Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
+
     Map<String, List<IntermediateEntry>> hashedByHost;
     Map<Integer, List<URI>> finalFetchURI = new HashMap<Integer, List<URI>>();
 
-    for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
-      hashedByHost = hashByHost(interm.getValue());
-      for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
-        Collection<URI> uris = createHashFetchURL(e.getKey(), childSubQuery.getId(),
-            interm.getKey(),
-            childSubQuery.getBlock().getPartitionType(), e.getValue());
-
-        if (finalFetchURI.containsKey(interm.getKey())) {
-          finalFetchURI.get(interm.getKey()).addAll(uris);
-        } else {
-          finalFetchURI.put(interm.getKey(), TUtil.newList(uris));
+    for (ExecutionBlock block : childBlocks) {
+      List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
+      for (QueryUnit tasks : subQuery.getContext().getSubQuery(block.getId()).getQueryUnits()) {
+        if (tasks.getIntermediateData() != null) {
+          partitions.addAll(tasks.getIntermediateData());
+        }
+      }
+      Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
+      for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
+        hashedByHost = hashByHost(interm.getValue());
+        for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
+          Collection<URI> uris = createHashFetchURL(e.getKey(), block.getId(),
+              interm.getKey(), channel.getPartitionType(), e.getValue());
+
+          if (finalFetchURI.containsKey(interm.getKey())) {
+            finalFetchURI.get(interm.getKey()).addAll(uris);
+          } else {
+            finalFetchURI.put(interm.getKey(), TUtil.newList(uris));
+          }
         }
       }
     }
 
-    GroupbyNode groupby = (GroupbyNode) childSubQuery.getBlock().getStoreTableNode().
-        getChild();
+    GroupbyNode groupby = (GroupbyNode) childSubQuery.getBlock().getPlan();
     // the number of tasks cannot exceed the number of merged fetch uris.
     int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
     if (groupby.getGroupingColumns().length == 0) {
@@ -422,7 +481,7 @@ public class Repartitioner {
     int tid = 0;
     for (Entry<Integer, List<URI>> entry : finalFetchURI.entrySet()) {
       for (URI uri : entry.getValue()) {
-        tasks[tid].addFetch(scan.getTableId(), uri);
+        tasks[tid].addFetch(scan.getTableName(), uri);
       }
 
       tid ++;
@@ -436,8 +495,7 @@ public class Repartitioner {
   }
 
   public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid,
-                                       int partitionId, PartitionType type,
-                                       List<IntermediateEntry> entries) {
+                                       int partitionId, PartitionType type, List<IntermediateEntry> entries) {
     String scheme = "http://";
     StringBuilder urlPrefix = new StringBuilder(scheme);
     urlPrefix.append(hostAndPort).append("/?")
@@ -445,9 +503,9 @@ public class Repartitioner {
         .append("&sid=").append(ebid.getId())
         .append("&p=").append(partitionId)
         .append("&type=");
-    if (type == PartitionType.HASH) {
+    if (type == HASH_PARTITION) {
       urlPrefix.append("h");
-    } else if (type == PartitionType.RANGE) {
+    } else if (type == RANGE_PARTITION) {
       urlPrefix.append("r");
     }
     urlPrefix.append("&ta=");
@@ -536,31 +594,35 @@ public class Repartitioner {
     return hashed;
   }
 
-  public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n) {
+  public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n, DataChannel channel) {
     ExecutionBlock execBlock = subQuery.getBlock();
     Column[] keys = null;
     // if the next query is join,
     // set the partition number for the current logicalUnit
     // TODO: the union handling is required when a join has unions as its child
+    MasterPlan masterPlan = subQuery.getMasterPlan();
     ExecutionBlock parentBlock = execBlock.getParentBlock();
     if (parentBlock != null) {
       if (parentBlock.getStoreTableNode().getChild().getType() == NodeType.JOIN) {
         execBlock.getStoreTableNode().setPartitions(execBlock.getPartitionType(),
             execBlock.getStoreTableNode().getPartitionKeys(), n);
         keys = execBlock.getStoreTableNode().getPartitionKeys();
+
+        masterPlan.getOutgoingChannels(subQuery.getId()).iterator().next()
+            .setPartition(execBlock.getPartitionType(), execBlock.getStoreTableNode().getPartitionKeys(), n);
       }
     }
 
-    StoreTableNode store = execBlock.getStoreTableNode();
+
     // set the partition number for group by and sort
-    if (execBlock.getPartitionType() == PartitionType.HASH) {
-      if (store.getChild().getType() == NodeType.GROUP_BY) {
-        GroupbyNode groupby = (GroupbyNode)store.getChild();
+    if (channel.getPartitionType() == HASH_PARTITION) {
+      if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
+        GroupbyNode groupby = (GroupbyNode) execBlock.getPlan();
         keys = groupby.getGroupingColumns();
       }
-    } else if (execBlock.getPartitionType() == PartitionType.RANGE) {
-      if (store.getChild().getType() == NodeType.SORT) {
-        SortNode sort = (SortNode)store.getChild();
+    } else if (channel.getPartitionType() == RANGE_PARTITION) {
+      if (execBlock.getPlan().getType() == NodeType.SORT) {
+        SortNode sort = (SortNode) execBlock.getPlan();
         keys = new Column[sort.getSortKeys().length];
         for (int i = 0; i < keys.length; i++) {
           keys[i] = sort.getSortKeys()[i].getSortKey();
@@ -569,12 +631,10 @@ public class Repartitioner {
     }
     if (keys != null) {
       if (keys.length == 0) {
-        store.setPartitions(execBlock.getPartitionType(), new Column[]{}, 1);
+        channel.setPartition(execBlock.getPartitionType(), new Column[]{}, 1);
       } else {
-        store.setPartitions(execBlock.getPartitionType(), keys, n);
+        channel.setPartition(execBlock.getPartitionType(), keys, n);
       }
-    } else {
-      store.setListPartition();
     }
     return subQuery;
   }
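
For readers tracing the shuffle fetch protocol: createHashFetchURL above assembles HTTP pull URLs whose query string carries the source execution block id (sid), the partition id (p), the partition type (type, "h" for HASH_PARTITION, "r" for RANGE_PARTITION), and a task-attempt list (ta). A minimal sketch of that shape follows; the host, the ids, the qid component (which falls between the shown hunks), and the ta encoding are illustrative assumptions, not verbatim from the patch.

    // Sketch only: reconstructs the URL shape built by createHashFetchURL.
    // Host, ids, and the "qid"/"ta" values are hypothetical placeholders.
    public class HashFetchUrlSketch {
      public static void main(String[] args) {
        String hostAndPort = "worker01:8080";             // pull server (assumed)
        StringBuilder url = new StringBuilder("http://");
        url.append(hostAndPort).append("/?")
           .append("qid=").append("q_1379331210000_0001") // query id (assumed name)
           .append("&sid=").append(3)                     // source ExecutionBlock id
           .append("&p=").append(7)                       // partition id
           .append("&type=").append("h")                  // hash partition
           .append("&ta=").append("5_0");                 // task/attempt ids (assumed)
        System.out.println(url.toString());
      }
    }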

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 1bf45ee..01d7e46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -30,23 +30,29 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.DataChannel;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitId;
 import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.ColumnStat;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.master.*;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.TaskScheduler;
+import org.apache.tajo.master.TaskSchedulerImpl;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.Fragment;
@@ -68,10 +74,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private static final Log LOG = LogFactory.getLog(SubQuery.class);
 
-  private QueryContext queryContext;
+  private MasterPlan masterPlan;
   private ExecutionBlock block;
   private int priority;
   private TableMeta meta;
+  private TableStat statistics;
   private EventHandler eventHandler;
   private final AbstractStorageManager sm;
   private TaskSchedulerImpl taskScheduler;
@@ -135,8 +142,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private int completedTaskCount = 0;
 
-  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock block, AbstractStorageManager sm) {
+  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block, AbstractStorageManager sm) {
     this.context = context;
+    this.masterPlan = masterPlan;
     this.block = block;
     this.sm = sm;
     this.eventHandler = context.getEventHandler();
@@ -156,6 +164,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return context;
   }
 
+  public MasterPlan getMasterPlan() {
+    return masterPlan;
+  }
+
+  public DataChannel getDataChannel() {
+    return masterPlan.getOutgoingChannels(getId()).iterator().next();
+  }
+
   public EventHandler getEventHandler() {
     return eventHandler;
   }
@@ -234,10 +250,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return sm;
   }
   
-  public SubQuery getChildQuery(ScanNode scanForChild) {
-    return context.getSubQuery(block.getChildBlock(scanForChild).getId());
-  }
-  
   public ExecutionBlockId getId() {
     return block.getId();
   }
@@ -250,17 +262,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return tasks.get(qid);
   }
 
-  public void setTableMeta(TableMeta meta) {
-    this.meta = meta;
-  }
-
   @SuppressWarnings("UnusedDeclaration")
   public TableMeta getTableMeta() {
     return meta;
   }
 
   public TableStat getTableStat() {
-    return this.meta.getStat();
+    return statistics;
   }
 
   public String toString() {
@@ -296,17 +304,18 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  private static TableStat computeStatFromUnionBlock(SubQuery unit) {
+  public static TableStat computeStatFromUnionBlock(SubQuery subQuery) {
     TableStat stat = new TableStat();
     TableStat childStat;
     long avgRows = 0, numBytes = 0, numRows = 0;
     int numBlocks = 0, numPartitions = 0;
     List<ColumnStat> columnStats = Lists.newArrayList();
 
-    Iterator<ExecutionBlock> it = unit.getBlock().getChildBlocks().iterator();
+    MasterPlan masterPlan = subQuery.getMasterPlan();
+    Iterator<ExecutionBlock> it = masterPlan.getChilds(subQuery.getBlock()).iterator();
     while (it.hasNext()) {
       ExecutionBlock block = it.next();
-      SubQuery childSubQuery = unit.context.getSubQuery(block.getId());
+      SubQuery childSubQuery = subQuery.context.getSubQuery(block.getId());
       childStat = childSubQuery.getTableStat();
       avgRows += childStat.getAvgRows();
       columnStats.addAll(childStat.getColumnStats());
@@ -325,22 +334,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return stat;
   }
 
-  public TableMeta buildTableMeta() throws IOException {
-    finishTime = context.getClock().getTime();
-
-    TableStat stat;
-    if (block.hasUnion()) {
-      stat = computeStatFromUnionBlock(this);
-    } else {
-      stat = computeStatFromTasks();
-    }
-
-    StoreTableNode storeTableNode = getBlock().getStoreTableNode();
-    TableMeta meta = toTableMeta(storeTableNode);
-    meta.setStat(stat);
-    return meta;
-  }
-
   private TableStat computeStatFromTasks() {
     List<TableStat> stats = Lists.newArrayList();
     for (QueryUnit unit : getQueryUnits()) {
@@ -350,26 +343,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return tableStat;
   }
 
-  private TableMeta writeStat(SubQuery subQuery, TableStat stat)
-      throws IOException {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    StoreTableNode storeTableNode = execBlock.getStoreTableNode();
-    TableMeta meta = toTableMeta(storeTableNode);
-    meta.setStat(stat);
-    //sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
-    return meta;
-  }
-
-  private static TableMeta toTableMeta(StoreTableNode store) {
-    if (store.hasOptions()) {
-      return CatalogUtil.newTableMeta(store.getOutSchema(),
-          store.getStorageType(), store.getOptions());
-    } else {
-      return CatalogUtil.newTableMeta(store.getOutSchema(),
-          store.getStorageType());
-    }
-  }
-
   private void stopScheduler() {
     // If there are launched TaskRunners, send the 'shouldDie' message to all r
     // via received task requests.
@@ -385,15 +358,19 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   }
 
   private void finish() {
-    TableMeta meta = null;
-    try {
-      meta = buildTableMeta();
-    } catch (IOException e) {
-      e.printStackTrace();
+    TableStat stat;
+    if (block.hasUnion()) {
+      stat = computeStatFromUnionBlock(this);
+    } else {
+      stat = computeStatFromTasks();
     }
 
-    setTableMeta(meta);
+    DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
+    meta = CatalogUtil.newTableMeta(channel.getSchema(), CatalogProtos.StoreType.CSV, new Options());
+    meta.setStat(stat);
+    statistics = stat;
     setFinishTime();
+
     eventHandler.handle(new SubQuerySucceeEvent(getId(), meta));
   }
 
@@ -445,7 +422,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           subQuery.finish();
           state = SubQueryState.SUCCEEDED;
         } else {
-          setRepartitionIfNecessary(subQuery);
+          DataChannel channel = subQuery.getMasterPlan().getOutgoingChannels(subQuery.getId()).get(0);
+          setRepartitionIfNecessary(subQuery, channel);
           createTasks(subQuery);
 
           if (subQuery.tasks.size() == 0) { // if there is no tasks
@@ -479,10 +457,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
      * If a parent block requires a repartition operation, the method sets proper repartition
      * methods and the number of partitions to a given subquery.
      */
-    private static void setRepartitionIfNecessary(SubQuery subQuery) {
+    private static void setRepartitionIfNecessary(SubQuery subQuery, DataChannel channel) {
       if (subQuery.getBlock().hasParentBlock()) {
-        int numTasks = calculatePartitionNum(subQuery);
-        Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
+        int numTasks = calculatePartitionNum(subQuery, channel);
+        Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks, channel);
       }
     }
 
@@ -493,14 +471,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
      * @param subQuery
      * @return
      */
-    public static int calculatePartitionNum(SubQuery subQuery) {
+    public static int calculatePartitionNum(SubQuery subQuery, DataChannel channel) {
       TajoConf conf = subQuery.context.getConf();
       ExecutionBlock parent = subQuery.getBlock().getParentBlock();
 
       GroupbyNode grpNode = null;
       if (parent != null) {
-        grpNode = (GroupbyNode) PlannerUtil.findTopNode(
-            parent.getPlan(), NodeType.GROUP_BY);
+        grpNode = PlannerUtil.findTopNode(parent.getPlan(), NodeType.GROUP_BY);
       }
 
       // Is this subquery the first step of join?
@@ -509,11 +486,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
         // for inner
         ExecutionBlock outer = child.next();
-        long outerVolume = getInputVolume(subQuery.context, outer);
+        long outerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, outer);
 
         // for inner
         ExecutionBlock inner = child.next();
-        long innerVolume = getInputVolume(subQuery.context, inner);
+        long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
         LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
         LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
 
@@ -533,7 +510,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         if (grpNode.getGroupingColumns().length == 0) {
           return 1;
         } else {
-          long volume = getInputVolume(subQuery.context, subQuery.block);
+          long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
 
           int mb = (int) Math.ceil((double)volume / 1048576);
           LOG.info("Table's volume is approximately " + mb + " MB");
@@ -545,7 +522,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         }
       } else {
         LOG.info("============>>>>> Unexpected Case! <<<<<================");
-        long volume = getInputVolume(subQuery.context, subQuery.block);
+        long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
 
         int mb = (int) Math.ceil((double)volume / 1048576);
         LOG.info("Table's volume is approximately " + mb + " MB");
@@ -557,9 +534,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
 
     private static void createTasks(SubQuery subQuery) throws IOException {
+      MasterPlan masterPlan = subQuery.getMasterPlan();
       ExecutionBlock execBlock = subQuery.getBlock();
       QueryUnit [] tasks;
-      if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
+      if (subQuery.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
         tasks = createLeafTasks(subQuery);
 
       } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
@@ -567,9 +545,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
       } else { // Case 3: Others (Sort or Aggregation)
         int numTasks = getNonLeafTaskNum(subQuery);
-        ExecutionBlockId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId();
+        ExecutionBlockId childId = masterPlan.getChilds(subQuery.getBlock()).get(0).getId();
         SubQuery child = subQuery.context.getSubQuery(childId);
-        tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
+        DataChannel channel = masterPlan.getChannel(child.getId(), subQuery.getId());
+        tasks = Repartitioner.createNonLeafTask(masterPlan, subQuery, child, channel, numTasks);
       }
 
       LOG.info("Create " + tasks.length + " Tasks");
@@ -587,7 +566,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
      */
     public static int getNonLeafTaskNum(SubQuery subQuery) {
       // Getting intermediate data size
-      long volume = getInputVolume(subQuery.context, subQuery.getBlock());
+      long volume = getInputVolume(subQuery.getMasterPlan(), subQuery.context, subQuery.getBlock());
 
       int mb = (int) Math.ceil((double)volume / 1048576);
       LOG.info("Table's volume is approximately " + mb + " MB");
@@ -597,15 +576,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       return maxTaskNum;
     }
 
-    public static long getInputVolume(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock execBlock) {
+    public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock execBlock) {
       Map<String, TableDesc> tableMap = context.getTableDescMap();
-      if (execBlock.isLeafBlock()) {
+      if (masterPlan.isLeaf(execBlock)) {
         ScanNode outerScan = execBlock.getScanNodes()[0];
         TableStat stat = tableMap.get(outerScan.getFromTable().getTableName()).getMeta().getStat();
         return stat.getNumBytes();
       } else {
         long aggregatedVolume = 0;
-        for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
+        for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
           SubQuery subquery = context.getSubQuery(childBlock.getId());
           aggregatedVolume += subquery.getTableStat().getNumBytes();
         }
@@ -650,7 +629,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       meta = desc.getMeta();
 
       // TODO - should be change the inner directory
-      List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getTableId(), meta, inputPath);
+      List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getTableName(), meta, inputPath);
 
       QueryUnit queryUnit;
       List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
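
The net effect of the SubQuery changes above is that output metadata no longer comes from a StoreTableNode hanging off the block; the SubQuery asks the MasterPlan for its outgoing DataChannel and derives schema, partition type, and partition count from it. A minimal sketch of that lookup, assuming the patched classes are on the classpath (the helper name and the 'keys'/'n' parameters are placeholders, not API from the patch):

    import org.apache.tajo.DataChannel;
    import org.apache.tajo.catalog.Column;
    import org.apache.tajo.master.querymaster.SubQuery;
    import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.HASH_PARTITION;

    class ChannelSketch {
      // Resolve the subquery's single outgoing channel and stamp hash
      // partitioning onto it, as setPartitionNumberForTwoPhase now does.
      static DataChannel hashPartitionedOutput(SubQuery subQuery, Column[] keys, int n) {
        DataChannel channel = subQuery.getMasterPlan()
            .getOutgoingChannels(subQuery.getId()).get(0);
        channel.setPartition(HASH_PARTITION, keys, n);
        return channel;
      }
    }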

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 0286afa..29f44eb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -202,7 +202,8 @@ public class TajoWorkerManagerService extends CompositeService
       QueryId queryId = new QueryId(request.getQueryId());
       LOG.info("Receive executeQuery request:" + queryId);
       queryMaster.handle(new QueryStartEvent(queryId,
-          new QueryContext(request.getQueryContext()), request.getLogicalPlanJson().getValue()));
+          new QueryContext(request.getQueryContext()), request.getSql().getValue(),
+          request.getLogicalPlanJson().getValue()));
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 915234f..63d8f04 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.DataChannel;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
@@ -48,7 +49,6 @@ import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
 import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
-import org.apache.tajo.master.ExecutionBlock.PartitionType;
 import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.Fragment;
@@ -150,13 +150,15 @@ public class Task {
     this.context = new TaskAttemptContext(systemConf, taskId,
         request.getFragments().toArray(new Fragment[request.getFragments().size()]),
         taskDir);
+    this.context.setDataChannel(request.getDataChannel());
+
     plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
     interQuery = request.getProto().getInterQuery();
     if (interQuery) {
       context.setInterQuery();
       StoreTableNode store = (StoreTableNode) plan;
       this.partitionType = store.getPartitionType();
-      if (partitionType == PartitionType.RANGE) {
+      if (partitionType == PartitionType.RANGE_PARTITION) {
         SortNode sortNode = (SortNode) store.getChild();
         this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
         this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
@@ -180,12 +182,12 @@ public class Task {
 
     LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
     for (Fragment f: request.getFragments()) {
-      LOG.info("==> Table Id:" + f.getName() + ", path:" + f.getPath() + "(" + f.getMeta().getStoreType() + "), " +
+      LOG.info("Table Id:" + f.getName() + ", path:" + f.getPath() + "(" + f.getMeta().getStoreType() + "), " +
           "(start:" + f.getStartOffset() + ", length: " + f.getLength() + ")");
     }
     LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
     for (Fetch f : request.getFetches()) {
-      LOG.info("==> Table Id: " + f.getName() + ", url: " + f.getUrls());
+      LOG.info("Table Id: " + f.getName() + ", url: " + f.getUrls());
     }
     LOG.info("* Local task dir: " + taskDir);
     if(LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 94bacdf..1726c7b 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -60,6 +60,7 @@ message QueryUnitRequestProto {
     repeated Fetch fetches = 7;
     optional bool shouldDie = 8;
     optional KeyValueSetProto queryContext = 9;
+    optional DataChannelProto dataChannel = 10;
 }
 
 message Fetch {
@@ -106,7 +107,8 @@ message Partition {
 message QueryExecutionRequestProto {
     required QueryIdProto queryId = 1;
     required KeyValueSetProto queryContext = 2;
-    required StringProto logicalPlanJson = 3;
+    required StringProto sql = 3;
+    optional StringProto logicalPlanJson = 4;
 }
 
 message GetTaskRequestProto {
@@ -114,6 +116,34 @@ message GetTaskRequestProto {
     required ExecutionBlockIdProto executionBlockId = 2;
 }
 
+enum PartitionType {
+  NONE_PARTITION = 0;
+  LIST_PARTITION = 1;
+  HASH_PARTITION = 2;
+  RANGE_PARTITION = 3;
+}
+
+enum TransmitType {
+  PUSH_TRANSMIT = 0;
+  PULL_TRANSMIT = 1;
+  FILE_WRITE = 2;
+}
+
+message DataChannelProto {
+  required ExecutionBlockIdProto srcId = 1;
+  required ExecutionBlockIdProto targetId = 2;
+  required TransmitType transmitType = 3 [default = PULL_TRANSMIT];
+  required PartitionType partitionType = 4;
+
+  optional SchemaProto schema = 5;
+
+  repeated ColumnProto partitionKey = 7;
+  optional int32 partitionNum = 8 [default = 1];
+  repeated SortSpecProto sortSpecs = 9;
+
+  optional StoreType storeType = 10 [default = CSV];
+}
+
 message RunExecutionBlockRequestProto {
     required string executionBlockId = 1;
     required string queryMasterHost = 2;
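
On the wire, the same channel information travels as the new DataChannelProto (attached to QueryUnitRequestProto as field 10 above). A short sketch of filling it in with the generated protobuf builder, assuming the default outer class TajoWorkerProtocol (consistent with the static imports in the Repartitioner diff); the two ExecutionBlockIdProto values are placeholders:

    import org.apache.tajo.ipc.TajoWorkerProtocol.DataChannelProto;
    import org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
    import org.apache.tajo.ipc.TajoWorkerProtocol.TransmitType;

    // srcBlockIdProto/targetBlockIdProto stand in for real ExecutionBlockIdProtos.
    DataChannelProto channel = DataChannelProto.newBuilder()
        .setSrcId(srcBlockIdProto)                     // producing block
        .setTargetId(targetBlockIdProto)               // consuming block
        .setTransmitType(TransmitType.PULL_TRANSMIT)   // the proto's default
        .setPartitionType(PartitionType.HASH_PARTITION)
        .setPartitionNum(32)                           // example value
        .build();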

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
index eaaafe5..c0718a8 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
@@ -1,33 +1,45 @@
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="java.util.*" %>
-<%@ page import="java.net.InetSocketAddress" %>
-<%@ page import="java.net.InetAddress"  %>
-<%@ page import="org.apache.hadoop.conf.Configuration" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
-<%@ page import="org.apache.tajo.master.*" %>
-<%@ page import="org.apache.tajo.master.rm.*" %>
-<%@ page import="org.apache.tajo.catalog.*" %>
-<%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="org.apache.tajo.QueryId" %>
+<%@ page import="org.apache.tajo.master.querymaster.Query" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask"  %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryUnit" %>
+<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
-<%@ page import="org.apache.tajo.master.querymaster.*" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="java.text.SimpleDateFormat" %>
+<%@ page import="java.util.Collection" %>
 
 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
 
 <html>
 <head>
-  <link rel="stylesheet" type = "text/css" href = "./style.css" />
+  <link rel="stylesheet" type="text/css" href="./style.css"/>
   <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
   <title>Query Detail Info</title>
-<%
+  <%
   QueryId queryId = TajoIdUtils.parseQueryId(request.getParameter("queryId"));
   TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
   QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
           .getTajoWorkerManagerService().getQueryMaster().getQueryMasterTask(queryId);
 
   Query query = queryMasterTask.getQuery();
+  %>
+  <h1><% out.write(queryId.toString()); %></h1>
+  <h2>Logical Plan</h2>
+  <pre>
+  <%
+    out.write(query.getPlan().getLogicalPlan().toString());
+  %>
+  </pre>
+  <h2>Distributed Query Plan</h2>
+  <pre>
+  <%
+    out.write(query.getPlan().toString());
+  %>
+  </pre>
+  <%
   Collection<SubQuery> subQueries = query.getSubQueries();
 
   SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -40,7 +52,6 @@
 <%
     QueryUnit[] queryUnits = eachSubQuery.getQueryUnits();
     for(QueryUnit eachQueryUnit: queryUnits) {
-      //QueryUnitAttempt queryUnitAttempt = eachQueryUnit.getSuccessfulAttempt();
 %>
       <tr>
         <td><%=eachQueryUnit.getId()%></td>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 2af7fb8..40fc45f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -30,6 +30,8 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.sql.ResultSet;
@@ -41,6 +43,22 @@ public class LocalTajoTestingUtility {
   private TajoConf conf;
   private TajoClient client;
 
+  public static QueryUnitAttemptId newQueryUnitAttemptId() {
+    return QueryIdFactory.newQueryUnitAttemptId(
+        QueryIdFactory.newQueryUnitId(new MasterPlan(newQueryId(), null, null).newExecutionBlockId()), 0);
+  }
+  public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) {
+    return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0);
+  }
+
+  /**
+   * for test
+   * @return
+   */
+  public synchronized static QueryId newQueryId() {
+    return QueryIdFactory.newQueryId(TajoIdUtils.MASTER_ID_FORMAT.format(0));
+  }
+
   public void setup(String[] names,
                     String[] tablepaths,
                     Schema[] schemas,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
index 7b82952..912400b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo;
 
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -31,23 +32,25 @@ public class TestQueryIdFactory {
 
   @Test
   public void testNewQueryId() {
-    QueryId qid1 = QueryIdFactory.newQueryId();
-    QueryId qid2 = QueryIdFactory.newQueryId();
+    QueryId qid1 = LocalTajoTestingUtility.newQueryId();
+    QueryId qid2 = LocalTajoTestingUtility.newQueryId();
     assertTrue(qid1.compareTo(qid2) < 0);
   }
   
   @Test
   public void testNewSubQueryId() {
-    QueryId qid = QueryIdFactory.newQueryId();
-    ExecutionBlockId subqid1 = QueryIdFactory.newExecutionBlockId(qid);
-    ExecutionBlockId subqid2 = QueryIdFactory.newExecutionBlockId(qid);
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    MasterPlan plan = new MasterPlan(qid, null, null);
+    ExecutionBlockId subqid1 = plan.newExecutionBlockId();
+    ExecutionBlockId subqid2 = plan.newExecutionBlockId();
     assertTrue(subqid1.compareTo(subqid2) < 0);
   }
   
   @Test
   public void testNewQueryUnitId() {
-    QueryId qid = QueryIdFactory.newQueryId();
-    ExecutionBlockId subid = QueryIdFactory.newExecutionBlockId(qid);
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    MasterPlan plan = new MasterPlan(qid, null, null);
+    ExecutionBlockId subid = plan.newExecutionBlockId();
     QueryUnitId quid1 = QueryIdFactory.newQueryUnitId(subid);
     QueryUnitId quid2 = QueryIdFactory.newQueryUnitId(subid);
     assertTrue(quid1.compareTo(quid2) < 0);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
index 1997159..1929fb3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
@@ -20,6 +20,7 @@ package org.apache.tajo;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.util.TajoIdUtils;
 import org.junit.Test;
 
@@ -121,12 +122,12 @@ public class TestTajoIds {
   
   @Test
   public void testConstructFromString() {
-//    QueryIdFactory.reset();
-    QueryId qid1 = QueryIdFactory.newQueryId();
+    QueryId qid1 = LocalTajoTestingUtility.newQueryId();
     QueryId qid2 = TajoIdUtils.parseQueryId(qid1.toString());
     assertEquals(qid1, qid2);
 
-    ExecutionBlockId sub1 = QueryIdFactory.newExecutionBlockId(qid1);
+    MasterPlan plan1 = new MasterPlan(qid1, null, null);
+    ExecutionBlockId sub1 = plan1.newExecutionBlockId();
     ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
     assertEquals(sub1, sub2);
     
@@ -141,12 +142,12 @@ public class TestTajoIds {
 
   @Test
   public void testConstructFromPB() {
-//    QueryIdFactory.reset();
-    QueryId qid1 = QueryIdFactory.newQueryId();
+    QueryId qid1 = LocalTajoTestingUtility.newQueryId();
     QueryId qid2 = new QueryId(qid1.getProto());
     assertEquals(qid1, qid2);
 
-    ExecutionBlockId sub1 = QueryIdFactory.newExecutionBlockId(qid1);
+    MasterPlan plan = new MasterPlan(qid1, null, null);
+    ExecutionBlockId sub1 = plan.newExecutionBlockId();
     ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
     assertEquals(sub1, sub2);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
index 75a531f..a6fb17a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
@@ -168,6 +168,12 @@ public class TestSQLAnalyzer {
   }
 
   @Test
+  public void testSet4() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/set_4.sql"));
+    System.out.println(parseQuery(sql));
+  }
+
+  @Test
   public void testDropTable() throws IOException {
     String sql = FileUtil.readTextFile(new File("src/test/queries/drop_table.sql"));
     parseQuery(sql);
@@ -240,6 +246,12 @@ public class TestSQLAnalyzer {
   }
 
   @Test
+  public void testTableSubQuery2() throws IOException {
+    String sql = FileUtil.readTextFile(new File("src/test/queries/table_subquery2.sql"));
+    parseQuery(sql);
+  }
+
+  @Test
   public void testInsertIntoTable() throws IOException {
     String sql = FileUtil.readTextFile(new File("src/test/queries/insert_into_select_1.sql"));
     parseQuery(sql);

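Both additions follow the suite's file-driven pattern; a sketch of the recipe for a further case (the fixture name below is hypothetical):

    @Test
    public void testMyNewCase() throws IOException {
      // Each case reads its SQL from a fixture under src/test/queries and
      // passes as long as the analyzer produces a parse tree without throwing.
      String sql = FileUtil.readTextFile(new File("src/test/queries/my_new_case.sql"));
      parseQuery(sql);
    }
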
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
deleted file mode 100644
index 66060ce..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.plan.global;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.TestEvalTree.TestSum;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.master.ExecutionBlock;
-import org.apache.tajo.master.ExecutionBlock.PartitionType;
-import org.apache.tajo.master.GlobalPlanner;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.storage.*;
-import org.apache.zookeeper.KeeperException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import static org.junit.Assert.*;
-
-public class TestGlobalQueryPlanner {
-
-  private static TajoTestingCluster util;
-  private static TajoConf conf;
-  private static CatalogService catalog;
-  private static GlobalPlanner planner;
-  private static Schema schema;
-  private static SQLAnalyzer analyzer;
-  private static LogicalPlanner logicalPlanner;
-  private static LogicalOptimizer optimizer;
-  private static QueryId queryId;
-  private static AbstractStorageManager sm;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    util = new TajoTestingCluster();
-    util.startCatalogCluster();
-
-    int i, j;
-
-    schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
-    schema.addColumn("salary", Type.INT4);
-
-    TableMeta meta;
-
-    conf = new TajoConf(util.getConfiguration());
-    catalog = util.getMiniCatalogCluster().getCatalog();
-    for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
-      catalog.registerFunction(funcDesc);
-    }
-
-    sm = StorageManagerFactory.getStorageManager(util.getConfiguration());
-    FunctionDesc funcDesc = new FunctionDesc("sumtest", TestSum.class, FunctionType.GENERAL,
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4),
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4));
-    catalog.registerFunction(funcDesc);
-    FileSystem fs = sm.getFileSystem();
-
-    AsyncDispatcher dispatcher = new AsyncDispatcher();
-    dispatcher.init(conf);
-    dispatcher.start();
-
-    planner = new GlobalPlanner(conf, sm,
-        dispatcher.getEventHandler());
-    analyzer = new SQLAnalyzer();
-    logicalPlanner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
-
-    int tbNum = 2;
-    int tupleNum;
-    Appender appender;
-    Tuple t = new VTuple(4);
-    t.put(new Datum[] {
-        DatumFactory.createInt4(1), DatumFactory.createInt4(32),
-        DatumFactory.createText("h"), DatumFactory.createInt4(10)});
-
-    for (i = 0; i < tbNum; i++) {
-      meta = CatalogUtil.newTableMeta((Schema) schema.clone(), StoreType.CSV);
-      meta.putOption(CSVFile.DELIMITER, ",");
-
-      Path dataRoot = sm.getBaseDir();
-      Path tablePath = StorageUtil.concatPath(dataRoot, "table"+i, "file.csv");
-      if (fs.exists(tablePath.getParent())) {
-        fs.delete(tablePath.getParent(), true);
-      }
-      fs.mkdirs(tablePath.getParent());
-      appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
-      appender.init();
-      tupleNum = 100;
-      for (j = 0; j < tupleNum; j++) {
-        appender.addTuple(t);
-      }
-      appender.close();
-
-      TableDesc desc = CatalogUtil.newTableDesc("table" + i, (TableMeta) meta.clone(), tablePath);
-      catalog.addTable(desc);
-    }
-
-    queryId = QueryIdFactory.newQueryId();
-    dispatcher.stop();
-  }
-
-  @AfterClass
-  public static void terminate() throws IOException {
-    util.shutdownCatalogCluster();
-  }
-  
-  @Test
-  public void testScan() throws IOException, PlanningException {
-    Expr context = analyzer.parse(
-        "select age, sumtest(salary) from table0");
-
-    LogicalPlan plan = logicalPlanner.createPlan(context);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-
-    MasterPlan globalPlan = planner.build(queryId,
-        (LogicalRootNode) rootNode);
-
-    ExecutionBlock unit = globalPlan.getRoot();
-    assertFalse(unit.hasChildBlock());
-    assertEquals(PartitionType.LIST, unit.getPartitionType());
-
-    LogicalNode plan2 = unit.getPlan();
-    assertEquals(NodeType.STORE, plan2.getType());
-    assertEquals(NodeType.SCAN, ((StoreTableNode)plan2).getChild().getType());
-  }
-
-  @Test
-  public void testGroupby() throws IOException, KeeperException,
-      InterruptedException, PlanningException {
-    Expr context = analyzer.parse(
-        "create table store1 as select age, sumtest(salary) from table0 group by age");
-    LogicalPlan plan = logicalPlanner.createPlan(context);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
-
-    ExecutionBlock next, prev;
-    
-    next = globalPlan.getRoot();
-    assertTrue(next.hasChildBlock());
-    assertEquals(PartitionType.LIST, next.getPartitionType());
-    for (ScanNode scan : next.getScanNodes()) {
-      assertTrue(scan.isLocal());
-    }
-    assertFalse(next.getStoreTableNode().isLocal());
-    Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
-    
-    prev = it.next();
-    assertFalse(prev.hasChildBlock());
-    assertEquals(PartitionType.HASH, prev.getPartitionType());
-    assertTrue(prev.getStoreTableNode().isLocal());
-    assertFalse(it.hasNext());
-    
-    ScanNode []scans = prev.getScanNodes();
-    assertEquals(1, scans.length);
-    assertEquals("table0", scans[0].getTableId());
-    assertFalse(scans[0].isLocal());
-    
-    scans = next.getScanNodes();
-    assertEquals(1, scans.length);
-    StoreTableNode store = prev.getStoreTableNode();
-    assertEquals(store.getTableName(), scans[0].getTableId());
-    assertEquals(store.getOutSchema(), scans[0].getInSchema());
-  }
-  
-  @Test
-  public void testSort() throws IOException, PlanningException {
-    Expr context = analyzer.parse(
-        "create table store1 as select age from table0 order by age");
-    LogicalPlan plan = logicalPlanner.createPlan(context);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
-
-    ExecutionBlock next, prev;
-    
-    next = globalPlan.getRoot();
-    assertEquals(NodeType.PROJECTION,
-        next.getStoreTableNode().getChild().getType());
-    assertTrue(next.hasChildBlock());
-    assertEquals(PartitionType.LIST, next.getPartitionType());
-    Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
-
-    prev = it.next();
-    assertEquals(NodeType.SORT,
-        prev.getStoreTableNode().getChild().getType());
-    assertTrue(prev.hasChildBlock());
-    assertEquals(PartitionType.LIST, prev.getPartitionType());
-    it= prev.getChildBlocks().iterator();
-    next = prev;
-    
-    prev = it.next();
-    assertFalse(prev.hasChildBlock());
-    assertEquals(PartitionType.RANGE, prev.getPartitionType());
-    assertFalse(it.hasNext());
-    
-    ScanNode []scans = prev.getScanNodes();
-    assertEquals(1, scans.length);
-    assertEquals("table0", scans[0].getTableId());
-    
-    scans = next.getScanNodes();
-    assertEquals(1, scans.length);
-    StoreTableNode store = prev.getStoreTableNode();
-    assertEquals(store.getTableName(), scans[0].getTableId());
-    assertEquals(store.getOutSchema(), scans[0].getInSchema());
-  }
-  
-  @Test
-  public void testJoin() throws IOException, PlanningException {
-    Expr expr = analyzer.parse(
-        "select table0.age,table0.salary,table1.salary from table0,table1 " +
-            "where table0.salary = table1.salary order by table0.age");
-    LogicalPlan plan = logicalPlanner.createPlan(expr);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-
-    MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
-
-    ExecutionBlock next, prev;
-    
-    // the second phase of the sort
-    next = globalPlan.getRoot();
-    assertTrue(next.hasChildBlock());
-    assertEquals(PartitionType.LIST, next.getPartitionType());
-    assertEquals(NodeType.PROJECTION, next.getStoreTableNode().getChild().getType());
-    ScanNode []scans = next.getScanNodes();
-    assertEquals(1, scans.length);
-    Iterator<ExecutionBlock> it= next.getChildBlocks().iterator();
-
-    prev = it.next();
-    assertEquals(NodeType.SORT, prev.getStoreTableNode().getChild().getType());
-    assertEquals(PartitionType.LIST, prev.getPartitionType());
-    scans = prev.getScanNodes();
-    assertEquals(1, scans.length);
-    it= prev.getChildBlocks().iterator();
-    
-    // the first phase of the sort
-    prev = it.next();
-    assertEquals(NodeType.SORT, prev.getStoreTableNode().getChild().getType());
-    assertEquals(scans[0].getInSchema(), prev.getOutputSchema());
-    assertTrue(prev.hasChildBlock());
-    assertEquals(PartitionType.RANGE, prev.getPartitionType());
-    assertFalse(it.hasNext());
-    scans = prev.getScanNodes();
-    assertEquals(1, scans.length);
-    next = prev;
-    it= next.getChildBlocks().iterator();
-    
-    // the second phase of the join
-    prev = it.next();
-    assertEquals(NodeType.JOIN, prev.getStoreTableNode().getChild().getType());
-    assertEquals(scans[0].getInSchema(), prev.getOutputSchema());
-    assertTrue(prev.hasChildBlock());
-    assertEquals(PartitionType.LIST, prev.getPartitionType());
-    assertFalse(it.hasNext());
-    scans = prev.getScanNodes();
-    assertEquals(2, scans.length);
-    next = prev;
-    it= next.getChildBlocks().iterator();
-    
-    // the first phase of the join
-    prev = it.next();
-    assertEquals(NodeType.SCAN, prev.getStoreTableNode().getChild().getType());
-    assertFalse(prev.hasChildBlock());
-    assertEquals(PartitionType.HASH, prev.getPartitionType());
-    assertEquals(1, prev.getScanNodes().length);
-    
-    prev = it.next();
-    assertEquals(NodeType.SCAN, prev.getStoreTableNode().getChild().getType());
-    assertFalse(prev.hasChildBlock());
-    assertEquals(PartitionType.HASH, prev.getPartitionType());
-    assertEquals(1, prev.getScanNodes().length);
-    assertFalse(it.hasNext());
-  }
-  
-  @Test
-  public void testSelectAfterJoin() throws IOException, PlanningException {
-    String query = "select table0.name, table1.salary from table0,table1 where table0.name = table1.name and table1.salary > 10";
-    Expr context = analyzer.parse(query);
-    LogicalPlan plan = logicalPlanner.createPlan(context);
-    LogicalNode rootNode = optimizer.optimize(plan);
-    
-    MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
-
-    ExecutionBlock unit = globalPlan.getRoot();
-    StoreTableNode store = unit.getStoreTableNode();
-    assertEquals(NodeType.JOIN, store.getChild().getType());
-    assertTrue(unit.hasChildBlock());
-    ScanNode [] scans = unit.getScanNodes();
-    assertEquals(2, scans.length);
-    ExecutionBlock prev;
-    for (ScanNode scan : scans) {
-      prev = unit.getChildBlock(scan);
-      store = prev.getStoreTableNode();
-      assertEquals(NodeType.SCAN, store.getChild().getType());
-    }
-  }
-  
-  //@Test
-  public void testCubeby() throws IOException, PlanningException {
-    Expr expr = analyzer.parse(
-        "select age, sum(salary) from table0 group by cube (age, id)");
-    LogicalPlan plan = logicalPlanner.createPlan(expr);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
-
-    ExecutionBlock unit = globalPlan.getRoot();
-    StoreTableNode store = unit.getStoreTableNode();
-    assertEquals(NodeType.PROJECTION, store.getChild().getType());
-
-    ScanNode[] scans = unit.getScanNodes();
-    assertEquals(1, scans.length);
-
-    unit = unit.getChildBlock(scans[0]);
-    store = unit.getStoreTableNode();
-    assertEquals(NodeType.UNION, store.getChild().getType());
-    UnionNode union = (UnionNode) store.getChild();
-    assertEquals(NodeType.SCAN, union.getLeftChild().getType());
-    assertEquals(NodeType.UNION, union.getRightChild().getType());
-    union = (UnionNode) union.getRightChild();
-    assertEquals(NodeType.SCAN, union.getLeftChild().getType());
-    assertEquals(NodeType.UNION, union.getRightChild().getType());
-    union = (UnionNode) union.getRightChild();
-    assertEquals(NodeType.SCAN, union.getLeftChild().getType());
-    assertEquals(NodeType.SCAN, union.getRightChild().getType());
-    assertTrue(unit.hasChildBlock());
-    
-    String tableId = "";
-    for (ScanNode scan : unit.getScanNodes()) {
-      ExecutionBlock prev = unit.getChildBlock(scan);
-      store = prev.getStoreTableNode();
-      assertEquals(NodeType.GROUP_BY, store.getChild().getType());
-      GroupbyNode groupby = (GroupbyNode) store.getChild();
-      assertEquals(NodeType.SCAN, groupby.getChild().getType());
-      if (tableId.equals("")) {
-        tableId = store.getTableName();
-      } else {
-        assertEquals(tableId, store.getTableName());
-      }
-      assertEquals(1, prev.getScanNodes().length);
-      prev = prev.getChildBlock(prev.getScanNodes()[0]);
-      store = prev.getStoreTableNode();
-      assertEquals(NodeType.GROUP_BY, store.getChild().getType());
-      groupby = (GroupbyNode) store.getChild();
-      assertEquals(NodeType.SCAN, groupby.getChild().getType());
-    }
-  }
-}

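The deleted tests navigated the plan through ExecutionBlock itself (hasChildBlock, getChildBlocks, getPartitionType on the block). In the refactored design the parent/child structure moves into MasterPlan; a rough sketch of the equivalent navigation, with the caveat that both getRoot and getChild(blockId, index) are assumptions about the new API rather than confirmed signatures:

    // Rough sketch only; both accessors below are assumed, not confirmed.
    ExecutionBlock root = masterPlan.getRoot();
    ExecutionBlock firstChild = masterPlan.getChild(root.getId(), 0);
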
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestGenericDirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestGenericDirectedGraph.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestGenericDirectedGraph.java
new file mode 100644
index 0000000..f4d4a8d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestGenericDirectedGraph.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.engine.planner.graph.DirectedGraphVisitor;
+import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
+import org.junit.Test;
+
+import java.util.Stack;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestGenericDirectedGraph {
+
+  @Test
+  public final void test() {
+    SimpleDirectedGraph<String, Integer> graph = new SimpleDirectedGraph<String, Integer>();
+
+    //     root
+    //     /  \
+    // (1)/    \ (2)
+    //   /      \
+    // child1  child2
+    //           / \
+    //       (3)/   \(4)
+    //         /     \
+    //    child3   child4
+    //
+    String root = "root";
+    String child1 = "child1";
+    String child2 = "child2";
+    String child3 = "child3";
+    String child4 = "child4";
+
+    graph.connect(child1, root, 1);
+    graph.connect(child2, root, 2);
+    graph.connect(child3, child2, 3);
+    graph.connect(child4, child2, 4);
+
+    assertTrue(graph.isRoot(root));
+    assertFalse(graph.isLeaf(root));
+
+    assertEquals(2, graph.getChildCount(root));
+    assertEquals(2, graph.getChildCount(child2));
+
+    graph.accept(root, new Visitor());
+  }
+
+  private class Visitor implements DirectedGraphVisitor<String> {
+    @Override
+    public void visit(Stack<String> stack, String s) {
+      System.out.println("===> " + s);
+    }
+  }
+}

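The visitor in the new test only prints each vertex; a sketch of a visitor that records traversal order instead (the Stack parameter presumably carries the current path from the accept() root):

    // Sketch: collect vertices in the order the graph walk reaches them.
    class CollectingVisitor implements DirectedGraphVisitor<String> {
      final java.util.List<String> visited = new java.util.ArrayList<String>();

      @Override
      public void visit(java.util.Stack<String> stack, String vertex) {
        visited.add(vertex);  // record visit order; 'stack' is the current path
      }
    }
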
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
new file mode 100644
index 0000000..04ed6fa
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.benchmark.TPCH;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
+import org.apache.tajo.master.TajoMaster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestLogicalPlan {
+  private static TajoTestingCluster util;
+  private static TPCH tpch;
+  private static CatalogService catalog;
+  private static SQLAnalyzer sqlAnalyzer = new SQLAnalyzer();
+  private static LogicalPlanner planner;
+  private static LogicalOptimizer optimizer;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+    catalog = util.getMiniCatalogCluster().getCatalog();
+    for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
+      catalog.registerFunction(funcDesc);
+    }
+
+    // TPC-H Schema for Complex Queries
+    String [] tpchTables = {
+        "part", "supplier", "partsupp", "nation", "region", "lineitem", "customer", "orders"
+    };
+    int [] tableVolumes = {
+        100, 200, 50, 5, 5, 800, 300, 100
+    };
+    tpch = new TPCH();
+    tpch.loadSchemas();
+    tpch.loadOutSchema();
+
+    for (int i = 0; i < tpchTables.length; i++) {
+      TableMeta m = CatalogUtil.newTableMeta(tpch.getSchema(tpchTables[i]), CatalogProtos.StoreType.CSV);
+      TableStat stat = new TableStat();
+      stat.setNumBytes(tableVolumes[i]);
+      m.setStat(stat);
+      TableDesc d = CatalogUtil.newTableDesc(tpchTables[i], m, new Path("file:///"));
+      catalog.addTable(d);
+    }
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public final void testQueryBlockGraph() {
+    LogicalPlan plan = new LogicalPlan(new LogicalPlanner(catalog));
+    LogicalPlan.QueryBlock root = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK);
+    LogicalPlan.QueryBlock new1 = plan.newAndGetBlock("@new1");
+    LogicalPlan.QueryBlock new2 = plan.newAndGetBlock("@new2");
+
+    plan.getBlockGraph().connect(new1, root, new LogicalPlan.BlockEdge(new1, root, BlockType.TableSubQuery));
+    plan.getBlockGraph().connect(new2, root, new LogicalPlan.BlockEdge(new2, root, BlockType.TableSubQuery));
+
+    SimpleDirectedGraph<LogicalPlan.QueryBlock, LogicalPlan.BlockEdge> graph = plan.getBlockGraph();
+    assertEquals(2, graph.getChildCount(root));
+
+    assertEquals(root, graph.getParent(new1));
+    assertEquals(root, graph.getParent(new2));
+
+    assertTrue(graph.isRoot(root));
+    assertFalse(graph.isRoot(new1));
+    assertFalse(graph.isRoot(new2));
+
+    assertFalse(graph.isLeaf(root));
+    assertTrue(graph.isLeaf(new1));
+    assertTrue(graph.isLeaf(new2));
+
+    Set<LogicalPlan.QueryBlock> result = new HashSet<LogicalPlan.QueryBlock>();
+    result.add(new1);
+    result.add(new2);
+
+    Set<LogicalPlan.QueryBlock> childs = new HashSet<LogicalPlan.QueryBlock>(graph.getChilds(root));
+    assertEquals(result, childs);
+  }
+}

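Since the test pins down getParent, isRoot, and isLeaf together, one natural consumer is an upward walk; a sketch using only methods the test exercises:

    // Sketch: climb from any leaf query block up to the root block.
    SimpleDirectedGraph<LogicalPlan.QueryBlock, LogicalPlan.BlockEdge> g = plan.getBlockGraph();
    LogicalPlan.QueryBlock current = new1;  // a leaf, per the assertions above
    while (!g.isRoot(current)) {
      current = g.getParent(current);
    }
    // 'current' now refers to the ROOT_BLOCK.
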
