tajo-commits mailing list archives

From hj...@apache.org
Subject [2/2] git commit: TAJO-992: Reduce the number of hash shuffle output files. Closes #115
Date Thu, 21 Aug 2014 03:00:26 GMT
TAJO-992: Reduce the number of hash shuffle output files.
Closes #115


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f3d63b46
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f3d63b46
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f3d63b46

Branch: refs/heads/master
Commit: f3d63b46b7f66464aceb603bdce159e5b52904c4
Parents: ea5ce54
Author: Hyoungjun Kim <hjkim@apache.org>
Authored: Thu Aug 21 11:52:11 2014 +0900
Committer: Hyoungjun Kim <hjkim@apache.org>
Committed: Thu Aug 21 11:56:17 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   6 +-
 .../main/java/org/apache/tajo/util/Pair.java    |  32 +-
 .../engine/planner/PhysicalPlannerImpl.java     |   1 +
 .../planner/physical/ExternalSortExec.java      |   3 +
 .../physical/HashShuffleFileWriteExec.java      | 125 ++++---
 .../org/apache/tajo/master/GlobalEngine.java    |   2 +-
 .../apache/tajo/master/querymaster/Query.java   |   3 +-
 .../tajo/master/querymaster/QueryMaster.java    |   9 +-
 .../querymaster/QueryMasterManagerService.java  |  13 +
 .../tajo/master/querymaster/QueryUnit.java      |  83 +++++
 .../tajo/master/querymaster/Repartitioner.java  | 346 ++++++++++++-------
 .../tajo/master/querymaster/SubQuery.java       |  77 +++--
 .../java/org/apache/tajo/worker/FetchImpl.java  |  32 ++
 .../java/org/apache/tajo/worker/Fetcher.java    |   1 +
 .../java/org/apache/tajo/worker/TajoWorker.java |  14 +
 .../tajo/worker/TajoWorkerManagerService.java   |   6 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  19 +-
 .../apache/tajo/worker/TaskAttemptContext.java  |  34 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  22 +-
 .../apache/tajo/worker/TaskRunnerManager.java   | 119 ++++++-
 .../src/main/proto/QueryMasterProtocol.proto    |   1 +
 .../src/main/proto/TajoWorkerProtocol.proto     |  30 ++
 .../planner/physical/TestPhysicalPlanner.java   |  64 ++--
 .../tajo/engine/query/TestGroupByQuery.java     |   8 +-
 .../tajo/engine/query/TestTablePartitions.java  |  72 ++++
 .../apache/tajo/master/TestRepartitioner.java   | 309 +++++++++++++++--
 .../querymaster/TestIntermediateEntry.java      |  53 +++
 .../querymaster/TestQueryUnitStatusUpdate.java  |   6 +-
 .../org/apache/tajo/worker/TestFetcher.java     |  10 +-
 .../tajo/storage/AbstractStorageManager.java    |   1 -
 .../tajo/storage/HashShuffleAppender.java       | 204 +++++++++++
 .../storage/HashShuffleAppenderManager.java     | 225 ++++++++++++
 .../java/org/apache/tajo/storage/RawFile.java   |   1 +
 .../tajo/pullserver/TajoPullServerService.java  |  61 +++-
 35 files changed, 1684 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
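
In outline, this patch stops creating one shuffle output file per task per partition. Instead, each worker keeps a shared HashShuffleAppenderManager that hands out one appender per (ExecutionBlockId, partition) pair, tasks append their tuples to that shared file in page-sized chunks, and the worker reports the resulting intermediate entries back to the query master when the execution block finishes. A minimal sketch of the sharing idea (plain Java with illustrative names, not Tajo's actual API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SharedAppenderSketch {
  // one shared buffer per "<ebId>:<partId>" key, no matter how many tasks write
  private final Map<String, StringBuilder> appenders = new ConcurrentHashMap<String, StringBuilder>();

  StringBuilder getAppender(String ebId, int partId) {
    String key = ebId + ":" + partId;
    StringBuilder appender = appenders.get(key);
    if (appender == null) {
      appenders.putIfAbsent(key, new StringBuilder());
      appender = appenders.get(key);
    }
    return appender;
  }

  public static void main(String[] args) {
    SharedAppenderSketch manager = new SharedAppenderSketch();
    // two "tasks" writing the same partition share one output
    manager.getAppender("eb_1", 0).append("task1;");
    manager.getAppender("eb_1", 0).append("task2;");
    System.out.println(manager.appenders.size()); // prints 1, not 2
  }
}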


http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 07119bd..acd7094 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,8 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-992: Reduce the number of hash shuffle output files. (Hyoungjun Kim)
+
     TAJO-1008: Protocol buffer De/Serialization for EvalNode. (hyunsik)
 
     TAJO-984: Improve the default data type handling in RowStoreUtil.

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index f4229e7..77c2363 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -223,7 +223,9 @@ public class TajoConf extends Configuration {
     SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size",  8192),
     SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120),
     SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 20),
-
+    SHUFFLE_HASH_APPENDER_BUFFER_SIZE("tajo.shuffle.hash.appender.buffer.size", 10000),
+    SHUFFLE_HASH_APPENDER_PAGE_VOLUME("tajo.shuffle.hash.appender.page.volumn-mb", 30),
+    HASH_SHUFFLE_PARENT_DIRS("tajo.hash.shuffle.parent.dirs.count", 10),
 
     // Storage Configuration --------------------------------------------------
     ROWFILE_SYNC_INTERVAL("rowfile.sync.interval", 100),
@@ -231,7 +233,7 @@ public class TajoConf extends Configuration {
     // for RCFile
     HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true),
 
-    // for Storage Manager ----------------------------------------------------
+    // for Storage Manager v2
     STORAGE_MANAGER_VERSION_2("tajo.storage-manager.v2", false),
     STORAGE_MANAGER_DISK_SCHEDULER_MAX_READ_BYTES_PER_SLOT("tajo.storage-manager.max-read-bytes", 8 * 1024 * 1024),
     STORAGE_MANAGER_DISK_SCHEDULER_REPORT_INTERVAL("tajo.storage-manager.disk-scheduler.report-interval", 60 * 1000),
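
The three new knobs above control this behavior: SHUFFLE_HASH_APPENDER_BUFFER_SIZE is the number of tuples a task buffers before handing them to the shared appender, SHUFFLE_HASH_APPENDER_PAGE_VOLUME is the page size (in MB) used to segment the shared shuffle file, and HASH_SHUFFLE_PARENT_DIRS, going by its name, sets how many parent directories the shuffle files are spread over. They are read elsewhere in this patch via getIntVar; a small usage sketch (the setup around the two calls is illustrative):

import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;

public class ShuffleConfSketch {
  public static void main(String[] args) {
    TajoConf conf = new TajoConf();
    // tuples buffered in memory before a flush to the shared appender (default 10000)
    int bufferTuples = conf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_BUFFER_SIZE);
    // page size in MB used to segment a shared shuffle file (default 30)
    int pageVolumeMb = conf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME);
    System.out.println(bufferTuples + " tuples, " + pageVolumeMb + " MB pages");
  }
}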

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-common/src/main/java/org/apache/tajo/util/Pair.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Pair.java b/tajo-common/src/main/java/org/apache/tajo/util/Pair.java
index 72cfc5c..7b5f8ad 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/Pair.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/Pair.java
@@ -19,19 +19,37 @@
 package org.apache.tajo.util;
 
 public class Pair<T1, T2> {
-  private final T1 value1;
-  private final T2 value2;
+  private T1 first;
+  private T2 second;
 
-  public Pair(T1 value1, T2 value2) {
-    this.value1 = value1;
-    this.value2 = value2;
+  public Pair(T1 first, T2 second) {
+    this.first = first;
+    this.second = second;
   }
 
   public T1 getFirst() {
-    return value1;
+    return first;
   }
 
   public T2 getSecond() {
-    return value2;
+    return second;
+  }
+
+  public void setFirst(T1 first) {
+    this.first = first;
+  }
+
+  public void setSecond(T2 second) {
+    this.second = second;
+  }
+
+  @Override
+  public String toString() {
+    return first + "," + second;
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
   }
 }
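
One consequence of the Pair change worth noting: hashCode() is now derived from toString(), but no matching equals() override is added in this diff, so two pairs with identical contents hash alike yet still compare by object identity. A short demonstration against the class as shown above:

import org.apache.tajo.util.Pair;

public class PairHashSketch {
  public static void main(String[] args) {
    Pair<String, Integer> a = new Pair<String, Integer>("host1", 8080);
    Pair<String, Integer> b = new Pair<String, Integer>("host1", 8080);
    // both stringify to "host1,8080", so the hash codes collide by design
    System.out.println(a.hashCode() == b.hashCode()); // true
    // equals() is still Object's identity check, since no override is added here
    System.out.println(a.equals(b));                  // false
  }
}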

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 9f533e2..19a16d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -1001,6 +1001,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     // if the relation size is less than the threshold,
     // the hash aggregation will be used.
+    LOG.info("Aggregation:estimatedSize=" + estimatedSize + ", threshold=" + threshold);
     if (estimatedSize <= threshold) {
       LOG.info("The planner chooses [Hash Aggregation]");
       return createInMemoryHashAggregation(context, groupbyNode, subOp);

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 31cb3b6..6215527 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -703,6 +703,9 @@ public class ExternalSortExec extends SortExec {
         return mergerInputStats;
       }
       TableStats leftInputStats = leftScan.getInputStats();
+      if (mergerInputStats == null) {
+        mergerInputStats = new TableStats();
+      }
       mergerInputStats.setNumBytes(0);
       mergerInputStats.setReadBytes(0);
       mergerInputStats.setNumRows(0);

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index 44e8646..cee2b77 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -21,17 +21,16 @@ package org.apache.tajo.engine.planner.physical;
 import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.logical.ShuffleFileWriteNode;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.HashShuffleAppender;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -49,11 +48,13 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
   private ShuffleFileWriteNode plan;
   private final TableMeta meta;
   private Partitioner partitioner;
-  private final Path storeTablePath;
-  private Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
+//  private final Path storeTablePath;
+  private Map<Integer, HashShuffleAppender> appenderMap = new HashMap<Integer, HashShuffleAppender>();
   private final int numShuffleOutputs;
   private final int [] shuffleKeyIds;
-  
+  private HashShuffleAppenderManager hashShuffleAppenderManager;
+  private int numHashShuffleBufferTuples;
+
   public HashShuffleFileWriteExec(TaskAttemptContext context, final AbstractStorageManager sm,
                                   final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
@@ -73,71 +74,83 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
       i++;
     }
     this.partitioner = new HashPartitioner(shuffleKeyIds, numShuffleOutputs);
-    storeTablePath = new Path(context.getWorkDir(), "output");
+    this.hashShuffleAppenderManager = context.getHashShuffleAppenderManager();
+    this.numHashShuffleBufferTuples = context.getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_BUFFER_SIZE);
   }
 
   @Override
   public void init() throws IOException {
     super.init();
-    FileSystem fs = new RawLocalFileSystem();
-    fs.mkdirs(storeTablePath);
   }
   
-  private Appender getAppender(int partId) throws IOException {
-    Appender appender = appenderMap.get(partId);
-
+  private HashShuffleAppender getAppender(int partId) throws IOException {
+    HashShuffleAppender appender = appenderMap.get(partId);
     if (appender == null) {
-      Path dataFile = getDataFile(partId);
-      FileSystem fs = dataFile.getFileSystem(context.getConf());
-      if (fs.exists(dataFile)) {
-        LOG.info("File " + dataFile + " already exists!");
-        FileStatus status = fs.getFileStatus(dataFile);
-        LOG.info("File size: " + status.getLen());
-      }
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
-      appender.enableStats();
-      appender.init();
+      appender = hashShuffleAppenderManager.getAppender(context.getConf(),
+          context.getQueryId().getQueryUnitId().getExecutionBlockId(), partId, meta, outSchema);
       appenderMap.put(partId, appender);
-    } else {
-      appender = appenderMap.get(partId);
     }
-
     return appender;
   }
 
-  private Path getDataFile(int partId) {
-    return StorageUtil.concatPath(storeTablePath, ""+partId);
-  }
+//  Map<Integer, Long> partitionStats = new HashMap<Integer, Long>();
+  Map<Integer, List<Tuple>> partitionTuples = new HashMap<Integer, List<Tuple>>();
+  long writtenBytes = 0L;
 
   @Override
   public Tuple next() throws IOException {
-    Tuple tuple;
-    Appender appender;
-    int partId;
-    while ((tuple = child.next()) != null) {
-      partId = partitioner.getPartition(tuple);
-      appender = getAppender(partId);
-      appender.addTuple(tuple);
-    }
-    
-    List<TableStats> statSet = new ArrayList<TableStats>();
-    for (Map.Entry<Integer, Appender> entry : appenderMap.entrySet()) {
-      int partNum = entry.getKey();
-      Appender app = entry.getValue();
-      app.flush();
-      app.close();
-      statSet.add(app.getStats());
-      if (app.getStats().getNumRows() > 0) {
-        context.addShuffleFileOutput(partNum, getDataFile(partNum).getName());
-        context.addPartitionOutputVolume(partNum, app.getStats().getNumBytes());
+    try {
+      Tuple tuple;
+      int partId;
+      int tupleCount = 0;
+      long numRows = 0;
+      while ((tuple = child.next()) != null) {
+        tupleCount++;
+        numRows++;
+
+        partId = partitioner.getPartition(tuple);
+        List<Tuple> partitionTupleList = partitionTuples.get(partId);
+        if (partitionTupleList == null) {
+          partitionTupleList = new ArrayList<Tuple>(1000);
+          partitionTuples.put(partId, partitionTupleList);
+        }
+        try {
+          partitionTupleList.add(tuple.clone());
+        } catch (CloneNotSupportedException e) {
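+          // a failed clone is silently dropped; Tuple implementations are expected to support clone()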
+        }
+        if (tupleCount >= numHashShuffleBufferTuples) {
+          for (Map.Entry<Integer, List<Tuple>> entry : partitionTuples.entrySet()) {
+            int appendPartId = entry.getKey();
+            HashShuffleAppender appender = getAppender(appendPartId);
+            int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue());
+            writtenBytes += appendedSize;
+            entry.getValue().clear();
+          }
+          tupleCount = 0;
+        }
+      }
+
+      // flush any remaining buffered tuples
+      for (Map.Entry<Integer, List<Tuple>> entry : partitionTuples.entrySet()) {
+        int appendPartId = entry.getKey();
+        HashShuffleAppender appender = getAppender(appendPartId);
+        int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue());
+        writtenBytes += appendedSize;
+        entry.getValue().clear();
       }
+
+      TableStats aggregated = (TableStats)child.getInputStats().clone();
+      aggregated.setNumBytes(writtenBytes);
+      aggregated.setNumRows(numRows);
+      context.setResultStats(aggregated);
+
+      partitionTuples.clear();
+
+      return null;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new IOException(e);
     }
-    
-    // Collect and aggregated statistics data
-    TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet);
-    context.setResultStats(aggregated);
-    
-    return null;
   }
 
   @Override
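
The rewritten next() above replaces the per-tuple append with a buffer-then-flush loop: tuples are cloned into per-partition lists, and every time the total buffered count reaches SHUFFLE_HASH_APPENDER_BUFFER_SIZE all partitions are flushed to their shared appenders at once, with a final flush after the input is exhausted. The same pattern, reduced to a runnable sketch with plain collections and illustrative names:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BufferedShuffleWriteSketch {
  static final int BUFFER_TUPLES = 4; // stands in for SHUFFLE_HASH_APPENDER_BUFFER_SIZE

  static void flushAll(Map<Integer, List<String>> partitionTuples) {
    for (Map.Entry<Integer, List<String>> e : partitionTuples.entrySet()) {
      // stands in for getAppender(partId).addTuples(taskId, tuples)
      System.out.println("flush part " + e.getKey() + ": " + e.getValue());
      e.getValue().clear();
    }
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> partitionTuples = new HashMap<Integer, List<String>>();
    int tupleCount = 0;
    for (String tuple : new String[]{"a", "b", "c", "d", "e"}) {
      int partId = tuple.hashCode() % 2; // stands in for partitioner.getPartition(tuple)
      List<String> list = partitionTuples.get(partId);
      if (list == null) {
        list = new ArrayList<String>(1000);
        partitionTuples.put(partId, list);
      }
      list.add(tuple);
      if (++tupleCount >= BUFFER_TUPLES) { // threshold reached: flush every partition
        flushAll(partitionTuples);
        tupleCount = 0;
      }
    }
    flushAll(partitionTuples); // flush the remainder, as next() does after its loop
  }
}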

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 37a56ba..fc1be9b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -296,7 +296,7 @@ public class GlobalEngine extends AbstractService {
     }
 
     TaskAttemptContext taskAttemptContext =
-        new TaskAttemptContext(queryContext, null, (CatalogProtos.FragmentProto[]) null, stagingDir);
+        new TaskAttemptContext(queryContext, null, null, (CatalogProtos.FragmentProto[]) null, stagingDir);
     taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
 
     EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 4f4aaab..7063197 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -799,12 +799,13 @@ public class Query implements EventHandler<QueryEvent> {
         if (castEvent.getState() == SubQueryState.SUCCEEDED &&  // latest subquery succeeded
             query.getSynchronizedState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
             hasNext(query)) {                                   // there remains at least one subquery.
+          query.getSubQuery(castEvent.getExecutionBlockId()).waitingIntermediateReport();
           executeNextBlock(query);
         } else { // if a query is completed due to finished, kill, failure, or error
           query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
         }
       } catch (Throwable t) {
-        LOG.error(t);
+        LOG.error(t.getMessage(), t);
         query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 6a980de..9f90b05 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -37,6 +37,8 @@ import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.ExecutionBlockReport;
+import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.rpc.CallFuture;
@@ -57,6 +59,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
@@ -180,12 +183,14 @@ public class QueryMaster extends CompositeService implements EventHandler {
   }
 
   protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) {
-    LOG.info("cleanup executionBlocks : " + executionBlockIds);
+    LOG.info("cleanup executionBlocks: " + executionBlockIds);
     NettyClientBase rpc = null;
     List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
     TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
     builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
     TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
+
+    List<IntermediateEntryProto> intermediateEntries = new ArrayList<IntermediateEntryProto>();
     for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
       try {
         if (worker.getPeerRpcPort() == 0) continue;
@@ -196,7 +201,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
         tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get());
       } catch (Exception e) {
-        LOG.error(e.getMessage());
+        continue;
       } finally {
         connPool.releaseConnection(rpc);
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index ec975d8..4f3c2ab 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -40,6 +40,8 @@ import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 public class QueryMasterManagerService extends CompositeService
     implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
@@ -215,6 +217,17 @@ public class QueryMasterManagerService extends CompositeService
   }
 
   @Override
+  public void doneExecutionBlock(
+      RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request,
+      RpcCallback<PrimitiveProtos.BoolProto> done) {
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
+    if (queryMasterTask != null) {
+      ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
+      queryMasterTask.getQuery().getSubQuery(ebId).receiveExecutionBlockReport(request);
+    }
+  }
+
+  @Override
   public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
                         RpcCallback<PrimitiveProtos.BoolProto> done) {
     QueryId queryId = new QueryId(request);

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 8c953bd..f41fd0e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -34,12 +34,15 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.QueryUnitId;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
+import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.FragmentPair;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.worker.FetchImpl;
 
@@ -667,6 +670,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       newPullHost.port = port;
       return newPullHost;
     }
+
+    @Override
+    public String toString() {
+      return host + ":" + port;
+    }
   }
 
   public static class IntermediateEntry {
@@ -676,6 +684,31 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     int partId;
     PullHost host;
     long volume;
+    List<Pair<Long, Integer>> pages;
+    List<Pair<Long, Pair<Integer, Integer>>> failureRowNums;
+
+    public IntermediateEntry(IntermediateEntryProto proto) {
+      this.ebId = new ExecutionBlockId(proto.getEbId());
+      this.taskId = proto.getTaskId();
+      this.attemptId = proto.getAttemptId();
+      this.partId = proto.getPartId();
+
+      String[] pullHost = proto.getHost().split(":");
+      this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1]));
+      this.volume = proto.getVolume();
+
+      failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+      for (FailureIntermediateProto eachFailure: proto.getFailuresList()) {
+
+        failureRowNums.add(new Pair(eachFailure.getPagePos(),
+            new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum())));
+      }
+
+      pages = new ArrayList<Pair<Long, Integer>>();
+      for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) {
+        pages.add(new Pair(eachPage.getPos(), eachPage.getLength()));
+      }
+    }
 
     public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
       this.taskId = taskId;
@@ -720,9 +753,59 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       return this.volume;
     }
 
+    public long setVolume(long volume) {
+      return this.volume = volume;
+    }
+
+    public List<Pair<Long, Integer>> getPages() {
+      return pages;
+    }
+
+    public void setPages(List<Pair<Long, Integer>> pages) {
+      this.pages = pages;
+    }
+
+    public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() {
+      return failureRowNums;
+    }
+
     @Override
     public int hashCode() {
       return Objects.hashCode(ebId, taskId, partId, attemptId, host);
     }
+
+    public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) {
+      List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>();
+
+      if (pages == null || pages.isEmpty()) {
+        return splits;
+      }
+      int pageSize = pages.size();
+
+      long currentOffset = -1;
+      long currentBytes = 0;
+
+      long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
+      for (int i = 0; i < pageSize; i++) {
+        Pair<Long, Integer> eachPage = pages.get(i);
+        if (currentOffset == -1) {
+          currentOffset = eachPage.getFirst();
+        }
+        if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) {
+          splits.add(new Pair(currentOffset, currentBytes));
+          currentOffset = eachPage.getFirst();
+          currentBytes = 0;
+          realSplitVolume = splitVolume;
+        }
+
+        currentBytes += eachPage.getSecond();
+      }
+
+      //add last
+      if (currentBytes > 0) {
+        splits.add(new Pair(currentOffset, currentBytes));
+      }
+      return splits;
+    }
   }
 }
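
The new split() method above is what lets one large shared shuffle file feed several downstream tasks: it walks the page list, greedily accumulating pages into (offset, length) chunks of roughly splitVolume bytes, with a separate budget for the first chunk so a partially filled fetch list can be topped up. A standalone re-implementation with a worked example (long[] pairs stand in for the Pair<Long, Integer> pages):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitSketch {
  // returns (offset, length) chunks; page[0] is the page offset, page[1] its length
  static List<long[]> split(List<long[]> pages, long firstSplitVolume, long splitVolume) {
    List<long[]> splits = new ArrayList<long[]>();
    long currentOffset = -1, currentBytes = 0;
    long budget = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
    for (long[] page : pages) {
      if (currentOffset == -1) currentOffset = page[0];
      if (currentBytes > 0 && currentBytes + page[1] >= budget) {
        splits.add(new long[]{currentOffset, currentBytes}); // close the current chunk
        currentOffset = page[0];
        currentBytes = 0;
        budget = splitVolume; // only the first chunk uses firstSplitVolume
      }
      currentBytes += page[1];
    }
    if (currentBytes > 0) splits.add(new long[]{currentOffset, currentBytes}); // add last
    return splits;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    // four 30MB pages split at 64MB yield two 60MB chunks: (0, 60MB) and (60MB, 60MB)
    List<long[]> pages = Arrays.asList(
        new long[]{0, 30 * mb}, new long[]{30 * mb, 30 * mb},
        new long[]{60 * mb, 30 * mb}, new long[]{90 * mb, 30 * mb});
    for (long[] s : split(pages, 0, 64 * mb)) {
      System.out.println("offset=" + s[0] + ", length=" + s[1]);
    }
  }
}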

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 940170c..4deddee 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -31,6 +31,8 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
@@ -213,8 +215,6 @@ public class Repartitioner {
       LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join with all tables, base_table=%s, base_volume=%d",
           scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx]));
       scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments);
-
-
     } else if (!execBlock.getBroadcastTables().isEmpty()) { // If some relations of this EB are broadcasted
       boolean hasNonLeafNode = false;
       List<Integer> largeScanIndexList = new ArrayList<Integer>();
@@ -251,7 +251,7 @@ public class Repartitioner {
           for (Integer eachId : largeScanIndexList) {
             largeTableNames += scans[eachId].getTableName() + ",";
           }
-          throw new IOException("Broadcase join with leaf node should have only one large table, " +
+          throw new IOException("Broadcast join with leaf node should have only one large table, " +
               "but " + largeScanIndexList.size() + ", tables=" + largeTableNames);
         }
         int baseScanIdx = largeScanIndexList.isEmpty() ? maxStatsScanIdx : largeScanIndexList.get(0);
@@ -309,7 +309,7 @@ public class Repartitioner {
     MasterPlan masterPlan = subQuery.getMasterPlan();
     ExecutionBlock execBlock = subQuery.getBlock();
     // The hash map is modeling as follows:
-    // <Part Id, <EbId, Intermediate Data>>
+    // <Part Id, <EbId, List<Intermediate Data>>>
     Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries =
         new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
 
@@ -324,42 +324,42 @@ public class Repartitioner {
         scanEbId = childBlock.getId();
       }
       SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId());
-      for (QueryUnit task : childExecSM.getQueryUnits()) {
-        if (task.getIntermediateData() != null && !task.getIntermediateData().isEmpty()) {
-          for (IntermediateEntry intermEntry : task.getIntermediateData()) {
-            intermEntry.setEbId(childBlock.getId());
-            if (hashEntries.containsKey(intermEntry.getPartId())) {
-              Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
-                  hashEntries.get(intermEntry.getPartId());
-
-              if (tbNameToInterm.containsKey(scanEbId)) {
-                tbNameToInterm.get(scanEbId).add(intermEntry);
-              } else {
-                tbNameToInterm.put(scanEbId, TUtil.newList(intermEntry));
-              }
+
+      if (childExecSM.getHashShuffleIntermediateEntries() != null &&
+          !childExecSM.getHashShuffleIntermediateEntries().isEmpty()) {
+        for (IntermediateEntry intermEntry: childExecSM.getHashShuffleIntermediateEntries()) {
+          intermEntry.setEbId(childBlock.getId());
+          if (hashEntries.containsKey(intermEntry.getPartId())) {
+            Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
+                hashEntries.get(intermEntry.getPartId());
+
+            if (tbNameToInterm.containsKey(scanEbId)) {
+              tbNameToInterm.get(scanEbId).add(intermEntry);
             } else {
-              Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
-                  new HashMap<ExecutionBlockId, List<IntermediateEntry>>();
               tbNameToInterm.put(scanEbId, TUtil.newList(intermEntry));
-              hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
             }
-          }
-        } else {
-          //if no intermidatedata(empty table), make empty entry
-          int emptyPartitionId = 0;
-          if (hashEntries.containsKey(emptyPartitionId)) {
-            Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = hashEntries.get(emptyPartitionId);
-            if (tbNameToInterm.containsKey(scanEbId))
-              tbNameToInterm.get(scanEbId).addAll(new ArrayList<IntermediateEntry>());
-            else
-              tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>());
           } else {
             Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
                 new HashMap<ExecutionBlockId, List<IntermediateEntry>>();
-            tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>());
-            hashEntries.put(emptyPartitionId, tbNameToInterm);
+            tbNameToInterm.put(scanEbId, TUtil.newList(intermEntry));
+            hashEntries.put(intermEntry.getPartId(), tbNameToInterm);
           }
         }
+      } else {
+        // if there is no intermediate data (empty table), make an empty entry
+        int emptyPartitionId = 0;
+        if (hashEntries.containsKey(emptyPartitionId)) {
+          Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = hashEntries.get(emptyPartitionId);
+          if (tbNameToInterm.containsKey(scanEbId))
+            tbNameToInterm.get(scanEbId).addAll(new ArrayList<IntermediateEntry>());
+          else
+            tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>());
+        } else {
+          Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
+              new HashMap<ExecutionBlockId, List<IntermediateEntry>>();
+          tbNameToInterm.put(scanEbId, new ArrayList<IntermediateEntry>());
+          hashEntries.put(emptyPartitionId, tbNameToInterm);
+        }
       }
     }
 
@@ -406,6 +406,53 @@ public class Repartitioner {
   }
 
   /**
+   * Merges intermediate entries by EB id and pull host.
+   * @param hashEntries
+   * @return
+   */
+  public static Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergeIntermediateByPullHost(
+      Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries) {
+    Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergedHashEntries =
+        new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>();
+
+    for(Entry<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> entry: hashEntries.entrySet()) {
+      Integer partId = entry.getKey();
+      for (Entry<ExecutionBlockId, List<IntermediateEntry>> partEntry: entry.getValue().entrySet()) {
+        List<IntermediateEntry> intermediateList = partEntry.getValue();
+        if (intermediateList == null || intermediateList.isEmpty()) {
+          continue;
+        }
+        ExecutionBlockId ebId = partEntry.getKey();
+        // EBID + PullHost -> IntermediateEntry
+        // In the case of union, partEntry.getKey() returns the delegated EBID.
+        // Intermediate entries are merged by real EBID.
+        Map<String, IntermediateEntry> ebMerged = new HashMap<String, IntermediateEntry>();
+
+        for (IntermediateEntry eachIntermediate: intermediateList) {
+          String ebMergedKey = eachIntermediate.getEbId().toString() + eachIntermediate.getPullHost().getPullAddress();
+          IntermediateEntry intermediateEntryPerPullHost = ebMerged.get(ebMergedKey);
+          if (intermediateEntryPerPullHost == null) {
+            intermediateEntryPerPullHost = new IntermediateEntry(-1, -1, partId, eachIntermediate.getPullHost());
+            intermediateEntryPerPullHost.setEbId(eachIntermediate.getEbId());
+            ebMerged.put(ebMergedKey, intermediateEntryPerPullHost);
+          }
+          intermediateEntryPerPullHost.setVolume(intermediateEntryPerPullHost.getVolume() + eachIntermediate.getVolume());
+        }
+
+        List<IntermediateEntry> ebIntermediateEntries = new ArrayList<IntermediateEntry>(ebMerged.values());
+
+        Map<ExecutionBlockId, List<IntermediateEntry>> mergedPartEntries = mergedHashEntries.get(partId);
+        if (mergedPartEntries == null) {
+          mergedPartEntries = new HashMap<ExecutionBlockId, List<IntermediateEntry>>();
+          mergedHashEntries.put(partId, mergedPartEntries);
+        }
+        mergedPartEntries.put(ebId, ebIntermediateEntries);
+      }
+    }
+    return mergedHashEntries;
+  }
+
+  /**
    * It creates a number of fragments for all partitions.
    */
   public static List<FileFragment> getFragmentsFromPartitionedTable(AbstractStorageManager sm,
@@ -711,30 +758,27 @@ public class Repartitioner {
     fragments.add(frag);
     SubQuery.scheduleFragments(subQuery, fragments);
 
-    Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost;
     Map<Integer, FetchGroupMeta> finalFetches = new HashMap<Integer, FetchGroupMeta>();
     Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<ExecutionBlockId,
         List<IntermediateEntry>>();
 
     for (ExecutionBlock block : masterPlan.getChilds(execBlock)) {
       List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-      for (QueryUnit tasks : subQuery.getContext().getSubQuery(block.getId()).getQueryUnits()) {
-        if (tasks.getIntermediateData() != null) {
-          partitions.addAll(tasks.getIntermediateData());
-
-          // In scattered hash shuffle, Collecting each IntermediateEntry
-          if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
-            if (intermediates.containsKey(block.getId())) {
-              intermediates.get(block.getId()).addAll(tasks.getIntermediateData());
-            } else {
-              intermediates.put(block.getId(), tasks.getIntermediateData());
-            }
-          }
+      partitions.addAll(subQuery.getContext().getSubQuery(block.getId()).getHashShuffleIntermediateEntries());
+
+      // In scattered hash shuffle, collect each IntermediateEntry
+      if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
+        if (intermediates.containsKey(block.getId())) {
+          intermediates.get(block.getId()).addAll(partitions);
+        } else {
+          intermediates.put(block.getId(), partitions);
         }
       }
+
+      // make one FetchImpl per (PullServer, PartId)
       Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
       for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
-        hashedByHost = hashByHost(interm.getValue());
+        Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
         for (Entry<QueryUnit.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
 
           FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
@@ -866,56 +910,49 @@ public class Repartitioner {
   public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext,
        SubQuery subQuery, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates,
        String tableName) {
-    int i = 0;
     long splitVolume = StorageUnit.MB *
         subQuery.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
+    long pageSize = StorageUnit.MB * 
+        subQuery.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes
+    if (pageSize >= splitVolume) {
+      throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be greater than " +
+          "tajo.shuffle.hash.appender.page.volumn-mb");
+    }
+    List<List<FetchImpl>> fetches = new ArrayList<List<FetchImpl>>();
 
-    long sumNumBytes = 0L;
-    Map<Integer, List<FetchImpl>> fetches = new HashMap<Integer, List<FetchImpl>>();
-
-    // Step 1 : divide fetch uris into the the proper number of tasks by
-    // SCATTERED_HASH_SHUFFLE_SPLIT_VOLUME
+    long totalIntermediateSize = 0L;
     for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry : intermediates.entrySet()) {
-
-      // Step 2: Sort IntermediateEntry by partition id. After first sort,
-      // we need to sort again by PullHost address because of data locality.
-      Collections.sort(listEntry.getValue(), new IntermediateEntryComparator());
-      for (IntermediateEntry interm : listEntry.getValue()) {
-        FetchImpl fetch = new FetchImpl(interm.getPullHost(), SCATTERED_HASH_SHUFFLE,
-            listEntry.getKey(), interm.getPartId(), TUtil.newList(interm));
-        if (fetches.size() == 0) {
-          fetches.put(i, TUtil.newList(fetch));
+      // merge by PartitionId
+      Map<Integer, List<IntermediateEntry>> partitionIntermMap = new HashMap<Integer, List<IntermediateEntry>>();
+      for (IntermediateEntry eachInterm: listEntry.getValue()) {
+        totalIntermediateSize += eachInterm.getVolume();
+        int partId = eachInterm.getPartId();
+        List<IntermediateEntry> partitionInterms = partitionIntermMap.get(partId);
+        if (partitionInterms == null) {
+          partitionInterms = TUtil.newList(eachInterm);
+          partitionIntermMap.put(partId, partitionInterms);
         } else {
+          partitionInterms.add(eachInterm);
+        }
+      }
 
-          // Step 3: Compare current partition id with previous partition id because One task just
-          // can include one partitionId.
-          if (fetches.get(i).get(0).getPartitionId() != interm.getPartId()) {
-            i++;
-            fetches.put(i, TUtil.newList(fetch));
-            sumNumBytes = 0L;
-          } else {
-            if ((sumNumBytes + interm.getVolume()) < splitVolume) {
-              fetches.get(i).add(fetch);
-            } else {
-              i++;
-              fetches.put(i, TUtil.newList(fetch));
-              sumNumBytes = 0L;
-            }
-          }
+      // Grouping or splitting to fit $DIST_QUERY_TABLE_PARTITION_VOLUME size
+      for (List<IntermediateEntry> partitionEntries : partitionIntermMap.values()) {
+        List<List<FetchImpl>> eachFetches = splitOrMergeIntermediates(listEntry.getKey(), partitionEntries,
+            splitVolume, pageSize);
+        if (eachFetches != null && !eachFetches.isEmpty()) {
+          fetches.addAll(eachFetches);
         }
-        sumNumBytes += interm.getVolume();
       }
     }
 
-    // Step 4 : Set the proper number of tasks to the estimated task num
     schedulerContext.setEstimatedTaskNum(fetches.size());
 
-    // Step 5 : Apply divided fetches
-    i = 0;
+    int i = 0;
     Map<String, List<FetchImpl>>[] fetchesArray = new Map[fetches.size()];
-    for(Entry<Integer, List<FetchImpl>> entry : fetches.entrySet()) {
+    for(List<FetchImpl> entry : fetches) {
       fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
-      fetchesArray[i].put(tableName, entry.getValue());
+      fetchesArray[i].put(tableName, entry);
 
       SubQuery.scheduleFetches(subQuery, fetchesArray[i]);
       i++;
@@ -923,20 +960,65 @@ public class Repartitioner {
 
     LOG.info(subQuery.getId()
         + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
-        + ", DeterminedTaskNum : " + fetches.size());
+        + ", Intermediate Size: " + totalIntermediateSize
+        + ", splitSize: " + splitVolume
+        + ", DeterminedTaskNum: " + fetches.size());
   }
 
-  static class IntermediateEntryComparator implements Comparator<IntermediateEntry> {
+  /**
+   * If an IntermediateEntry is larger than splitVolume, each List<FetchImpl> has a single element.
+   * @param ebId
+   * @param entries
+   * @param splitVolume
+   * @return
+   */
+  public static List<List<FetchImpl>> splitOrMergeIntermediates(
+      ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
+    // Each List<FetchImpl> has splitVolume size.
+    List<List<FetchImpl>> fetches = new ArrayList<List<FetchImpl>>();
+
+    Iterator<IntermediateEntry> iter = entries.iterator();
+    if (!iter.hasNext()) {
+      return null;
+    }
+    List<FetchImpl> fetchListForSingleTask = new ArrayList<FetchImpl>();
+    long fetchListVolume = 0;
+
+    while (iter.hasNext()) {
+      IntermediateEntry currentInterm = iter.next();
 
-    @Override
-    public int compare(IntermediateEntry o1, IntermediateEntry o2) {
-      int cmp = Ints.compare(o1.getPartId(), o2.getPartId());
-      if (cmp != 0) {
-        return cmp;
+      long firstSplitVolume = splitVolume - fetchListVolume;
+      if (firstSplitVolume < pageSize) {
+        firstSplitVolume = splitVolume;
       }
 
-      return o1.getPullHost().getHost().compareTo(o2.getPullHost().getHost());
+      // Each Pair object in the splits variable is assigned to a task of the next ExecutionBlock.
+      // The first long value is an offset into the intermediate file and the second is the length.
+      List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);
+      if (splits == null || splits.isEmpty()) {
+        break;
+      }
+
+      for (Pair<Long, Long> eachSplit: splits) {
+        if (fetchListVolume > 0 && fetchListVolume + eachSplit.getSecond() >= splitVolume) {
+          if (!fetchListForSingleTask.isEmpty()) {
+            fetches.add(fetchListForSingleTask);
+          }
+          fetchListForSingleTask = new ArrayList<FetchImpl>();
+          fetchListVolume = 0;
+        }
+        FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
+            ebId, currentInterm.getPartId(), TUtil.newList(currentInterm));
+        fetch.setOffset(eachSplit.getFirst());
+        fetch.setLength(eachSplit.getSecond());
+        fetchListForSingleTask.add(fetch);
+        fetchListVolume += eachSplit.getSecond();
+      }
     }
+    if (!fetchListForSingleTask.isEmpty()) {
+      fetches.add(fetchListForSingleTask);
+    }
+    return fetches;
   }
 
   public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) {
@@ -956,47 +1038,59 @@ public class Repartitioner {
       urlPrefix.append("s");
     }
 
+    if (fetch.getLength() >= 0) {
+      urlPrefix.append("&offset=").append(fetch.getOffset()).append("&length=").append(fetch.getLength());
+    }
+
     List<URI> fetchURLs = new ArrayList<URI>();
-    if(includeParts){
-      // If the get request is longer than 2000 characters,
-      // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
-      // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
-      // The below code transforms a long request to multiple requests.
-      List<String> taskIdsParams = new ArrayList<String>();
-      StringBuilder taskIdListBuilder = new StringBuilder();
-      List<Integer> taskIds = fetch.getTaskIds();
-      List<Integer> attemptIds = fetch.getAttemptIds();
-      boolean first = true;
-
-      for (int i = 0; i < taskIds.size(); i++) {
-        StringBuilder taskAttemptId = new StringBuilder();
-
-        if (!first) { // when comma is added?
-          taskAttemptId.append(",");
-        } else {
-          first = false;
-        }
+    if(includeParts) {
+      if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) {
+        fetchURLs.add(URI.create(urlPrefix.toString()));
+      } else {
+        // If the get request is longer than 2000 characters,
+        // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
+        // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
+        // The below code transforms a long request to multiple requests.
+        List<String> taskIdsParams = new ArrayList<String>();
+        StringBuilder taskIdListBuilder = new StringBuilder();
+        List<Integer> taskIds = fetch.getTaskIds();
+        List<Integer> attemptIds = fetch.getAttemptIds();
+        boolean first = true;
+
+        for (int i = 0; i < taskIds.size(); i++) {
+          StringBuilder taskAttemptId = new StringBuilder();
+
+          if (!first) { // when comma is added?
+            taskAttemptId.append(",");
+          } else {
+            first = false;
+          }
 
-        int taskId = taskIds.get(i);
-        int attemptId = attemptIds.get(i);
-        taskAttemptId.append(taskId).append("_").append(attemptId);
+          int taskId = taskIds.get(i);
+          if (taskId < 0) {
+            // In the case of hash shuffle, each partition has a single shuffle file per worker.
+            // TODO If file is large, consider multiple fetching(shuffle file can be split)
+            continue;
+          }
+          int attemptId = attemptIds.get(i);
+          taskAttemptId.append(taskId).append("_").append(attemptId);
 
-        if (taskIdListBuilder.length() + taskAttemptId.length()
-            > HTTP_REQUEST_MAXIMUM_LENGTH) {
+          if (taskIdListBuilder.length() + taskAttemptId.length()
+              > HTTP_REQUEST_MAXIMUM_LENGTH) {
+            taskIdsParams.add(taskIdListBuilder.toString());
+            taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
+          } else {
+            taskIdListBuilder.append(taskAttemptId);
+          }
+        }
+        // if the url params remain
+        if (taskIdListBuilder.length() > 0) {
           taskIdsParams.add(taskIdListBuilder.toString());
-          taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId);
-        } else {
-          taskIdListBuilder.append(taskAttemptId);
         }
-      }
-      // if the url params remain
-      if (taskIdListBuilder.length() > 0) {
-        taskIdsParams.add(taskIdListBuilder.toString());
-      }
-
-      urlPrefix.append("&ta=");
-      for (String param : taskIdsParams) {
-        fetchURLs.add(URI.create(urlPrefix + param));
+        urlPrefix.append("&ta=");
+        for (String param : taskIdsParams) {
+          fetchURLs.add(URI.create(urlPrefix + param));
+        }
       }
     } else {
       fetchURLs.add(URI.create(urlPrefix.toString()));
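
With hash and scattered hash shuffle now served from one shared file per partition, createFetchURL() above emits a single URL per fetch and, when a byte range was assigned by split(), appends offset and length parameters so the pull server can serve just that slice. The resulting URL shape, sketched (the prefix fields are illustrative; the real prefix carries the query, EB, and partition parameters built earlier in the method):

public class FetchUrlSketch {
  public static void main(String[] args) {
    String urlPrefix = "http://host1:8080/?qid=q_1&sid=2&p=0&type=h"; // illustrative prefix
    long offset = 62914560L, length = 62914560L; // one chunk from IntermediateEntry.split()
    StringBuilder url = new StringBuilder(urlPrefix);
    if (length >= 0) { // length stays -1 when no range was assigned
      url.append("&offset=").append(offset).append("&length=").append(length);
    }
    System.out.println(url); // ...&offset=62914560&length=62914560
  }
}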

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 8eff8a4..51116bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -48,18 +48,23 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -250,6 +255,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private int killedObjectCount = 0;
   private int failedObjectCount = 0;
   private TaskSchedulerContext schedulerContext;
+  private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
+  private AtomicInteger completeReportReceived = new AtomicInteger(0);
 
   public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block, AbstractStorageManager sm) {
     this.context = context;
@@ -1080,25 +1087,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         subQuery.completedTaskCount++;
 
         if (taskEvent.getState() == TaskState.SUCCEEDED) {
-//          if (task.isLeafTask()) {
-//            subQuery.succeededObjectCount += task.getTotalFragmentNum();
-//          } else {
-//            subQuery.succeededObjectCount++;
-//          }
           subQuery.succeededObjectCount++;
         } else if (task.getState() == TaskState.KILLED) {
-//          if (task.isLeafTask()) {
-//            subQuery.killedObjectCount += task.getTotalFragmentNum();
-//          } else {
-//            subQuery.killedObjectCount++;
-//          }
           subQuery.killedObjectCount++;
         } else if (task.getState() == TaskState.FAILED) {
-//          if (task.isLeafTask()) {
-//            subQuery.failedObjectCount+= task.getTotalFragmentNum();
-//          } else {
-//            subQuery.failedObjectCount++;
-//          }
           subQuery.failedObjectCount++;
           // if at least one task is failed, try to kill all tasks.
           subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
@@ -1137,18 +1129,63 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) {
       List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
       List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
-      for (ExecutionBlock executionBlock :  childs){
+
+      for (ExecutionBlock executionBlock : childs) {
         ebIds.add(executionBlock.getId().getProto());
       }
 
-      try {
-        getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
-      } catch (Throwable e) {
-        LOG.error(e);
+      getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
+    }
+  }
+
+  public List<IntermediateEntry> getHashShuffleIntermediateEntries() {
+    return hashShuffleIntermediateEntries;
+  }
+
+  protected void waitingIntermediateReport() {
+    LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + tasks.size());
+    synchronized(completeReportReceived) {
+      long startTime = System.currentTimeMillis();
+      while (true) {
+        if (completeReportReceived.get() >= tasks.size()) {
+          LOG.info(getId() + ", completed waiting IntermediateReport");
+          return;
+        } else {
+          try {
+            completeReportReceived.wait(10 * 1000);
+          } catch (InterruptedException e) {
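+            // swallow the interrupt and re-check the counter and deadline below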
+          }
+          long elapsedTime = System.currentTimeMillis() - startTime;
+          if (elapsedTime >= 120 * 1000) {
+            LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms");
+            abort(SubQueryState.FAILED);
+            return;
+          }
+        }
       }
     }
   }
 
+  public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) {
+    LOG.info(getId() + ", receiveExecutionBlockReport:" +  report.getSucceededTasks());
+    if (!report.getReportSuccess()) {
+      LOG.error(getId() + ", ExecutionBlock final report failed, cause: " + report.getReportErrorMessage());
+      abort(SubQueryState.FAILED);
+      return;
+    }
+    if (report.getIntermediateEntriesCount() > 0) {
+      synchronized (hashShuffleIntermediateEntries) {
+        for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) {
+          hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+        }
+      }
+    }
+    synchronized(completeReportReceived) {
+      completeReportReceived.addAndGet(report.getSucceededTasks());
+      completeReportReceived.notifyAll();
+    }
+  }
+
   private static class SubQueryCompleteTransition
       implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
 

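The waitingIntermediateReport/receiveExecutionBlockReport pair added above is a plain wait/notify rendezvous: worker reports arrive on RPC threads and bump completeReportReceived, while the query master thread blocks until the count reaches tasks.size() or the two-minute deadline passes. A minimal standalone sketch of the same pattern; ReportBarrier, onReport, and await are hypothetical names, not Tajo APIs:

import java.util.concurrent.atomic.AtomicInteger;

// Sketch: block until `expected` task reports arrive, or give up after a timeout.
public class ReportBarrier {
  private final AtomicInteger received = new AtomicInteger(0);

  // Called from the RPC handler thread for each ExecutionBlockReport.
  public void onReport(int succeededTasks) {
    synchronized (received) {
      received.addAndGet(succeededTasks);
      received.notifyAll();
    }
  }

  // Called from the scheduler thread; false means the caller should abort.
  public boolean await(int expected, long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    synchronized (received) {
      while (received.get() < expected) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          return false;
        }
        received.wait(Math.min(remaining, 10 * 1000L));
      }
    }
    return true;
  }
}
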
http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 7baae64..964da5d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -49,6 +49,9 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
   private List<Integer> taskIds;               // repeated, the task ids
   private List<Integer> attemptIds;            // repeated, the attempt ids
 
+  private long offset = -1;
+  private long length = -1;
+
   public FetchImpl() {
     builder = TajoWorkerProtocol.FetchProto.newBuilder();
     taskIds = new ArrayList<Integer>();
@@ -64,6 +67,14 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
         proto.getHasNext(),
         proto.getName(),
         proto.getTaskIdList(), proto.getAttemptIdList());
+
+    if (proto.hasOffset()) {
+      this.offset = proto.getOffset();
+    }
+
+    if (proto.hasLength()) {
+      this.length = proto.getLength();
+    }
   }
 
   public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, ExecutionBlockId executionBlockId,
@@ -120,6 +131,9 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
     Preconditions.checkArgument(taskIds.size() == attemptIds.size());
     builder.addAllTaskId(taskIds);
     builder.addAllAttemptId(attemptIds);
+
+    builder.setOffset(offset);
+    builder.setLength(length);
     return builder.build();
   }
 
@@ -202,6 +216,22 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
     return attemptIds;
   }
 
+  public long getOffset() {
+    return offset;
+  }
+
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public void setLength(long length) {
+    this.length = length;
+  }
+
   public FetchImpl clone() throws CloneNotSupportedException {
     FetchImpl newFetchImpl = (FetchImpl) super.clone();
 
@@ -219,6 +249,8 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
     if (attemptIds != null) {
       newFetchImpl.attemptIds = Lists.newArrayList(attemptIds);
     }
+    newFetchImpl.offset = offset;
+    newFetchImpl.length = length;
     return newFetchImpl;
   }
 

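The new offset/length pair lets one fetch address a byte range inside a worker's shared shuffle file instead of a whole per-task file; -1, the unset default, preserves the old whole-file behavior. A hedged sketch of how a caller could derive a ranged fetch from one intermediate page (rangedFetch, pos, and len are illustrative; clone, setOffset, and setLength are the methods added above):

// Sketch: turn a whole-file fetch plus one page into a ranged fetch.
static FetchImpl rangedFetch(FetchImpl whole, long pos, long len)
    throws CloneNotSupportedException {
  FetchImpl ranged = whole.clone();   // copies host, shuffle type, task/attempt ids
  ranged.setOffset(pos);              // byte offset inside the shared shuffle file
  ranged.setLength(len);              // byte count to pull; -1 means "not set"
  return ranged;
}
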
http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 1b95238..aa22bb8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -252,6 +252,7 @@ public class Fetcher {
         }
 
         if(fileLen == length){
+          IOUtils.cleanup(LOG, fc, raf);
           finishTime = System.currentTimeMillis();
           state = TajoProtos.FetcherState.FETCH_FINISHED;
         }

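The added cleanup closes the local file channel and RandomAccessFile as soon as the expected byte count has arrived, instead of leaking them until finalization. Hadoop's IOUtils.cleanup(log, closeables...) is roughly equivalent to the hand-rolled loop below: close() failures are logged and never rethrown.

// Sketch: what IOUtils.cleanup(LOG, fc, raf) does, approximately.
for (java.io.Closeable c : new java.io.Closeable[] { fc, raf }) {
  if (c != null) {
    try {
      c.close();
    } catch (java.io.IOException e) {
      LOG.debug("Exception in closing " + c, e);   // swallowed by design
    }
  }
}
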
http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 736cf51..d8d09e1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -44,6 +44,7 @@ import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.*;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.webapp.StaticHttpServer;
 
@@ -117,6 +118,8 @@ public class TajoWorker extends CompositeService {
 
   private TajoSystemMetrics workerSystemMetrics;
 
+  private HashShuffleAppenderManager hashShuffleAppenderManager;
+
   public TajoWorker() throws Exception {
     super(TajoWorker.class.getName());
   }
@@ -260,6 +263,13 @@ public class TajoWorker extends CompositeService {
     workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
     workerHeartbeatThread.init(conf);
     addIfService(workerHeartbeatThread);
+
+    try {
+      hashShuffleAppenderManager = new HashShuffleAppenderManager(systemConf);
+    } catch (IOException e) {
+      LOG.fatal(e.getMessage(), e);
+      System.exit(-1);
+    }
   }
 
   private void initWorkerMetrics() {
@@ -483,6 +493,10 @@ public class TajoWorker extends CompositeService {
     public TajoSystemMetrics getWorkerSystemMetrics() {
       return workerSystemMetrics;
     }
+
+    public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+      return hashShuffleAppenderManager;
+    }
   }
 
   public void stopWorkerForce() {

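Constructing HashShuffleAppenderManager once in serviceInit makes it a per-worker singleton, which is the heart of TAJO-992: every task on the worker routes a given partition's output through the same appender, so each partition ends up in one file per worker rather than one file per task. A sketch of the access path, using the accessor this commit adds (getSharedManager is an illustrative wrapper):

// Sketch: task-side code reaches the worker-wide manager via WorkerContext.
HashShuffleAppenderManager getSharedManager(TajoWorker.WorkerContext workerContext) {
  // single instance per worker process; all tasks of an execution block
  // append partition P through it, collapsing per-task files into one
  return workerContext.getHashShuffleAppenderManager();
}
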
http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index e77da70..c5f1446 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -31,6 +31,7 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.ExecutionBlockReport;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
@@ -154,9 +155,10 @@ public class TajoWorkerManagerService extends CompositeService
   }
 
   @Override
-  public void cleanupExecutionBlocks(RpcController controller, TajoWorkerProtocol.ExecutionBlockListProto request,
+  public void cleanupExecutionBlocks(RpcController controller,
+                                     TajoWorkerProtocol.ExecutionBlockListProto ebIds,
                                      RpcCallback<PrimitiveProtos.BoolProto> done) {
-    for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : request.getExecutionBlockIdList()) {
+    for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : ebIds.getExecutionBlockIdList()) {
       String inputDir = TaskRunner.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
       workerContext.cleanup(inputDir);
       String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString();

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 200892a..1881685 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -148,7 +148,7 @@ public class Task {
     this.taskDir = StorageUtil.concatPath(taskRunnerContext.getBaseDir(),
         taskId.getQueryUnitId().getId() + "_" + taskId.getId());
 
-    this.context = new TaskAttemptContext(queryContext, taskId,
+    this.context = new TaskAttemptContext(queryContext, worker.getWorkerContext(), taskId,
         request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
     this.context.setDataChannel(request.getDataChannel());
     this.context.setEnforcer(request.getEnforcer());
@@ -200,12 +200,14 @@ public class Task {
     LOG.info("==================================");
     LOG.info("* Subquery " + request.getId() + " is initialized");
     LOG.info("* InterQuery: " + interQuery
-        + (interQuery ? ", Use " + this.shuffleType + " shuffle":""));
+        + (interQuery ? ", Use " + this.shuffleType + " shuffle":"") +
+        ", Fragments (num: " + request.getFragments().size() + ")" +
+        ", Fetches (total:" + request.getFetches().size() + ") :");
 
-    LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
-    LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
-    for (FetchImpl f : request.getFetches()) {
-      LOG.info("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
+    if(LOG.isDebugEnabled()) {
+      for (FetchImpl f : request.getFetches()) {
+        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
+      }
     }
     LOG.info("* Local task dir: " + taskDir);
     if(LOG.isDebugEnabled()) {
@@ -434,6 +436,7 @@ public class Task {
     } finally {
       context.setProgress(1.0f);
       taskRunnerContext.completedTasksNum.incrementAndGet();
+      context.getHashShuffleAppenderManager().finalizeTask(taskId);
 
       if (killed || aborted) {
         context.setExecutorProgress(0.0f);
@@ -483,9 +486,9 @@ public class Task {
         masterProxy.done(null, report, NullCallback.get());
         taskRunnerContext.succeededTasksNum.incrementAndGet();
       }
-
       finishTime = System.currentTimeMillis();
-      LOG.info("Worker's task counter - total:" + taskRunnerContext.completedTasksNum.intValue() +
+      LOG.info(context.getTaskId() + " completed. " +
+          "Worker's task counter - total:" + taskRunnerContext.completedTasksNum.intValue() +
           ", succeeded: " + taskRunnerContext.succeededTasksNum.intValue()
           + ", killed: " + taskRunnerContext.killedTasksNum.intValue()
           + ", failed: " + taskRunnerContext.failedTasksNum.intValue());

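The new finalizeTask call sits in a finally block, so it runs on success, kill, and abort alike; the shared appender needs it to settle this task's pages before the execution block can be closed and reported. A stripped-down sketch of that contract, where executor, shuffleManager, and killed stand in for the fields in Task:

// Sketch: the lifecycle the finally-block above enforces.
try {
  while (!killed && executor.next() != null) {
    // tuples are appended to the shared per-partition shuffle files
  }
} finally {
  // always detach the task from the shared appender, whatever the outcome,
  // so its page accounting is complete before the EB report is built
  shuffleManager.finalizeTask(taskId);
}
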
http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 211f953..6cb4bd7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -31,12 +31,15 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TajoWorker.WorkerContext;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
@@ -71,14 +74,17 @@ public class TaskAttemptContext {
   private DataChannel dataChannel;
   private Enforcer enforcer;
   private QueryContext queryContext;
+  private WorkerContext workerContext;
 
   /** a output volume for each partition */
   private Map<Integer, Long> partitionOutputVolume;
+  private HashShuffleAppenderManager hashShuffleAppenderManager;
 
-  public TaskAttemptContext(final QueryContext queryContext, final QueryUnitAttemptId queryId,
+  public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext, final QueryUnitAttemptId queryId,
                             final FragmentProto[] fragments,
                             final Path workDir) {
     this.queryContext = queryContext;
+    this.workerContext = workerContext;
     this.queryId = queryId;
 
     if (fragments != null) {
@@ -99,12 +105,24 @@ public class TaskAttemptContext {
     state = TaskAttemptState.TA_PENDING;
 
     this.partitionOutputVolume = Maps.newHashMap();
+
+    if (workerContext != null) {
+      this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager();
+    } else {
+      // For TestCase
+      LOG.warn("WorkerContext is null; creating a HashShuffleAppenderManager per task (test only).");
+      try {
+        this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf());
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
   }
 
   @VisibleForTesting
   public TaskAttemptContext(final QueryContext queryContext, final QueryUnitAttemptId queryId,
                             final Fragment [] fragments,  final Path workDir) {
-    this(queryContext, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+    this(queryContext, null, queryId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
   }
 
   public TajoConf getConf() {
@@ -330,4 +348,16 @@ public class TaskAttemptContext {
   public QueryContext getQueryContext() {
     return queryContext;
   }
+
+  public WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  public QueryUnitAttemptId getQueryId() {
+    return queryId;
+  }
+
+  public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+    return hashShuffleAppenderManager;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 9676192..32cd4f5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -41,12 +41,14 @@ import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.engine.utils.TupleCache;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.TajoWorker.WorkerContext;
 
 import java.net.InetSocketAddress;
 import java.util.Map;
@@ -165,6 +167,18 @@ public class TaskRunner extends AbstractService {
     }
   }
 
+  protected void sendExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport reporter) throws Exception {
+    QueryMasterProtocol.QueryMasterProtocolService.Interface qmClientService = null;
+    NettyClientBase qmClient = null;
+    try {
+      qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
+      qmClientService = qmClient.getStub();
+      qmClientService.doneExecutionBlock(null, reporter, NullCallback.get());
+    } finally {
+      if (qmClient != null) {
+        connPool.releaseConnection(qmClient);
+      }
+    }
+  }
+
   public String getId() {
     return getId(executionBlockId, containerId);
   }
@@ -324,6 +338,10 @@ public class TaskRunner extends AbstractService {
     public TaskRunnerHistory getExcutionBlockHistory(){
       return history;
     }
+
+    public WorkerContext getWorkerContext() {
+      return taskRunnerManager.getWorkerContext();
+    }
   }
 
   public TaskRunnerContext getContext() {
@@ -383,7 +401,9 @@ public class TaskRunner extends AbstractService {
                 }
                // if no task has been assigned for a given period,
                // the TaskRunner retries its task request.
-                LOG.info("Retry assigning task:" + getId());
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Retry assigning task:" + getId());
+                }
                 continue;
               }
 

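sendExecutionBlockReport above is fire-and-forget: NullCallback discards the BoolProto acknowledgement that doneExecutionBlock returns. If delivery ever needed to be confirmed, the CallFuture class already imported in this file supports a blocking variant; a sketch of that alternative (not what the patch does), legal inside the same try block because the method declares throws Exception:

// Sketch: blocking form of the report RPC.
CallFuture<PrimitiveProtos.BoolProto> ack = new CallFuture<PrimitiveProtos.BoolProto>();
qmClientService.doneExecutionBlock(null, reporter, ack);
ack.get(10, java.util.concurrent.TimeUnit.SECONDS);  // throws on timeout
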
http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index a8e8730..ec413b2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -23,11 +23,17 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.Pair;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TaskRunnerManager extends CompositeService {
   private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
@@ -88,13 +94,108 @@ public class TaskRunnerManager extends CompositeService {
   public void stopTask(String id) {
     LOG.info("Stop Task:" + id);
     synchronized(taskRunnerMap) {
-      taskRunnerMap.remove(id);
+      TaskRunner taskRunner = taskRunnerMap.remove(id);
+      if (taskRunner != null) {
+        synchronized(taskRunnerCompleteCounter) {
+          ExecutionBlockId ebId = taskRunner.getContext().getExecutionBlockId();
+          AtomicInteger ebSucceededTaskNums = succeededTaskNums.get(ebId);
+          if (ebSucceededTaskNums == null) {
+            ebSucceededTaskNums = new AtomicInteger(taskRunner.getContext().succeededTasksNum.get());
+            succeededTaskNums.put(ebId, ebSucceededTaskNums);
+          } else {
+            ebSucceededTaskNums.addAndGet(taskRunner.getContext().succeededTasksNum.get());
+          }
+
+          Pair<AtomicInteger, AtomicInteger> counter = taskRunnerCompleteCounter.get(ebId);
+
+          if (counter != null) {
+            if (counter.getSecond().decrementAndGet() <= 0) {
+              LOG.info("All tasks of " + ebId + " are completed.");
+              try {
+                closeExecutionBlock(ebId, ebSucceededTaskNums.get(), taskRunner);
+              } catch (Exception e) {
+                LOG.error(ebId + ", closing error:" + e.getMessage(), e);
+              }
+              succeededTaskNums.remove(ebId);
+              taskRunnerCompleteCounter.remove(ebId);
+            }
+          }
+        }
+      }
     }
     if(workerContext.isYarnContainerMode()) {
       stop();
     }
   }
 
+  private void closeExecutionBlock(ExecutionBlockId ebId, int succeededTasks, TaskRunner lastTaskRunner) throws Exception {
+    TajoWorkerProtocol.ExecutionBlockReport.Builder reporterBuilder =
+        TajoWorkerProtocol.ExecutionBlockReport.newBuilder();
+    reporterBuilder.setEbId(ebId.getProto());
+    reporterBuilder.setReportSuccess(true);
+    reporterBuilder.setSucceededTasks(succeededTasks);
+    try {
+      List<TajoWorkerProtocol.IntermediateEntryProto> intermediateEntries =
+          new ArrayList<TajoWorkerProtocol.IntermediateEntryProto>();
+      List<HashShuffleAppenderManager.HashShuffleIntermediate> shuffles =
+          workerContext.getHashShuffleAppenderManager().close(ebId);
+      if (shuffles == null) {
+        reporterBuilder.addAllIntermediateEntries(intermediateEntries);
+        lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build());
+        return;
+      }
+
+      TajoWorkerProtocol.IntermediateEntryProto.Builder intermediateBuilder =
+          TajoWorkerProtocol.IntermediateEntryProto.newBuilder();
+      TajoWorkerProtocol.IntermediateEntryProto.PageProto.Builder pageBuilder =
+          TajoWorkerProtocol.IntermediateEntryProto.PageProto.newBuilder();
+      TajoWorkerProtocol.FailureIntermediateProto.Builder failureBuilder =
+          TajoWorkerProtocol.FailureIntermediateProto.newBuilder();
+
+      for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) {
+        List<TajoWorkerProtocol.IntermediateEntryProto.PageProto> pages =
+            new ArrayList<TajoWorkerProtocol.IntermediateEntryProto.PageProto>();
+        List<TajoWorkerProtocol.FailureIntermediateProto> failureIntermediateItems =
+            new ArrayList<TajoWorkerProtocol.FailureIntermediateProto>();
+
+        for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
+          pageBuilder.clear();
+          pageBuilder.setPos(eachPage.getFirst());
+          pageBuilder.setLength(eachPage.getSecond());
+          pages.add(pageBuilder.build());
+        }
+
+        for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) {
+          failureBuilder.clear();
+          failureBuilder.setPagePos(eachFailure.getFirst());
+          failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
+          failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
+          failureIntermediateItems.add(failureBuilder.build());
+        }
+        intermediateBuilder.clear();
+        intermediateBuilder.setEbId(ebId.getProto())
+            .setHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName() + ":" +
+                workerContext.getPullService().getPort())
+            .setTaskId(-1)
+            .setAttemptId(-1)
+            .setPartId(eachShuffle.getPartId())
+            .setVolume(eachShuffle.getVolume())
+            .addAllPages(pages)
+            .addAllFailures(failureIntermediateItems);
+        intermediateEntries.add(intermediateBuilder.build());
+      }
+
+      // send intermediateEntries to QueryMaster
+      reporterBuilder.addAllIntermediateEntries(intermediateEntries);
+
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      reporterBuilder.setReportSuccess(false);
+      reporterBuilder.setReportErrorMessage(e.getMessage());
+    }
+    lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build());
+  }
+
   public Collection<TaskRunner> getTaskRunners() {
     synchronized(taskRunnerMap) {
       return Collections.unmodifiableCollection(taskRunnerMap.values());
@@ -146,6 +247,12 @@ public class TaskRunnerManager extends CompositeService {
     }
   }
 
+  // per execution block: <total task runners started, task runners still running>
+  Map<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>> taskRunnerCompleteCounter =
+      new HashMap<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>>();
+
+  Map<ExecutionBlockId, AtomicInteger> succeededTaskNums = new HashMap<ExecutionBlockId, AtomicInteger>();
+
   public void startTask(final String[] params) {
     //TODO change to use event dispatcher
     Thread t = new Thread() {
@@ -162,6 +269,16 @@ public class TaskRunnerManager extends CompositeService {
             taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getContext().getExcutionBlockHistory());
           }
 
+          synchronized(taskRunnerCompleteCounter) {
+            ExecutionBlockId ebId = taskRunner.getContext().getExecutionBlockId();
+            Pair<AtomicInteger, AtomicInteger> counter = taskRunnerCompleteCounter.get(ebId);
+            if (counter == null) {
+              counter = new Pair<AtomicInteger, AtomicInteger>(new AtomicInteger(0), new AtomicInteger(0));
+              taskRunnerCompleteCounter.put(ebId, counter);
+            }
+            counter.getFirst().incrementAndGet();
+            counter.getSecond().incrementAndGet();
+          }
           taskRunner.init(systemConf);
           taskRunner.start();
         } catch (Exception e) {

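startTask and stopTask together implement a last-one-out protocol: each runner increments both counters when it starts, and the runner whose stop drives the running count to zero closes the execution block and sends the consolidated report. A stripped-down sketch of just that protocol; EbCounters and its methods are hypothetical, while Pair is the org.apache.tajo.util.Pair used above:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tajo.util.Pair;

// Sketch: per execution block, <total runners started, runners still running>.
class EbCounters<K> {
  private final Map<K, Pair<AtomicInteger, AtomicInteger>> counters =
      new HashMap<K, Pair<AtomicInteger, AtomicInteger>>();

  void onStart(K ebId) {
    synchronized (counters) {
      Pair<AtomicInteger, AtomicInteger> c = counters.get(ebId);
      if (c == null) {
        c = new Pair<AtomicInteger, AtomicInteger>(new AtomicInteger(0), new AtomicInteger(0));
        counters.put(ebId, c);
      }
      c.getFirst().incrementAndGet();   // total runners ever started
      c.getSecond().incrementAndGet();  // runners currently running
    }
  }

  // Returns true exactly once per execution block, for the last runner to stop.
  boolean onStop(K ebId) {
    synchronized (counters) {
      Pair<AtomicInteger, AtomicInteger> c = counters.get(ebId);
      if (c != null && c.getSecond().decrementAndGet() <= 0) {
        counters.remove(ebId);
        return true;   // caller runs closeExecutionBlock and sends the report
      }
      return false;
    }
  }
}
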
http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index e12c9aa..0119a88 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -35,6 +35,7 @@ service QueryMasterProtocolService {
   rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
   rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
   rpc done (TaskCompletionReport) returns (BoolProto);
+  rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto);
 
   //from TajoMaster's QueryJobManager
   rpc killQuery(QueryIdProto) returns (BoolProto);

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index dc2b1d7..cdb1438 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -88,9 +88,39 @@ message FetchProto {
     //repeated part
     repeated int32 taskId = 9 [packed=true];
     repeated int32 attemptId = 10 [packed=true];
+
+    optional int64 offset = 11;
+    optional int64 length = 12;
 }
 
+message FailureIntermediateProto {
+    required int64 pagePos = 1;
+    required int32 startRowNum = 2;
+    required int32 endRowNum = 3;
+}
 
+message IntermediateEntryProto {
+    message PageProto {
+        required int64 pos = 1;
+        required int32 length = 2;
+    }
+    required ExecutionBlockIdProto ebId = 1;
+    required int32 taskId = 2;
+    required int32 attemptId = 3;
+    required int32 partId = 4;
+    required string host = 5;
+    required int64 volume = 6;
+    repeated PageProto pages = 7;
+    repeated FailureIntermediateProto failures = 8;
+}
+
+message ExecutionBlockReport {
+    required ExecutionBlockIdProto ebId = 1;
+    required bool reportSuccess = 2;
+    optional string reportErrorMessage = 3;
+    required int32 succeededTasks = 4;
+    repeated IntermediateEntryProto intermediateEntries = 5;
+}
 
 message QueryUnitResponseProto {
     required string id = 1;


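IntermediateEntryProto now ships the page layout of the single per-partition file each worker wrote, so the query master can merge adjacent pages into a few ranged fetches per worker instead of one fetch per task (the commit's actual merging lives in Repartitioner.java, not shown here). An illustrative reconstruction of that range computation, assuming pages arrive in file order:

// Sketch: collapse a worker's page list into contiguous (offset, length) ranges.
// `pages` stands for entry.getPagesList(); each resulting range becomes one
// setOffset/setLength fetch (see FetchImpl above).
List<long[]> toRanges(List<TajoWorkerProtocol.IntermediateEntryProto.PageProto> pages) {
  List<long[]> ranges = new ArrayList<long[]>();
  long offset = -1, length = 0;
  for (TajoWorkerProtocol.IntermediateEntryProto.PageProto page : pages) {
    if (offset >= 0 && offset + length == page.getPos()) {
      length += page.getLength();                  // adjacent page: extend the range
    } else {
      if (offset >= 0) {
        ranges.add(new long[] { offset, length }); // flush the finished range
      }
      offset = page.getPos();                      // start a new range
      length = page.getLength();
    }
  }
  if (offset >= 0) {
    ranges.add(new long[] { offset, length });
  }
  return ranges;
}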