tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] git commit: TAJO-464: Rename the name 'partition', which actually means shuffle, to 'shuffle'. (hyunsik)
Date Thu, 02 Jan 2014 08:26:09 GMT
TAJO-464: Rename the name 'partition', which actually means shuffle, to 'shuffle'. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bb7e6b6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bb7e6b6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bb7e6b6b

Branch: refs/heads/master
Commit: bb7e6b6b3ca3f729e292df3b5905a46fe773f392
Parents: df5727c
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Thu Jan 2 17:10:05 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Thu Jan 2 17:10:05 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../engine/planner/PhysicalPlannerImpl.java     | 105 ++++++-------
 .../tajo/engine/planner/global/DataChannel.java |  96 ++++++------
 .../engine/planner/global/GlobalPlanner.java    |  74 ++++++----
 .../tajo/engine/planner/global/MasterPlan.java  |  10 +-
 .../planner/logical/PersistentStoreNode.java    | 110 ++++++++++++++
 .../planner/logical/ShuffleFileWriteNode.java   | 114 ++++++++++++++
 .../engine/planner/logical/StoreTableNode.java  |  78 +---------
 .../planner/physical/HashPartitioner.java       |   6 +-
 .../physical/HashShuffleFileWriteExec.java      | 146 ++++++++++++++++++
 .../planner/physical/IndexedStoreExec.java      | 122 ---------------
 .../planner/physical/PartitionedStoreExec.java  | 148 -------------------
 .../engine/planner/physical/Partitioner.java    |   4 +-
 .../physical/RangeShuffleFileWriteExec.java     | 128 ++++++++++++++++
 .../engine/planner/physical/StoreTableExec.java |  34 ++---
 .../tajo/master/querymaster/QueryUnit.java      |  10 +-
 .../master/querymaster/QueryUnitAttempt.java    |  11 +-
 .../tajo/master/querymaster/Repartitioner.java  |  38 ++---
 .../tajo/master/querymaster/SubQuery.java       |   4 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  16 +-
 .../apache/tajo/worker/TaskAttemptContext.java  |  13 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |  23 ++-
 .../main/resources/webapps/worker/queryplan.jsp |   4 +-
 .../main/resources/webapps/worker/queryunit.jsp |   6 +-
 .../engine/planner/global/TestMasterPlan.java   |   4 +-
 .../planner/physical/TestPhysicalPlanner.java   |  15 +-
 .../apache/tajo/master/TestRepartitioner.java   |   2 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   6 +-
 .../tajo/pullserver/TajoPullServerService.java  |  34 ++---
 29 files changed, 764 insertions(+), 600 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8cfc53e..f3e74ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -110,6 +110,9 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-464: Rename the name 'partition', actually meaning shuffle to 
+    'shuffle'. (hyunsik)
+
     TAJO-385: Refactoring TaskScheduler to assign multiple fragments. (jihoon)
 
     TAJO-468: Implements task's detail info page in WEB UI.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 5120106..d6d518c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -72,8 +72,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     try {
       execPlan = createPlanRecursive(context, logicalPlan);
       if (execPlan instanceof StoreTableExec
-          || execPlan instanceof IndexedStoreExec
-          || execPlan instanceof PartitionedStoreExec
+          || execPlan instanceof RangeShuffleFileWriteExec
+          || execPlan instanceof HashShuffleFileWriteExec
           || execPlan instanceof ColumnPartitionedTableStoreExec) {
         return execPlan;
       } else if (context.getDataChannel() != null) {
@@ -89,18 +89,15 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
                                            PhysicalExec execPlan) throws IOException {
     DataChannel channel = context.getDataChannel();
-    StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
-    if(context.isInterQuery()) storeTableNode.setStorageType(context.getDataChannel().getStoreType());
-    storeTableNode.setInSchema(plan.getOutSchema());
-    storeTableNode.setOutSchema(plan.getOutSchema());
-    if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
-      storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
-    } else {
-      storeTableNode.setDefaultParition();
-    }
-    storeTableNode.setChild(plan);
-
-    PhysicalExec outExecPlan = createStorePlan(context, storeTableNode, execPlan);
+    ShuffleFileWriteNode shuffleFileWriteNode =
+        new ShuffleFileWriteNode(UNGENERATED_PID, channel.getTargetId().toString());
+    shuffleFileWriteNode.setStorageType(context.getDataChannel().getStoreType());
+    shuffleFileWriteNode.setInSchema(plan.getOutSchema());
+    shuffleFileWriteNode.setOutSchema(plan.getOutSchema());
+    shuffleFileWriteNode.setShuffle(channel.getShuffleType(), channel.getShuffleKeys(), channel.getShuffleOutputNum());
+    shuffleFileWriteNode.setChild(plan);
+
+    PhysicalExec outExecPlan = createShuffleFileWritePlan(context, shuffleFileWriteNode, execPlan);
     return outExecPlan;
   }
 
@@ -606,50 +603,56 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     }
   }
 
-  public PhysicalExec createStorePlan(TaskAttemptContext ctx,
-                                      StoreTableNode plan, PhysicalExec subOp) throws IOException {
-    if (plan.getPartitionType() == PartitionType.HASH_PARTITION
-        || plan.getPartitionType() == PartitionType.RANGE_PARTITION) {
-      switch (ctx.getDataChannel().getPartitionType()) {
-        case HASH_PARTITION:
-          return new PartitionedStoreExec(ctx, sm, plan, subOp);
-
-        case RANGE_PARTITION:
-          SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class);
-
-          SortSpec [] sortSpecs = null;
-          if (sortExec != null) {
-            sortSpecs = sortExec.getSortSpecs();
-          } else {
-            Column[] columns = ctx.getDataChannel().getPartitionKey();
-            SortSpec specs[] = new SortSpec[columns.length];
-            for (int i = 0; i < columns.length; i++) {
-              specs[i] = new SortSpec(columns[i]);
-            }
-          }
-
-          return new IndexedStoreExec(ctx, sm, subOp,
-              plan.getInSchema(), plan.getInSchema(), sortSpecs);
+
+  /**
+   * Create a shuffle file write executor to store intermediate data into local disks.
+   */
+  public PhysicalExec createShuffleFileWritePlan(TaskAttemptContext ctx,
+                                                 ShuffleFileWriteNode plan, PhysicalExec subOp) throws IOException {
+    switch (plan.getShuffleType()) {
+    case HASH_SHUFFLE:
+      return new HashShuffleFileWriteExec(ctx, sm, plan, subOp);
+
+    case RANGE_SHUFFLE:
+      SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class);
+
+      SortSpec [] sortSpecs = null;
+      if (sortExec != null) {
+        sortSpecs = sortExec.getSortSpecs();
+      } else {
+        Column[] columns = ctx.getDataChannel().getShuffleKeys();
+        SortSpec specs[] = new SortSpec[columns.length];
+        for (int i = 0; i < columns.length; i++) {
+          specs[i] = new SortSpec(columns[i]);
+        }
       }
+      return new RangeShuffleFileWriteExec(ctx, sm, subOp, plan.getInSchema(), plan.getInSchema(), sortSpecs);
+
+    case NONE_SHUFFLE:
+      return new StoreTableExec(ctx, plan, subOp);
+
+    default:
+      throw new IllegalStateException(ctx.getDataChannel().getShuffleType() + " is not supported yet.");
     }
-    if (plan instanceof StoreIndexNode) {
-      return new TunnelExec(ctx, plan.getOutSchema(), subOp);
-    }
+  }
+
+  /**
+   * Create an executor to store a table into HDFS. This is used for CREATE TABLE ..
+   * AS or INSERT (OVERWRITE) INTO statement.
+   */
+  public PhysicalExec createStorePlan(TaskAttemptContext ctx,
+                                      StoreTableNode plan, PhysicalExec subOp) throws IOException {
 
-    // Find partitioned table
     if (plan.getPartitions() != null) {
-      if (plan.getPartitions().getPartitionsType().equals(CatalogProtos.PartitionsType.COLUMN)) {
+      switch (plan.getPartitions().getPartitionsType()) {
+      case COLUMN:
         return new ColumnPartitionedTableStoreExec(ctx, plan, subOp);
-      } else if (plan.getPartitions().getPartitionsType().equals(CatalogProtos.PartitionsType.HASH)) {
-        // TODO
-      } else if (plan.getPartitions().getPartitionsType().equals(CatalogProtos.PartitionsType.RANGE)) {
-        // TODO
-      } else if (plan.getPartitions().getPartitionsType().equals(CatalogProtos.PartitionsType.LIST)) {
-        // TODO
+      default:
+        throw new IllegalStateException(plan.getPartitions().getPartitionsType() + " is not supported yet.");
       }
+    } else {
+      return new StoreTableExec(ctx, plan, subOp);
     }
-
-    return new StoreTableExec(ctx, plan, subOp);
   }
 
   public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 556c7ff..efa1e05 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -30,9 +30,9 @@ public class DataChannel {
   private ExecutionBlockId srcId;
   private ExecutionBlockId targetId;
   private TransmitType transmitType = TransmitType.PULL_TRANSMIT;
-  private PartitionType partitionType;
-  private Integer partitionNum = 1;
-  private Column[] key;
+  private ShuffleType shuffleType;
+  private Integer numOutputs = 1;
+  private Column[] shuffleKeys;
 
   private Schema schema;
 
@@ -43,39 +43,39 @@ public class DataChannel {
     this.targetId = targetId;
   }
 
-  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType) {
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, ShuffleType shuffleType) {
     this(srcId, targetId);
-    this.partitionType = partitionType;
+    this.shuffleType = shuffleType;
   }
 
-  public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum) {
-    this(src.getId(), target.getId(), partitionType, partNum);
+  public DataChannel(ExecutionBlock src, ExecutionBlock target, ShuffleType shuffleType, int numOutput) {
+    this(src.getId(), target.getId(), shuffleType, numOutput);
     setSchema(src.getPlan().getOutSchema());
   }
 
-  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType, int partNum) {
-    this(srcId, targetId, partitionType);
-    this.partitionNum = partNum;
+  public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, ShuffleType shuffleType, int numOutputs) {
+    this(srcId, targetId, shuffleType);
+    this.numOutputs = numOutputs;
   }
 
   public DataChannel(DataChannelProto proto) {
     this.srcId = new ExecutionBlockId(proto.getSrcId());
     this.targetId = new ExecutionBlockId(proto.getTargetId());
     this.transmitType = proto.getTransmitType();
-    this.partitionType = proto.getPartitionType();
+    this.shuffleType = proto.getShuffleType();
     if (proto.hasSchema()) {
       this.setSchema(new Schema(proto.getSchema()));
     }
-    if (proto.getPartitionKeyCount() > 0) {
-      key = new Column[proto.getPartitionKeyCount()];
-      for (int i = 0; i < proto.getPartitionKeyCount(); i++) {
-        key[i] = new Column(proto.getPartitionKey(i));
+    if (proto.getShuffleKeysCount() > 0) {
+      shuffleKeys = new Column[proto.getShuffleKeysCount()];
+      for (int i = 0; i < proto.getShuffleKeysCount(); i++) {
+        shuffleKeys[i] = new Column(proto.getShuffleKeys(i));
       }
     } else {
-      key = new Column[] {};
+      shuffleKeys = new Column[] {};
     }
-    if (proto.hasPartitionNum()) {
-      this.partitionNum = proto.getPartitionNum();
+    if (proto.hasNumOutputs()) {
+      this.numOutputs = proto.getNumOutputs();
     }
 
     if (proto.hasStoreType()) {
@@ -91,8 +91,8 @@ public class DataChannel {
     return targetId;
   }
 
-  public PartitionType getPartitionType() {
-    return partitionType;
+  public ShuffleType getShuffleType() {
+    return shuffleType;
   }
 
   public TransmitType getTransmitType() {
@@ -103,37 +103,37 @@ public class DataChannel {
     this.transmitType = transmitType;
   }
 
-  public void setPartition(PartitionType partitionType, Column [] keys, int numPartitions) {
-    Preconditions.checkArgument(keys.length >= 0, "At least one partition key must be specified.");
-    Preconditions.checkArgument(numPartitions > 0, "The number of partitions must be positive: %s", numPartitions);
+  public void setShuffle(ShuffleType shuffleType, Column[] keys, int numOutputs) {
+    Preconditions.checkArgument(keys.length >= 0, "At least one shuffle key must be specified.");
+    Preconditions.checkArgument(numOutputs > 0, "The number of outputs must be positive: %s", numOutputs);
 
-    this.partitionType = partitionType;
-    this.key = keys;
-    this.partitionNum = numPartitions;
+    this.shuffleType = shuffleType;
+    this.shuffleKeys = keys;
+    this.numOutputs = numOutputs;
   }
 
-  public void setPartitionType(PartitionType partitionType) {
-    this.partitionType = partitionType;
+  public void setShuffleType(ShuffleType shuffleType) {
+    this.shuffleType = shuffleType;
   }
 
-  public boolean hasPartitionKey() {
-    return key != null;
+  public boolean hasShuffleKeys() {
+    return shuffleKeys != null;
   }
 
-  public void setPartitionKey(Column [] key) {
-    this.key = key;
+  public void setShuffleKeys(Column[] key) {
+    this.shuffleKeys = key;
   }
 
-  public Column [] getPartitionKey() {
-    return this.key;
+  public Column [] getShuffleKeys() {
+    return this.shuffleKeys;
   }
 
-  public void setPartitionNum(int partNum) {
-    this.partitionNum = partNum;
+  public void setShuffleOutputNum(int partNum) {
+    this.numOutputs = partNum;
   }
 
-  public int getPartitionNum() {
-    return partitionNum;
+  public int getShuffleOutputNum() {
+    return numOutputs;
   }
 
   public boolean hasStoreType() {
@@ -155,17 +155,17 @@ public class DataChannel {
     if (transmitType != null) {
       builder.setTransmitType(transmitType);
     }
-    builder.setPartitionType(partitionType);
+    builder.setShuffleType(shuffleType);
     if (schema != null) {
       builder.setSchema(schema.getProto());
     }
-    if (key != null) {
-      for (Column column : key) {
-        builder.addPartitionKey(column.getProto());
+    if (shuffleKeys != null) {
+      for (Column column : shuffleKeys) {
+        builder.addShuffleKeys(column.getProto());
       }
     }
-    if (partitionNum != null) {
-      builder.setPartitionNum(partitionNum);
+    if (numOutputs != null) {
+      builder.setNumOutputs(numOutputs);
     }
 
     if(storeType != null){
@@ -186,11 +186,11 @@ public class DataChannel {
     StringBuilder sb = new StringBuilder();
     sb.append("[").append(srcId.getQueryId()).append("] ");
     sb.append(srcId.getId()).append(" => ").append(targetId.getId());
-    sb.append(" (type=").append(partitionType);
-    if (hasPartitionKey()) {
+    sb.append(" (type=").append(shuffleType);
+    if (hasShuffleKeys()) {
       sb.append(", key=");
       boolean first = true;
-      for (Column column : getPartitionKey()) {
+      for (Column column : getShuffleKeys()) {
         if (first) {
           first = false;
         } else {
@@ -198,7 +198,7 @@ public class DataChannel {
         }
         sb.append(column.getColumnName());
       }
-      sb.append(", num=").append(partitionNum);
+      sb.append(", num=").append(numOutputs);
     }
     sb.append(")");
     return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index b7e1ddb..abe6af3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -39,7 +39,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
 import java.io.IOException;
 import java.util.*;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.*;
 
 /**
  * Build DAG
@@ -72,27 +72,43 @@ public class GlobalPlanner {
     globalPlanContext.plan = masterPlan;
     LOG.info(masterPlan.getLogicalPlan());
 
+    // copy a logical plan in order to keep the original logical plan. The distributed planner can modify
+    // an input logical plan.
     LogicalNode inputPlan = PlannerUtil.clone(masterPlan.getLogicalPlan(),
         masterPlan.getLogicalPlan().getRootBlock().getRoot());
+
+    // create a distributed execution plan by visiting each logical node.
+    // Its output is a graph, where each vertex is an execution block, and each edge is a data channel.
+    // MasterPlan contains them.
     LogicalNode lastNode = planner.visit(globalPlanContext,
         masterPlan.getLogicalPlan(), masterPlan.getLogicalPlan().getRootBlock(), inputPlan, new Stack<LogicalNode>());
-
     ExecutionBlock childExecBlock = globalPlanContext.execBlockMap.get(lastNode.getPID());
 
+    ExecutionBlock terminalBlock;
+    // TODO - consider two terminal types: specified output or not
     if (childExecBlock.getPlan() != null) {
-      ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
-      DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, NONE_PARTITION, 1);
-      dataChannel.setStoreType(CatalogProtos.StoreType.CSV);
-      dataChannel.setSchema(lastNode.getOutSchema());
-      masterPlan.addConnect(dataChannel);
-      masterPlan.setTerminal(terminalBlock);
-    } else {
-      masterPlan.setTerminal(childExecBlock);
+      terminalBlock = masterPlan.createTerminalBlock();
+      DataChannel finalChannel = new DataChannel(childExecBlock.getId(), terminalBlock.getId());
+      setFinalOutputChannel(finalChannel, lastNode.getOutSchema());
+      masterPlan.addConnect(finalChannel);
+    } else { // if one or more unions is terminal
+      terminalBlock = childExecBlock;
+      for (DataChannel outputChannel : masterPlan.getIncomingChannels(terminalBlock.getId())) {
+        setFinalOutputChannel(outputChannel, lastNode.getOutSchema());
+      }
     }
 
+    masterPlan.setTerminal(terminalBlock);
     LOG.info(masterPlan);
   }
 
+  private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) {
+    outputChannel.setShuffleType(NONE_SHUFFLE);
+    outputChannel.setShuffleOutputNum(1);
+    outputChannel.setStoreType(CatalogProtos.StoreType.CSV);
+    outputChannel.setSchema(outputSchema);
+  }
+
   public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
     Preconditions.checkArgument(channel.getSchema() != null,
         "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
@@ -105,15 +121,15 @@ public class GlobalPlanner {
                                                 ExecutionBlock parent, JoinNode join, boolean leftTable) {
     ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
 
-    DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
+    DataChannel channel = new DataChannel(childBlock, parent, HASH_SHUFFLE, 32);
     channel.setStoreType(storeType);
     if (join.getJoinType() != JoinType.CROSS) {
       Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
           leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
       if (leftTable) {
-        channel.setPartitionKey(joinColumns[0]);
+        channel.setShuffleKeys(joinColumns[0]);
       } else {
-        channel.setPartitionKey(joinColumns[1]);
+        channel.setShuffleKeys(joinColumns[1]);
       }
     }
     return channel;
@@ -218,8 +234,8 @@ public class GlobalPlanner {
 
     // setup channel
     DataChannel channel;
-    channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
-    channel.setPartitionKey(groupbyNode.getGroupingColumns());
+    channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
+    channel.setShuffleKeys(groupbyNode.getGroupingColumns());
     channel.setSchema(topMostOfFirstPhase.getOutSchema());
     channel.setStoreType(storeType);
 
@@ -251,9 +267,9 @@ public class GlobalPlanner {
         currentBlock = childBlock;
         for (DataChannel dataChannel : masterPlan.getIncomingChannels(currentBlock.getId())) {
           if (firstPhaseGroupBy.isEmptyGrouping()) {
-            dataChannel.setPartition(HASH_PARTITION, firstPhaseGroupBy.getGroupingColumns(), 1);
+            dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
           } else {
-            dataChannel.setPartition(HASH_PARTITION, firstPhaseGroupBy.getGroupingColumns(), 32);
+            dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
           }
           dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
 
@@ -273,11 +289,11 @@ public class GlobalPlanner {
 
         DataChannel channel;
         if (firstPhaseGroupBy.isEmptyGrouping()) {
-          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
-          channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
+          channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
+          channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
         } else {
-          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
-          channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
+          channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
+          channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
         }
         channel.setSchema(firstPhaseGroupBy.getOutSchema());
         channel.setStoreType(storeType);
@@ -306,8 +322,8 @@ public class GlobalPlanner {
     childBlock.setPlan(firstSortNode);
 
     currentBlock = masterPlan.newExecutionBlock();
-    DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
-    channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
+    DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_SHUFFLE, 32);
+    channel.setShuffleKeys(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
     channel.setSchema(firstSortNode.getOutSchema());
     channel.setStoreType(storeType);
 
@@ -348,9 +364,9 @@ public class GlobalPlanner {
     DataChannel channel = null;
     CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
     if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
-      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+      channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
       Column[] columns = new Column[partitionDesc.getColumns().size()];
-      channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
+      channel.setShuffleKeys(partitionDesc.getColumns().toArray(columns));
       channel.setSchema(childNode.getOutSchema());
       channel.setStoreType(storeType);
     } else {
@@ -409,15 +425,15 @@ public class GlobalPlanner {
         childBlock.setPlan(childLimit);
 
         DataChannel channel = context.plan.getChannel(childBlock, execBlock);
-        channel.setPartitionNum(1);
+        channel.setShuffleOutputNum(1);
         context.execBlockMap.put(node.getPID(), execBlock);
       } else {
         node.setChild(execBlock.getPlan());
         execBlock.setPlan(node);
 
         ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
-        DataChannel newChannel = new DataChannel(execBlock, newExecBlock, HASH_PARTITION, 1);
-        newChannel.setPartitionKey(new Column[]{});
+        DataChannel newChannel = new DataChannel(execBlock, newExecBlock, HASH_SHUFFLE, 1);
+        newChannel.setShuffleKeys(new Column[]{});
         newChannel.setSchema(node.getOutSchema());
         newChannel.setStoreType(storeType);
 
@@ -525,7 +541,7 @@ public class GlobalPlanner {
       }
 
       for (ExecutionBlock childBlocks : queryBlockBlocks) {
-        DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
+        DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_SHUFFLE, 1);
         channel.setStoreType(storeType);
         context.plan.addConnect(channel);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 891b452..2ac2bc9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -26,6 +26,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,8 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-
 public class MasterPlan {
   private final QueryId queryId;
   private final QueryContext context;
@@ -44,7 +43,8 @@ public class MasterPlan {
 
   private ExecutionBlock terminalBlock;
   private Map<ExecutionBlockId, ExecutionBlock> execBlockMap = new HashMap<ExecutionBlockId, ExecutionBlock>();
-  private SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph = new SimpleDirectedGraph<ExecutionBlockId, DataChannel>();
+  private SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph =
+      new SimpleDirectedGraph<ExecutionBlockId, DataChannel>();
 
   public ExecutionBlockId newExecutionBlockId() {
     return new ExecutionBlockId(queryId, nextId.incrementAndGet());
@@ -108,11 +108,11 @@ public class MasterPlan {
     execBlockGraph.addEdge(dataChannel.getSrcId(), dataChannel.getTargetId(), dataChannel);
   }
 
-  public void addConnect(ExecutionBlock src, ExecutionBlock target, PartitionType type) {
+  public void addConnect(ExecutionBlock src, ExecutionBlock target, TajoWorkerProtocol.ShuffleType type) {
     addConnect(src.getId(), target.getId(), type);
   }
 
-  public void addConnect(ExecutionBlockId src, ExecutionBlockId target, PartitionType type) {
+  public void addConnect(ExecutionBlockId src, ExecutionBlockId target, TajoWorkerProtocol.ShuffleType type) {
     addConnect(new DataChannel(src, target, type));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
new file mode 100644
index 0000000..2f1a487
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.util.TUtil;
+
+
+/**
+ * <code>PersistentStoreNode</code> is an expression for a persistent data store step.
+ * This includes some basic information for materializing data.
+ */
+public abstract class PersistentStoreNode extends UnaryNode implements Cloneable {
+  @Expose protected String tableName;
+  @Expose protected CatalogProtos.StoreType storageType = CatalogProtos.StoreType.CSV;
+  @Expose protected Options options;
+
+  public PersistentStoreNode(int pid, String tableName) {
+    super(pid, NodeType.STORE);
+    this.tableName = tableName;
+  }
+
+  public final String getTableName() {
+    return this.tableName;
+  }
+
+  public void setStorageType(CatalogProtos.StoreType storageType) {
+    this.storageType = storageType;
+  }
+
+  public CatalogProtos.StoreType getStorageType() {
+    return this.storageType;
+  }
+
+  public boolean hasOptions() {
+    return this.options != null;
+  }
+
+  public Options getOptions() {
+    return this.options;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString("Store");
+    planStr.appendTitle(" into ").appendTitle(tableName);
+    planStr.addExplan("Store type: " + storageType);
+
+    return planStr;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof PersistentStoreNode) {
+      PersistentStoreNode other = (PersistentStoreNode) obj;
+      boolean eq = super.equals(other);
+      eq = eq && this.tableName.equals(other.tableName);
+      eq = eq && this.storageType.equals(other.storageType);
+      eq = eq && TUtil.checkEquals(options, other.options);
+      return eq;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    PersistentStoreNode store = (PersistentStoreNode) super.clone();
+    store.tableName = tableName;
+    store.storageType = storageType != null ? storageType : null;
+    store.options = options != null ? (Options) options.clone() : null;
+    return store;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\"Store\": {\"table\": \""+tableName);
+    if (storageType != null) {
+      sb.append(", storage: "+ storageType.name());
+    }
+
+    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
+        .append("\n  \"in schema\": ").append(getInSchema());
+
+    sb.append("}");
+
+    return sb.toString() + "\n"
+        + getChild().toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
new file mode 100644
index 0000000..180b1a2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.util.TUtil;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.NONE_SHUFFLE;
+
+/**
+ * ShuffleFileWriteNode is an expression for an intermediate data materialization step.
+ */
+public class ShuffleFileWriteNode extends PersistentStoreNode implements Cloneable {
+  @Expose private TajoWorkerProtocol.ShuffleType shuffleType = NONE_SHUFFLE;
+  @Expose private int numOutputs;
+  @Expose private Column [] shuffleKeys;
+
+  public ShuffleFileWriteNode(int pid, String tableName) {
+    super(pid, tableName);
+  }
+    
+  public final int getNumOutputs() {
+    return this.numOutputs;
+  }
+  
+  public final boolean hasShuffleKeys() {
+    return this.shuffleKeys != null;
+  }
+  
+  public final Column [] getShuffleKeys() {
+    return shuffleKeys;
+  }
+  
+  public final void setShuffle(TajoWorkerProtocol.ShuffleType type, Column[] keys, int numPartitions) {
+    Preconditions.checkArgument(keys.length >= 0, 
+        "At least one partition key must be specified.");
+    Preconditions.checkArgument(numPartitions > 0,
+        "The number of partitions must be positive: %s", numPartitions);
+
+    this.shuffleType = type;
+    this.shuffleKeys = keys;
+    this.numOutputs = numPartitions;
+  }
+
+  public TajoWorkerProtocol.ShuffleType getShuffleType() {
+    return this.shuffleType;
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ShuffleFileWriteNode) {
+      ShuffleFileWriteNode other = (ShuffleFileWriteNode) obj;
+      boolean eq = super.equals(other);
+      eq = eq && this.numOutputs == other.numOutputs;
+      eq = eq && TUtil.checkEquals(shuffleKeys, other.shuffleKeys);
+      return eq;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    ShuffleFileWriteNode store = (ShuffleFileWriteNode) super.clone();
+    store.numOutputs = numOutputs;
+    store.shuffleKeys = shuffleKeys != null ? shuffleKeys.clone() : null;
+    return store;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\"Store\": {\"table\": \""+tableName);
+    if (storageType != null) {
+      sb.append(", storage: "+ storageType.name());
+    }
+    sb.append(", partnum: ").append(numOutputs).append("}")
+    .append(", ");
+    if (shuffleKeys != null) {
+      sb.append("\"partition keys: [");
+      for (int i = 0; i < shuffleKeys.length; i++) {
+        sb.append(shuffleKeys[i]);
+        if (i < shuffleKeys.length - 1)
+          sb.append(",");
+      }
+      sb.append("],");
+    }
+    
+    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
+    .append("\n  \"in schema\": ").append(getInSchema());
+
+    sb.append("}");
+    
+    return sb.toString() + "\n" + getChild().toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 634fa3a..843a70f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -18,44 +18,28 @@
 
 package org.apache.tajo.engine.planner.logical;
 
-import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.LIST_PARTITION;
-
-public class StoreTableNode extends UnaryNode implements Cloneable {
-  @Expose private String tableName;
-  @Expose private StoreType storageType = StoreType.CSV;
-  @Expose private PartitionType partitionType;
-  @Expose private int numPartitions;
-  @Expose private Column [] partitionKeys;
-  @Expose private Options options;
+
+public class StoreTableNode extends PersistentStoreNode implements Cloneable {
   @Expose private boolean isCreatedTable = false;
   @Expose private boolean isOverwritten = false;
   @Expose private PartitionDesc partitionDesc;
 
   public StoreTableNode(int pid, String tableName) {
-    super(pid, NodeType.STORE);
-    this.tableName = tableName;
+    super(pid, tableName);
   }
 
   public StoreTableNode(int pid, String tableName, PartitionDesc partitionDesc) {
-    super(pid, NodeType.STORE);
-    this.tableName = tableName;
+    super(pid, tableName);
     this.partitionDesc = partitionDesc;
   }
 
-  public final String getTableName() {
-    return this.tableName;
-  }
-
   public void setStorageType(StoreType storageType) {
     this.storageType = storageType;
   }
@@ -63,39 +47,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   public StoreType getStorageType() {
     return this.storageType;
   }
-    
-  public final int getNumPartitions() {
-    return this.numPartitions;
-  }
-  
-  public final boolean hasPartitionKey() {
-    return this.partitionKeys != null;
-  }
-  
-  public final Column [] getPartitionKeys() {
-    return this.partitionKeys;
-  }
-
-  public final void setDefaultParition() {
-    this.partitionType = LIST_PARTITION;
-    this.partitionKeys = null;
-    this.numPartitions = 1;
-  }
-  
-  public final void setPartitions(PartitionType type, Column [] keys, int numPartitions) {
-    Preconditions.checkArgument(keys.length >= 0, 
-        "At least one partition key must be specified.");
-    Preconditions.checkArgument(numPartitions > 0,
-        "The number of partitions must be positive: %s", numPartitions);
-
-    this.partitionType = type;
-    this.partitionKeys = keys;
-    this.numPartitions = numPartitions;
-  }
-
-  public PartitionType getPartitionType() {
-    return this.partitionType;
-  }
 
   public boolean hasOptions() {
     return this.options != null;
@@ -139,11 +90,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     if (obj instanceof StoreTableNode) {
       StoreTableNode other = (StoreTableNode) obj;
       boolean eq = super.equals(other);
-      eq = eq && this.tableName.equals(other.tableName);
-      eq = eq && this.storageType.equals(other.storageType);
-      eq = eq && this.numPartitions == other.numPartitions;
-      eq = eq && TUtil.checkEquals(partitionKeys, other.partitionKeys);
-      eq = eq && TUtil.checkEquals(options, other.options);
       eq = eq && isCreatedTable == other.isCreatedTable;
       eq = eq && isOverwritten == other.isOverwritten;
       eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
@@ -156,11 +102,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   @Override
   public Object clone() throws CloneNotSupportedException {
     StoreTableNode store = (StoreTableNode) super.clone();
-    store.tableName = tableName;
-    store.storageType = storageType != null ? storageType : null;
-    store.numPartitions = numPartitions;
-    store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
-    store.options = options != null ? (Options) options.clone() : null;
     store.isCreatedTable = isCreatedTable;
     store.isOverwritten = isOverwritten;
     store.partitionDesc = partitionDesc;
@@ -173,17 +114,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     if (storageType != null) {
       sb.append(", storage: "+ storageType.name());
     }
-    sb.append(", partnum: ").append(numPartitions).append("}")
-    .append(", ");
-    if (partitionKeys != null) {
-      sb.append("\"partition keys: [");
-      for (int i = 0; i < partitionKeys.length; i++) {
-        sb.append(partitionKeys[i]);
-        if (i < partitionKeys.length - 1)
-          sb.append(",");
-      }
-      sb.append("],");
-    }
     
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
     .append("\n  \"in schema\": ").append(getInSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
index 485e0d1..b620b22 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashPartitioner.java
@@ -29,14 +29,14 @@ public class HashPartitioner extends Partitioner {
   
   public HashPartitioner(final int [] keys, final int numPartitions) {
     super(keys, numPartitions);
-    this.keyTuple = new VTuple(partitionKeys.length);
+    this.keyTuple = new VTuple(partitionKeyIds.length);
   }
   
   @Override
   public int getPartition(Tuple tuple) {
     // build one key tuple
-    for (int i = 0; i < partitionKeys.length; i++) {
-      keyTuple.put(i, tuple.get(partitionKeys[i]));
+    for (int i = 0; i < partitionKeyIds.length; i++) {
+      keyTuple.put(i, tuple.get(partitionKeyIds[i]));
     }
     return (keyTuple.hashCode() & Integer.MAX_VALUE) %
         (numPartitions == 32 ? numPartitions-1 : numPartitions);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
new file mode 100644
index 0000000..c09ec19
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.logical.ShuffleFileWriteNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <code>HashShuffleFileWriteExec</code> is a physical executor to store intermediate data into a number of
+ * file outputs associated with shuffle keys. The file outputs are stored on local disks.
+ */
+public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
+  private static Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class);
+  private final ShuffleFileWriteNode plan;
+  private final TableMeta meta;
+  private final Partitioner partitioner;
+  private final Path storeTablePath;
+  private final Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
+  private final int numShuffleOutputs;
+  private final int [] shuffleKeyIds;
+  
+  public HashShuffleFileWriteExec(TaskAttemptContext context, final AbstractStorageManager sm,
+                                  final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    Preconditions.checkArgument(plan.hasShuffleKeys());
+    this.plan = plan;
+    if (plan.hasOptions()) {
+      this.meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
+    } else {
+      this.meta = CatalogUtil.newTableMeta(plan.getStorageType());
+    }
+    // about the shuffle
+    this.numShuffleOutputs = this.plan.getNumOutputs();
+    int i = 0;
+    this.shuffleKeyIds = new int [this.plan.getShuffleKeys().length];
+    for (Column key : this.plan.getShuffleKeys()) {
+      shuffleKeyIds[i] = inSchema.getColumnId(key.getQualifiedName());
+      i++;
+    }
+    this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs);
+    storeTablePath = new Path(context.getWorkDir(), "output");
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+    FileSystem fs = new RawLocalFileSystem();
+    fs.mkdirs(storeTablePath);
+  }
+  
+  private Appender getAppender(int partId) throws IOException {
+    Appender appender = appenderMap.get(partId);
+
+    if (appender == null) {
+      Path dataFile = getDataFile(partId);
+      FileSystem fs = dataFile.getFileSystem(context.getConf());
+      if (fs.exists(dataFile)) {
+        LOG.info("File " + dataFile + " already exists!");
+        FileStatus status = fs.getFileStatus(dataFile);
+        LOG.info("File size: " + status.getLen());
+      }
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
+      appender.enableStats();
+      appender.init();
+      appenderMap.put(partId, appender);
+    } else {
+      appender = appenderMap.get(partId);
+    }
+
+    return appender;
+  }
+
+  private Path getDataFile(int partId) {
+    return StorageUtil.concatPath(storeTablePath, ""+partId);
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    Appender appender;
+    int partId;
+    while ((tuple = child.next()) != null) {
+      partId = partitioner.getPartition(tuple);
+      appender = getAppender(partId);
+      appender.addTuple(tuple);
+    }
+    
+    List<TableStats> statSet = new ArrayList<TableStats>();
+    for (Map.Entry<Integer, Appender> entry : appenderMap.entrySet()) {
+      int partNum = entry.getKey();
+      Appender app = entry.getValue();
+      app.flush();
+      app.close();
+      statSet.add(app.getStats());
+      if (app.getStats().getNumRows() > 0) {
+        context.addShuffleFileOutput(partNum, getDataFile(partNum).getName());
+      }
+    }
+    
+    // Collect and aggregate statistics data
+    TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet);
+    context.setResultStats(aggregated);
+    
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do   
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
deleted file mode 100644
index afb4d3c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-
-public class IndexedStoreExec extends UnaryPhysicalExec {
-  private static Log LOG = LogFactory.getLog(IndexedStoreExec.class);
-  private final SortSpec[] sortSpecs;
-  private int [] indexKeys = null;
-  private Schema keySchema;
-
-  private BSTIndex.BSTIndexWriter indexWriter;
-  private TupleComparator comp;
-  private FileAppender appender;
-  private TableMeta meta;
-
-  public IndexedStoreExec(final TaskAttemptContext context, final AbstractStorageManager sm,
-      final PhysicalExec child, final Schema inSchema, final Schema outSchema,
-      final SortSpec[] sortSpecs) throws IOException {
-    super(context, inSchema, outSchema, child);
-    this.sortSpecs = sortSpecs;
-  }
-
-  public void init() throws IOException {
-    super.init();
-
-    indexKeys = new int[sortSpecs.length];
-    keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-
-    Column col;
-    for (int i = 0 ; i < sortSpecs.length; i++) {
-      col = sortSpecs[i].getSortKey();
-      indexKeys[i] = inSchema.getColumnId(col.getQualifiedName());
-    }
-
-    BSTIndex bst = new BSTIndex(new TajoConf());
-    this.comp = new TupleComparator(keySchema, sortSpecs);
-    Path storeTablePath = new Path(context.getWorkDir(), "output");
-    LOG.info("Output data directory: " + storeTablePath);
-    this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
-        context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
-    FileSystem fs = new RawLocalFileSystem();
-    fs.mkdirs(storeTablePath);
-    this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
-        outSchema, new Path(storeTablePath, "output"));
-    this.appender.enableStats();
-    this.appender.init();
-    this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
-        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
-    this.indexWriter.setLoadNum(100);
-    this.indexWriter.open();
-  }
-
-  @Override
-  public Tuple next() throws IOException {
-    Tuple tuple;
-    Tuple keyTuple;
-    Tuple prevKeyTuple = null;
-    long offset;
-
-
-    while((tuple = child.next()) != null) {
-      offset = appender.getOffset();
-      appender.addTuple(tuple);
-      keyTuple = new VTuple(keySchema.getColumnNum());
-      RowStoreUtil.project(tuple, keyTuple, indexKeys);
-      if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
-        indexWriter.write(keyTuple, offset);
-        prevKeyTuple = keyTuple;
-      }
-    }
-
-    return null;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-  }
-
-  public void close() throws IOException {
-    super.close();
-
-    appender.flush();
-    appender.close();
-    indexWriter.flush();
-    indexWriter.close();
-
-    // Collect statistics data
-    context.setResultStats(appender.getStats());
-    context.addRepartition(0, context.getTaskId().toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
deleted file mode 100644
index bcea189..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.StatisticsUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public final class PartitionedStoreExec extends UnaryPhysicalExec {
-  private static Log LOG = LogFactory.getLog(PartitionedStoreExec.class);
-  private static final NumberFormat numFormat = NumberFormat.getInstance();
-
-  static {
-    numFormat.setGroupingUsed(false);
-    numFormat.setMinimumIntegerDigits(6);
-  }
-
-  private final StoreTableNode plan;
-
-  private final int numPartitions;
-  private final int [] partitionKeys;  
-
-  private final TableMeta meta;
-  private final Partitioner partitioner;
-  private final Path storeTablePath;
-  private final Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
-  
-  public PartitionedStoreExec(TaskAttemptContext context, final AbstractStorageManager sm,
-      final StoreTableNode plan, final PhysicalExec child) throws IOException {
-    super(context, plan.getInSchema(), plan.getOutSchema(), child);
-    Preconditions.checkArgument(plan.hasPartitionKey());
-    this.plan = plan;
-    this.meta = CatalogUtil.newTableMeta(context.getDataChannel().getStoreType());
-    // about the partitions
-    this.numPartitions = this.plan.getNumPartitions();
-    int i = 0;
-    this.partitionKeys = new int [this.plan.getPartitionKeys().length];
-    for (Column key : this.plan.getPartitionKeys()) {
-      partitionKeys[i] = inSchema.getColumnId(key.getQualifiedName());
-      i++;
-    }
-    this.partitioner = new HashPartitioner(partitionKeys, numPartitions);
-    storeTablePath = new Path(context.getWorkDir(), "output");
-  }
-
-  @Override
-  public void init() throws IOException {
-    super.init();
-    FileSystem fs = new RawLocalFileSystem();
-    fs.mkdirs(storeTablePath);
-  }
-  
-  private Appender getAppender(int partition) throws IOException {
-    Appender appender = appenderMap.get(partition);
-
-    if (appender == null) {
-      Path dataFile = getDataFile(partition);
-      FileSystem fs = dataFile.getFileSystem(context.getConf());
-      if (fs.exists(dataFile)) {
-        LOG.info("File " + dataFile + " already exists!");
-        FileStatus status = fs.getFileStatus(dataFile);
-        LOG.info("File size: " + status.getLen());
-      }
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
-      appender.enableStats();
-      appender.init();
-      appenderMap.put(partition, appender);
-    } else {
-      appender = appenderMap.get(partition);
-    }
-
-    return appender;
-  }
-
-  private Path getDataFile(int partition) {
-    return StorageUtil.concatPath(storeTablePath, ""+partition);
-  }
-
-  @Override
-  public Tuple next() throws IOException {
-    Tuple tuple;
-    Appender appender;
-    int partition;
-    while ((tuple = child.next()) != null) {
-      partition = partitioner.getPartition(tuple);
-      appender = getAppender(partition);
-      appender.addTuple(tuple);
-    }
-    
-    List<TableStats> statSet = new ArrayList<TableStats>();
-    for (Map.Entry<Integer, Appender> entry : appenderMap.entrySet()) {
-      int partNum = entry.getKey();
-      Appender app = entry.getValue();
-      app.flush();
-      app.close();
-      statSet.add(app.getStats());
-      if (app.getStats().getNumRows() > 0) {
-        context.addRepartition(partNum, getDataFile(partNum).getName());
-      }
-    }
-    
-    // Collect and aggregated statistics data
-    TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet);
-    context.setResultStats(aggregated);
-    
-    return null;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    // nothing to do   
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
index 866738d..b67f45c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/Partitioner.java
@@ -25,7 +25,7 @@ import com.google.common.base.Preconditions;
 import org.apache.tajo.storage.Tuple;
 
 public abstract class Partitioner {
-  protected final int [] partitionKeys;
+  protected final int [] partitionKeyIds;
   protected final int numPartitions;
   
   public Partitioner(final int [] keyList, final int numPartitions) {
@@ -35,7 +35,7 @@ public abstract class Partitioner {
         "At least one partition key must be specified.");
     Preconditions.checkArgument(numPartitions > 0, 
         "The number of partitions must be positive: %s", numPartitions);
-    this.partitionKeys = keyList;
+    this.partitionKeyIds = keyList;
     this.numPartitions = numPartitions;    
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
new file mode 100644
index 0000000..13573eb
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * <code>RangeShuffleFileWriteExec</code> is a physical executor to store intermediate data into a number of
+ * file outputs associated with shuffle key ranges. The file outputs are stored with index files on local disks.
+ * <code>RangeShuffleFileWriteExec</code> is implemented with an assumption that input tuples are sorted in a
+ * specified order of shuffle keys.
+ */
+public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
+  private static Log LOG = LogFactory.getLog(RangeShuffleFileWriteExec.class);
+  private final SortSpec[] sortSpecs;
+  private int [] indexKeys = null;
+  private Schema keySchema;
+
+  private BSTIndex.BSTIndexWriter indexWriter;
+  private TupleComparator comp;
+  private FileAppender appender;
+  private TableMeta meta;
+
+  public RangeShuffleFileWriteExec(final TaskAttemptContext context, final AbstractStorageManager sm,
+                                   final PhysicalExec child, final Schema inSchema, final Schema outSchema,
+                                   final SortSpec[] sortSpecs) throws IOException {
+    super(context, inSchema, outSchema, child);
+    this.sortSpecs = sortSpecs;
+  }
+
+  public void init() throws IOException {
+    super.init();
+
+    indexKeys = new int[sortSpecs.length];
+    keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
+
+    Column col;
+    for (int i = 0 ; i < sortSpecs.length; i++) {
+      col = sortSpecs[i].getSortKey();
+      indexKeys[i] = inSchema.getColumnId(col.getQualifiedName());
+    }
+
+    BSTIndex bst = new BSTIndex(new TajoConf());
+    this.comp = new TupleComparator(keySchema, sortSpecs);
+    Path storeTablePath = new Path(context.getWorkDir(), "output");
+    LOG.info("Output data directory: " + storeTablePath);
+    this.meta = CatalogUtil.newTableMeta(context.getDataChannel() != null ?
+        context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
+    FileSystem fs = new RawLocalFileSystem();
+    fs.mkdirs(storeTablePath);
+    this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
+        outSchema, new Path(storeTablePath, "output"));
+    this.appender.enableStats();
+    this.appender.init();
+    this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
+        BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
+    this.indexWriter.setLoadNum(100);
+    this.indexWriter.open();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+    Tuple prevKeyTuple = null;
+    long offset;
+
+
+    while((tuple = child.next()) != null) {
+      offset = appender.getOffset();
+      appender.addTuple(tuple);
+      keyTuple = new VTuple(keySchema.getColumnNum());
+      RowStoreUtil.project(tuple, keyTuple, indexKeys);
+      if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
+        indexWriter.write(keyTuple, offset);
+        prevKeyTuple = keyTuple;
+      }
+    }
+
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+  }
+
+  public void close() throws IOException {
+    super.close();
+
+    appender.flush();
+    appender.close();
+    indexWriter.flush();
+    indexWriter.close();
+
+    // Collect statistics data
+    context.setResultStats(appender.getStats());
+    context.addShuffleFileOutput(0, context.getTaskId().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 673e0b5..affdc86 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -21,33 +21,25 @@
  */
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.StorageManagerFactory;
-import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 /**
- * This physical operator stores a relation into a table.
+ * This is a physical executor to store a table part into a specified storage.
  */
 public class StoreTableExec extends UnaryPhysicalExec {
-  private final StoreTableNode plan;
+  private final PersistentStoreNode plan;
   private Appender appender;
   private Tuple tuple;
-  
-  /**
-   * @throws java.io.IOException
-   *
-   */
-  public StoreTableExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException {
+
+  public StoreTableExec(TaskAttemptContext context, PersistentStoreNode plan, PhysicalExec child) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
   }
@@ -62,16 +54,9 @@ public class StoreTableExec extends UnaryPhysicalExec {
       meta = CatalogUtil.newTableMeta(plan.getStorageType());
     }
 
-    if (context.isInterQuery()) {
-      Path storeTablePath = new Path(context.getWorkDir(), "out");
-      FileSystem fs = new RawLocalFileSystem();
-      fs.mkdirs(storeTablePath);
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
-          StorageUtil.concatPath(storeTablePath, "0"));
-    } else {
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
-          context.getOutputPath());
-    }
+    appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
+        context.getOutputPath());
+
     appender.enableStats();
     appender.init();
   }
@@ -100,8 +85,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
     appender.close();
 
     // Collect statistics data
-//    ctx.addStatSet(annotation.getType().toString(), appender.getStats());
     context.setResultStats(appender.getStats());
-    context.addRepartition(0, context.getTaskId().toString());
+    context.addShuffleFileOutput(0, context.getTaskId().toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 5e7c82f..85dfa2e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -32,7 +32,6 @@ import org.apache.tajo.QueryUnitId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
 import org.apache.tajo.master.FragmentPair;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
@@ -50,6 +49,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
 
 public class QueryUnit implements EventHandler<TaskEvent> {
   /** Class Logger */
@@ -65,7 +65,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	private Map<String, Set<FragmentProto>> fragMap;
 	private Map<String, Set<URI>> fetchMap;
 	
-  private List<Partition> partitions;
+  private List<ShuffleFileOutput> partitions;
 	private TableStats stats;
   private final boolean isLeafTask;
   private List<IntermediateEntry> intermediateData;
@@ -127,7 +127,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 		scan = new ArrayList<ScanNode>();
     fetchMap = Maps.newHashMap();
     fragMap = Maps.newHashMap();
-    partitions = new ArrayList<Partition>();
+    partitions = new ArrayList<ShuffleFileOutput>();
     attempts = Collections.emptyMap();
     lastAttemptId = null;
     nextAttempt = -1;
@@ -329,7 +329,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	  this.stats = stats;
 	}
 	
-	public void setPartitions(List<Partition> partitions) {
+	public void setPartitions(List<ShuffleFileOutput> partitions) {
 	  this.partitions = Collections.unmodifiableList(partitions);
 	}
 	
@@ -337,7 +337,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	  return this.stats;
 	}
 	
-	public List<Partition> getPartitions() {
+	public List<ShuffleFileOutput> getPartitions() {
 	  return this.partitions;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 0c50704..f65f810 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.state.*;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
@@ -40,6 +39,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+
 public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
 
   private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class);
@@ -186,13 +187,13 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
   }
 
   private void fillTaskStatistics(TaskCompletionReport report) {
-    if (report.getPartitionsCount() > 0) {
-      this.getQueryUnit().setPartitions(report.getPartitionsList());
+    if (report.getShuffleFileOutputsCount() > 0) {
+      this.getQueryUnit().setPartitions(report.getShuffleFileOutputsList());
 
       List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-      for (Partition p : report.getPartitionsList()) {
+      for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
         IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
-            getId().getId(), p.getPartitionKey(), getHost(), getPullServerPort());
+            getId().getId(), p.getPartId(), getHost(), getPullServerPort());
         partitions.add(entry);
       }
       this.getQueryUnit().setIntermediateData(partitions);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 2185acf..bec0620 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -55,9 +55,9 @@ import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.HASH_PARTITION;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.RANGE_PARTITION;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE;
 
 /**
  * Repartitioner creates non-leaf tasks and shuffles intermediate data.
@@ -223,7 +223,7 @@ public class Repartitioner {
       for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
         Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
             execBlock.getId(),
-            partitionId, HASH_PARTITION,
+            partitionId, HASH_SHUFFLE,
             requestPerNode.getValue());
         fetchURIs.addAll(uris);
       }
@@ -257,9 +257,9 @@ public class Repartitioner {
                                                       MasterPlan masterPlan, SubQuery subQuery, SubQuery childSubQuery,
                                                       DataChannel channel, int maxNum)
       throws InternalException {
-    if (channel.getPartitionType() == HASH_PARTITION) {
+    if (channel.getShuffleType() == HASH_SHUFFLE) {
       scheduleHashPartitionedFetches(schedulerContext, masterPlan, subQuery, channel, maxNum);
-    } else if (channel.getPartitionType() == RANGE_PARTITION) {
+    } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
       scheduleRangePartitionedFetches(schedulerContext, subQuery, childSubQuery, channel, maxNum);
     } else {
       throw new InternalException("Cannot support partition type");
@@ -281,7 +281,7 @@ public class Repartitioner {
 
     SortNode sortNode = PlannerUtil.findTopNode(childSubQuery.getBlock().getPlan(), NodeType.SORT);
     SortSpec [] sortSpecs = sortNode.getSortKeys();
-    Schema sortSchema = new Schema(channel.getPartitionKey());
+    Schema sortSchema = new Schema(channel.getShuffleKeys());
 
     // calculate the number of maximum query ranges
     TupleRange mergedRange = TupleUtil.columnStatToRange(channel.getSchema(), sortSchema, stat.getColumnStats());
@@ -420,7 +420,7 @@ public class Repartitioner {
         hashedByHost = hashByHost(interm.getValue());
         for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
           Collection<URI> uris = createHashFetchURL(e.getKey(), block.getId(),
-              interm.getKey(), channel.getPartitionType(), e.getValue());
+              interm.getKey(), channel.getShuffleType(), e.getValue());
 
           if (finalFetchURI.containsKey(interm.getKey())) {
             finalFetchURI.get(interm.getKey()).addAll(uris);
@@ -449,7 +449,7 @@ public class Repartitioner {
   }
 
   public static Collection<URI> createHashFetchURL(String hostAndPort, ExecutionBlockId ebid,
-                                       int partitionId, PartitionType type, List<IntermediateEntry> entries) {
+                                       int partitionId, ShuffleType type, List<IntermediateEntry> entries) {
     String scheme = "http://";
     StringBuilder urlPrefix = new StringBuilder(scheme);
     urlPrefix.append(hostAndPort).append("/?")
@@ -457,9 +457,9 @@ public class Repartitioner {
         .append("&sid=").append(ebid.getId())
         .append("&p=").append(partitionId)
         .append("&type=");
-    if (type == HASH_PARTITION) {
+    if (type == HASH_SHUFFLE) {
       urlPrefix.append("h");
-    } else if (type == RANGE_PARTITION) {
+    } else if (type == RANGE_SHUFFLE) {
       urlPrefix.append("r");
     }
     urlPrefix.append("&ta=");
@@ -542,22 +542,22 @@ public class Repartitioner {
     // set the partition number for the current logicalUnit
     // TODO: the union handling is required when a join has unions as its child
     MasterPlan masterPlan = subQuery.getMasterPlan();
-    keys = channel.getPartitionKey();
+    keys = channel.getShuffleKeys();
     if (!masterPlan.isRoot(subQuery.getBlock()) ) {
       ExecutionBlock parentBlock = masterPlan.getParent(subQuery.getBlock());
       if (parentBlock.getPlan().getType() == NodeType.JOIN) {
-        channel.setPartitionNum(desiredNum);
+        channel.setShuffleOutputNum(desiredNum);
       }
     }
 
 
     // set the partition number for group by and sort
-    if (channel.getPartitionType() == HASH_PARTITION) {
+    if (channel.getShuffleType() == HASH_SHUFFLE) {
       if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
         GroupbyNode groupby = (GroupbyNode) execBlock.getPlan();
         keys = groupby.getGroupingColumns();
       }
-    } else if (channel.getPartitionType() == RANGE_PARTITION) {
+    } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
       if (execBlock.getPlan().getType() == NodeType.SORT) {
         SortNode sort = (SortNode) execBlock.getPlan();
         keys = new Column[sort.getSortKeys().length];
@@ -568,11 +568,11 @@ public class Repartitioner {
     }
     if (keys != null) {
       if (keys.length == 0) {
-        channel.setPartitionKey(new Column[] {});
-        channel.setPartitionNum(1);
+        channel.setShuffleKeys(new Column[]{});
+        channel.setShuffleOutputNum(1);
       } else {
-        channel.setPartitionKey(keys);
-        channel.setPartitionNum(desiredNum);
+        channel.setShuffleKeys(keys);
+        channel.setShuffleOutputNum(desiredNum);
       }
     }
     return subQuery;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 5979fbc..ef3a11f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -61,7 +61,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.tajo.conf.TajoConf.ConfVars;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
 
 
 /**
@@ -552,7 +552,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
      * methods and the number of partitions to a given subquery.
      */
     private static void setRepartitionIfNecessary(SubQuery subQuery, DataChannel channel) {
-      if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
+      if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
         int numTasks = calculatePartitionNum(subQuery, channel);
         Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks, channel);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bb7e6b6b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index a1c383a..eff384b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -104,7 +104,7 @@ public class Task {
   private AtomicBoolean progressFlag = new AtomicBoolean(false);
 
   // TODO - to be refactored
-  private PartitionType partitionType = null;
+  private ShuffleType shuffleType = null;
   private Schema finalSchema = null;
   private TupleComparator sortComp = null;
 
@@ -163,9 +163,9 @@ public class Task {
     interQuery = request.getProto().getInterQuery();
     if (interQuery) {
       context.setInterQuery();
-      this.partitionType = context.getDataChannel().getPartitionType();
+      this.shuffleType = context.getDataChannel().getShuffleType();
 
-      if (partitionType == PartitionType.RANGE_PARTITION) {
+      if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
         SortNode sortNode = (SortNode) PlannerUtil.findTopNode(plan, NodeType.SORT);
         this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
         this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
@@ -185,7 +185,7 @@ public class Task {
     LOG.info("==================================");
     LOG.info("* Subquery " + request.getId() + " is initialized");
     LOG.info("* InterQuery: " + interQuery
-        + (interQuery ? ", Use " + this.partitionType  + " partitioning":""));
+        + (interQuery ? ", Use " + this.shuffleType + " partitioning":""));
 
     LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
     LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
@@ -326,13 +326,13 @@ public class Task {
       builder.setResultStats(new TableStats().getProto());
     }
 
-    Iterator<Entry<Integer,String>> it = context.getRepartitions();
+    Iterator<Entry<Integer,String>> it = context.getShuffleFileOutputs();
     if (it.hasNext()) {
       do {
         Entry<Integer,String> entry = it.next();
-        Partition.Builder part = Partition.newBuilder();
-        part.setPartitionKey(entry.getKey());
-        builder.addPartitions(part.build());
+        ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
+        part.setPartId(entry.getKey());
+        builder.addShuffleFileOutputs(part.build());
       } while (it.hasNext());
     }
 


Mime
View raw message