tajo-commits mailing list archives

From jh...@apache.org
Subject tajo git commit: TAJO-1560: HashShuffle report should be ignored when succeeded tasks are not included. (jinho)
Date Sun, 19 Apr 2015 09:47:04 GMT
Repository: tajo
Updated Branches:
  refs/heads/master ad596bb1c -> 1f72d11f1


TAJO-1560: HashShuffle report should be ignored when succeeded tasks are not included. (jinho)

Closes #538
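
In short, this change makes two decisions explicit: a worker that completed no tasks for an
execution block skips sending a hash-shuffle report, and the query master waits for such
reports only when the stage uses a hash-based shuffle. Below is a minimal, standalone sketch
of that decision logic; the ShuffleDecision class, its method names, and the
completedTasksOnWorker parameter are illustrative only and are not Tajo's actual API.

public class ShuffleDecision {
  // Mirrors the PlanProto.ShuffleType values referenced in the patch.
  enum ShuffleType { NONE_SHUFFLE, HASH_SHUFFLE, RANGE_SHUFFLE, SCATTERED_HASH_SHUFFLE }

  // Only hash-based shuffles are finalized through worker-side reports;
  // RANGE_SHUFFLE is reported per task by the task reporter.
  static boolean needsShuffleReport(ShuffleType type) {
    switch (type) {
      case HASH_SHUFFLE:
      case SCATTERED_HASH_SHUFFLE:
        return true;
      default:
        return false;
    }
  }

  // A worker that completed no tasks for the execution block stays silent,
  // so an empty report is never counted toward stage completion.
  static boolean shouldSendReport(ShuffleType type, int completedTasksOnWorker) {
    return needsShuffleReport(type) && completedTasksOnWorker > 0;
  }

  public static void main(String[] args) {
    System.out.println(shouldSendReport(ShuffleType.HASH_SHUFFLE, 0));  // false: no report sent
    System.out.println(shouldSendReport(ShuffleType.HASH_SHUFFLE, 3));  // true
    System.out.println(shouldSendReport(ShuffleType.RANGE_SHUFFLE, 3)); // false: reported per task
  }
}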


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f72d11f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f72d11f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f72d11f

Branch: refs/heads/master
Commit: 1f72d11f1d2bd48e895cbeb8a7228a854633fe2b
Parents: ad596bb
Author: Jinho Kim <jhkim@apache.org>
Authored: Sun Apr 19 18:45:51 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Sun Apr 19 18:45:51 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../apache/tajo/master/TajoContainerProxy.java  |   9 +-
 .../java/org/apache/tajo/querymaster/Stage.java | 133 +++++++++++--------
 .../apache/tajo/util/history/HistoryWriter.java |  30 ++---
 .../tajo/worker/ExecutionBlockContext.java      |  46 ++++---
 .../tajo/worker/TajoWorkerManagerService.java   |   3 +-
 .../apache/tajo/worker/TaskRunnerManager.java   |  16 ++-
 .../tajo/worker/event/TaskRunnerStartEvent.java |  10 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   1 +
 .../apache/tajo/querymaster/TestKillQuery.java  |   3 +-
 10 files changed, 156 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 510c1a3..7d5cc3c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -96,6 +96,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1560: HashShuffle report should be ignored when a succeed tasks are not
+    included. (jinho)
+
     TAJO-1569: BlockingRpcClient can make other request fail. (jinho)
 
     TAJO-1564: TestFetcher fails occasionally. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 1fda7d4..2aac005 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -32,6 +32,7 @@ import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.TaskFatalErrorEvent;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
@@ -93,15 +94,16 @@ public class TajoContainerProxy extends ContainerProxy {
   }
 
   private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContainer container) {
-    NettyClientBase tajoWorkerRpc = null;
+    NettyClientBase tajoWorkerRpc;
     try {
-      InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
-          .getQueryMasterManagerService().getBindAddr();
 
       InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
       tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
 
+      PlanProto.ShuffleType shuffleType =
+          context.getQuery().getStage(executionBlockId).getDataChannel().getShuffleType();
+
       TajoWorkerProtocol.RunExecutionBlockRequestProto request =
           TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
               .setExecutionBlockId(executionBlockId.getProto())
@@ -111,6 +113,7 @@ public class TajoContainerProxy extends ContainerProxy {
               .setQueryOutputPath(context.getStagingDir().toString())
               .setQueryContext(queryContext.getProto())
               .setPlanJson(planJson)
+              .setShuffleType(shuffleType)
               .build();
 
       tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get());

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 20add9f..4179003 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -239,7 +239,8 @@ public class Stage implements EventHandler<StageEvent> {
               EnumSet.of(
                   StageEventType.SQ_START,
                   StageEventType.SQ_KILL,
-                  StageEventType.SQ_CONTAINER_ALLOCATED))
+                  StageEventType.SQ_CONTAINER_ALLOCATED,
+                  StageEventType.SQ_SHUFFLE_REPORT))
 
           // Transitions from KILLED state
           .addTransition(StageState.KILLED, StageState.KILLED,
@@ -1300,6 +1301,53 @@ public class Stage implements EventHandler<StageEvent> {
     stopShuffleReceiver.set(true);
   }
 
+  private void finalizeShuffleReport(StageShuffleReportEvent event, ShuffleType type) {
+    if(!checkIfNeedFinalizing(type)) return;
+
+    TajoWorkerProtocol.ExecutionBlockReport report = event.getReport();
+
+    if (!report.getReportSuccess()) {
+      stopFinalization();
+      LOG.error(getId() + ", " + type + " report are failed. Caused by:" + report.getReportErrorMessage());
+      eventHandler.handle(new StageEvent(getId(), StageEventType.SQ_FAILED));
+    }
+
+    completedShuffleTasks.addAndGet(report.getSucceededTasks());
+    if (report.getIntermediateEntriesCount() > 0) {
+      for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) {
+        hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
+      }
+    }
+
+    if (completedShuffleTasks.get() >= succeededObjectCount) {
+      LOG.info(getId() + ", Finalized " + type + " reports: " + completedShuffleTasks.get());
+      eventHandler.handle(new StageEvent(getId(), StageEventType.SQ_STAGE_COMPLETED));
+      if (timeoutChecker != null) {
+        stopFinalization();
+        synchronized (timeoutChecker){
+          timeoutChecker.notifyAll();
+        }
+      }
+    } else {
+      LOG.info(getId() + ", Received " + type + " reports " +
+          completedShuffleTasks.get() + "/" + succeededObjectCount);
+    }
+  }
+
+  /**
+   * HASH_SHUFFLE, SCATTERED_HASH_SHUFFLE should get report from worker nodes when ExecutionBlock is stopping.
+   * RANGE_SHUFFLE report is sent from task reporter when a task finished in worker node.
+   */
+  public static boolean checkIfNeedFinalizing(ShuffleType type) {
+    switch (type) {
+      case HASH_SHUFFLE:
+      case SCATTERED_HASH_SHUFFLE:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   private static class StageFinalizeTransition implements SingleArcTransition<Stage, StageEvent> {
 
     @Override
@@ -1310,71 +1358,50 @@ public class Stage implements EventHandler<StageEvent> {
       }
 
       stage.lastContactTime = System.currentTimeMillis();
+      ShuffleType shuffleType = stage.getDataChannel().getShuffleType();
       try {
         if (event instanceof StageShuffleReportEvent) {
-
-          StageShuffleReportEvent finalizeEvent = (StageShuffleReportEvent) event;
-          TajoWorkerProtocol.ExecutionBlockReport report = finalizeEvent.getReport();
-
-          if (!report.getReportSuccess()) {
-            stage.stopFinalization();
-            LOG.error(stage.getId() + ", Shuffle report are failed. Caused by:" + report.getReportErrorMessage());
-            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
-          }
-
-          stage.completedShuffleTasks.addAndGet(finalizeEvent.getReport().getSucceededTasks());
-          if (report.getIntermediateEntriesCount() > 0) {
-            for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) {
-              stage.hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
-            }
-          }
-
-          if (stage.completedShuffleTasks.get() >= stage.succeededObjectCount) {
-            LOG.info(stage.getId() + ", Finalized shuffle reports: " + stage.completedShuffleTasks.get());
-            stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
-            if (stage.timeoutChecker != null) {
-              stage.stopFinalization();
-              synchronized (stage.timeoutChecker){
-                stage.timeoutChecker.notifyAll();
-              }
-            }
-          } else {
-            LOG.info(stage.getId() + ", Received shuffle report: " +
-                stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
-          }
-
+          stage.finalizeShuffleReport((StageShuffleReportEvent) event, shuffleType);
         } else {
-          LOG.info(String.format("Stage finalize - %s (total=%d, success=%d, killed=%d)",
+          LOG.info(String.format("Stage - %s finalize %s (total=%d, success=%d, killed=%d)",
               stage.getId().toString(),
+              shuffleType,
               stage.totalScheduledObjectsCount,
               stage.succeededObjectCount,
               stage.killedObjectCount));
           stage.finalizeStage();
-          LOG.info(stage.getId() + ", waiting for shuffle reports. expected Tasks:" + stage.succeededObjectCount);
 
+          if (checkIfNeedFinalizing(shuffleType)) {
+            /* wait for StageShuffleReportEvent from worker nodes */
+
+            LOG.info(stage.getId() + ", wait for " + shuffleType + " reports. expected Tasks:"
+                + stage.succeededObjectCount);
           /* FIXME implement timeout handler of stage and task */
-          if (stage.timeoutChecker != null) {
-            stage.timeoutChecker = new Thread(new Runnable() {
-              @Override
-              public void run() {
-                while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) {
-                  long elapsedTime = System.currentTimeMillis() - stage.lastContactTime;
-                  if (elapsedTime > 120 * 1000) {
-                    stage.stopFinalization();
-                    LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime
-                        + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
-                    stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
-                  }
-                  synchronized (this) {
-                    try {
-                      this.wait(1 * 1000);
-                    } catch (InterruptedException e) {
+            if (stage.timeoutChecker != null) {
+              stage.timeoutChecker = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                  while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) {
+                    long elapsedTime = System.currentTimeMillis() - stage.lastContactTime;
+                    if (elapsedTime > 120 * 1000) {
+                      stage.stopFinalization();
+                      LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime
+                          + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount);
+                      stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
+                    }
+                    synchronized (this) {
+                      try {
+                        this.wait(1 * 1000);
+                      } catch (InterruptedException e) {
+                      }
                     }
                   }
                 }
-              }
-            });
-            stage.timeoutChecker.start();
+              });
+              stage.timeoutChecker.start();
+            }
+          } else {
+            stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
           }
         }
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index f0c6c11..e8ba304 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -132,7 +132,7 @@ public class HistoryWriter extends AbstractService {
   }
 
   /* asynchronously flush to history file */
-  public synchronized WriterFuture<WriterHolder> appendAndFlush(History history) {
+  public WriterFuture<WriterHolder> appendAndFlush(History history) {
     WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(history) {
       public void done(WriterHolder holder) {
         try {
@@ -163,7 +163,7 @@ public class HistoryWriter extends AbstractService {
   }
 
   /* Flushing the buffer */
-  public synchronized void flushTaskHistories() {
+  public void flushTaskHistories() {
     if (historyQueue.size() > 0) {
       synchronized (writerThread) {
         writerThread.needTaskFlush.set(true);
@@ -244,20 +244,16 @@ public class HistoryWriter extends AbstractService {
           cal.add(Calendar.HOUR_OF_DAY, -2);
           String closeTargetTime = df.format(cal.getTime());
           List<String> closingTargets = new ArrayList<String>();
-          synchronized (taskWriters) {
-            for (String eachWriterTime : taskWriters.keySet()) {
-              if (eachWriterTime.compareTo(closeTargetTime) <= 0) {
-                closingTargets.add(eachWriterTime);
-              }
+
+          for (String eachWriterTime : taskWriters.keySet()) {
+            if (eachWriterTime.compareTo(closeTargetTime) <= 0) {
+              closingTargets.add(eachWriterTime);
             }
           }
 
           for (String eachWriterTime : closingTargets) {
             WriterHolder writerHolder;
-            synchronized (taskWriters) {
-              writerHolder = taskWriters.remove(eachWriterTime);
-            }
-
+            writerHolder = taskWriters.remove(eachWriterTime);
             if (writerHolder != null) {
               LOG.info("Closing task history file: " + writerHolder.path);
               IOUtils.cleanup(LOG, writerHolder);
@@ -340,7 +336,7 @@ public class HistoryWriter extends AbstractService {
       return histories;
     }
 
-    private synchronized void writeQueryHistory(QueryHistory queryHistory) throws Exception {
+    private void writeQueryHistory(QueryHistory queryHistory) throws Exception {
       // QueryMaster's query detail history (json format)
       // <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/query.hist
 
@@ -381,7 +377,7 @@ public class HistoryWriter extends AbstractService {
       }
     }
 
-    private synchronized WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception {
+    private WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception {
       if(stopped.get()) return null;
 
         // writing to HDFS and rolling hourly
@@ -409,7 +405,7 @@ public class HistoryWriter extends AbstractService {
       return querySummaryWriter;
     }
 
-    private synchronized void rollingQuerySummaryWriter() throws Exception {
+    private void rollingQuerySummaryWriter() throws Exception {
       // finding largest file sequence
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
       String currentDateTime = df.format(new Date(System.currentTimeMillis()));
@@ -442,7 +438,7 @@ public class HistoryWriter extends AbstractService {
       }
     }
 
-    private synchronized WriterHolder writeTaskHistory(TaskHistory taskHistory) throws Exception {
+    private WriterHolder writeTaskHistory(TaskHistory taskHistory) throws Exception {
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
 
       String taskStartTime = df.format(new Date(taskHistory.getStartTime()));
@@ -536,14 +532,14 @@ public class HistoryWriter extends AbstractService {
     FSDataOutputStream out;
 
     @Override
-    public synchronized void close() throws IOException {
+    public void close() throws IOException {
       if (out != null) out.close();
     }
 
     /*
      * Sync buffered data to DataNodes or disks (flush to disk devices).
      */
-    private synchronized void flush() throws IOException {
+    private void flush() throws IOException {
       if (out != null) out.hsync();
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 5ffc7a9..fcf787e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -20,6 +20,7 @@ package org.apache.tajo.worker;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import io.netty.channel.ConnectTimeoutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,6 +35,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcClientManager;
@@ -42,9 +44,6 @@ import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.Pair;
 
-import io.netty.channel.ConnectTimeoutException;
-import io.netty.channel.EventLoopGroup;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -66,8 +65,6 @@ public class ExecutionBlockContext {
   public AtomicInteger killedTasksNum = new AtomicInteger();
   public AtomicInteger failedTasksNum = new AtomicInteger();
 
-  private EventLoopGroup loopGroup;
-  // for temporal or intermediate files
   private FileSystem localFS;
   // for input files
   private FileSystem defaultFS;
@@ -90,6 +87,8 @@ public class ExecutionBlockContext {
 
   private AtomicBoolean stop = new AtomicBoolean();
 
+  private PlanProto.ShuffleType shuffleType;
+
   // It keeps all of the query unit attempts while a TaskRunner is running.
   private final ConcurrentMap<TaskAttemptId, Task> tasks = Maps.newConcurrentMap();
 
@@ -97,7 +96,8 @@ public class ExecutionBlockContext {
 
   public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerContext,
                                TaskRunnerManager manager, QueryContext queryContext, String plan,
-                               ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster) throws Throwable {
+                               ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster,
+                               PlanProto.ShuffleType shuffleType) throws Throwable {
     this.manager = manager;
     this.executionBlockId = executionBlockId;
     this.connManager = RpcClientManager.getInstance();
@@ -114,6 +114,7 @@ public class ExecutionBlockContext {
     this.plan = plan;
     this.resource = new ExecutionBlockSharedResource();
     this.workerContext = workerContext;
+    this.shuffleType = shuffleType;
   }
 
   public void init() throws Throwable {
@@ -193,10 +194,6 @@ public class ExecutionBlockContext {
     return localFS;
   }
 
-  public FileSystem getDefaultFS() {
-    return defaultFS;
-  }
-
   public LocalDirAllocator getLocalDirAllocator() {
     return workerContext.getLocalDirAllocator();
   }
@@ -264,13 +261,30 @@ public class ExecutionBlockContext {
     return workerContext;
   }
 
-  private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
+  /**
+   * HASH_SHUFFLE, SCATTERED_HASH_SHUFFLE should send report when this executionBlock stopping.
+   */
+  protected void sendShuffleReport() throws Exception {
+
+    switch (shuffleType) {
+      case HASH_SHUFFLE:
+      case SCATTERED_HASH_SHUFFLE:
+        sendHashShuffleReport(executionBlockId);
+        break;
+      case NONE_SHUFFLE:
+      case RANGE_SHUFFLE:
+      default:
+        break;
+    }
+  }
+
+  private void sendHashShuffleReport(ExecutionBlockId ebId) throws Exception {
+    /* This case is that worker did not ran tasks */
+    if(completedTasksNum.get() == 0) return;
+
     NettyClientBase client = getQueryMasterConnection();
     QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
-    stub.doneExecutionBlock(null, reporter, NullCallback.get());
-  }
 
-  protected void reportExecutionBlock(ExecutionBlockId ebId) {
     ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder();
     reporterBuilder.setEbId(ebId.getProto());
     reporterBuilder.setReportSuccess(true);
@@ -281,7 +295,7 @@ public class ExecutionBlockContext {
           getWorkerContext().getHashShuffleAppenderManager().close(ebId);
       if (shuffles == null) {
         reporterBuilder.addAllIntermediateEntries(intermediateEntries);
-        sendExecutionBlockReport(reporterBuilder.build());
+        stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get());
         return;
       }
 
@@ -334,7 +348,7 @@ public class ExecutionBlockContext {
       }
     }
     try {
-      sendExecutionBlockReport(reporterBuilder.build());
+      stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get());
     } catch (Throwable e) {
       // can't send report to query master
       LOG.fatal(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 4a09772..71d96c4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -121,7 +121,8 @@ public class TajoWorkerManagerService extends CompositeService
           , new ExecutionBlockId(request.getExecutionBlockId())
           , request.getContainerId()
           , new QueryContext(workerContext.getConf(), request.getQueryContext()),
-          request.getPlanJson()
+          request.getPlanJson(),
+          request.getShuffleType()
       ));
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index b3c28b3..734a8a5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -32,7 +32,6 @@ import org.apache.tajo.worker.event.TaskRunnerEvent;
 import org.apache.tajo.worker.event.TaskRunnerStartEvent;
 import org.apache.tajo.worker.event.TaskRunnerStopEvent;
 
-import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -155,8 +154,14 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
 
       if(context == null){
         try {
-          context = new ExecutionBlockContext(getTajoConf(), getWorkerContext(), this, startEvent.getQueryContext(),
-              startEvent.getPlan(), startEvent.getExecutionBlockId(), startEvent.getQueryMaster());
+          context = new ExecutionBlockContext(getTajoConf(),
+              getWorkerContext(),
+              this,
+              startEvent.getQueryContext(),
+              startEvent.getPlan(),
+              startEvent.getExecutionBlockId(),
+              startEvent.getQueryMaster(),
+              startEvent.getShuffleType());
           context.init();
         } catch (Throwable e) {
           LOG.fatal(e.getMessage(), e);
@@ -178,10 +183,9 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
       if(executionBlockContext != null){
         try {
           executionBlockContext.getSharedResource().releaseBroadcastCache(event.getExecutionBlockId());
-          executionBlockContext.reportExecutionBlock(event.getExecutionBlockId());
-          workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId());
+          executionBlockContext.sendShuffleReport();
           workerContext.getTaskHistoryWriter().flushTaskHistories();
-        } catch (IOException e) {
+        } catch (Exception e) {
           LOG.fatal(e.getMessage(), e);
           throw new RuntimeException(e);
         } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
index ff63754..908afa2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -21,6 +21,7 @@ package org.apache.tajo.worker.event;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
 
 public class TaskRunnerStartEvent extends TaskRunnerEvent {
 
@@ -28,17 +29,20 @@ public class TaskRunnerStartEvent extends TaskRunnerEvent {
   private final WorkerConnectionInfo queryMaster;
   private final String containerId;
   private final String plan;
+  private final PlanProto.ShuffleType shuffleType;
 
   public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster,
                               ExecutionBlockId executionBlockId,
                               String containerId,
                               QueryContext context,
-                              String plan) {
+                              String plan,
+                              PlanProto.ShuffleType shuffleType) {
     super(EventType.START, executionBlockId);
     this.queryMaster = queryMaster;
     this.containerId = containerId;
     this.queryContext = context;
     this.plan = plan;
+    this.shuffleType = shuffleType;
   }
 
   public WorkerConnectionInfo getQueryMaster() {
@@ -56,4 +60,8 @@ public class TaskRunnerStartEvent extends TaskRunnerEvent {
   public String getPlan() {
     return plan;
   }
+
+  public PlanProto.ShuffleType getShuffleType() {
+    return shuffleType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index b8c9575..fddef8f 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -201,6 +201,7 @@ message RunExecutionBlockRequestProto {
 
     required KeyValueSetProto queryContext = 6;
     required string planJson = 7;
+    required ShuffleType shuffleType = 8;
 }
 
 message ExecutionBlockListProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1f72d11f/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 09be700..b2e1ce9 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -211,7 +211,8 @@ public class TestKillQuery {
     taskRequest.setInterQuery();
     TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
 
-    ExecutionBlockContext context = new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null);
+    ExecutionBlockContext context =
+        new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null, null);
 
     org.apache.tajo.worker.Task task = new Task("test", CommonTestingUtil.getTestDir(), attemptId,
         conf, context, taskRequest);
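
To make the stage-side bookkeeping in Stage.finalizeShuffleReport concrete, here is a minimal,
hypothetical sketch of how shuffle reports could be tallied against the number of succeeded
tasks before the stage completes; the ShuffleReportTally class and its field names are
illustrative only and do not correspond to Tajo's actual types.

import java.util.concurrent.atomic.AtomicInteger;

public class ShuffleReportTally {
  private final int expectedSucceededTasks;                        // cf. stage.succeededObjectCount
  private final AtomicInteger reportedTasks = new AtomicInteger(); // cf. stage.completedShuffleTasks

  public ShuffleReportTally(int expectedSucceededTasks) {
    this.expectedSucceededTasks = expectedSucceededTasks;
  }

  // Returns true once the reports cover every succeeded task, i.e. the stage may complete.
  public boolean addReport(int succeededTasksInReport) {
    return reportedTasks.addAndGet(succeededTasksInReport) >= expectedSucceededTasks;
  }

  public static void main(String[] args) {
    ShuffleReportTally tally = new ShuffleReportTally(5);
    System.out.println(tally.addReport(2)); // false: 2/5 tasks reported
    System.out.println(tally.addReport(3)); // true:  5/5 tasks reported, stage can complete
  }
}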

