tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [04/11] tajo git commit: TAJO-1397: Resource allocation should be fine grained. (jinho)
Date Mon, 20 Jul 2015 08:34:22 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
index 299952e..5e1ccc1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
@@ -25,44 +25,44 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TaskId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.query.TaskRequest;
 import org.apache.tajo.engine.query.TaskRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.resource.NodeResource;
-import org.apache.tajo.resource.NodeResources;
-import org.apache.tajo.worker.event.*;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+import org.apache.tajo.worker.event.TaskStartEvent;
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * TaskExecutor uses a number of threads equal to the number of slots available for running tasks on the Worker
  */
-public class TaskExecutor extends AbstractService implements EventHandler<TaskExecutorEvent> {
+public class TaskExecutor extends AbstractService implements EventHandler<TaskStartEvent> {
   private static final Log LOG = LogFactory.getLog(TaskExecutor.class);
 
-  private final TaskManager taskManager;
-  private final EventHandler rmEventHandler;
+  private final TajoWorker.WorkerContext workerContext;
   private final Map<TaskAttemptId, NodeResource> allocatedResourceMap;
   private final BlockingQueue<Task> taskQueue;
   private final AtomicInteger runningTasks;
-  private ThreadPoolExecutor fetcherExecutor;
+  private ExecutorService fetcherThreadPool;
   private ExecutorService threadPool;
   private TajoConf tajoConf;
   private volatile boolean isStopped;
 
-  public TaskExecutor(TaskManager taskManager, EventHandler rmEventHandler) {
+  public TaskExecutor(TajoWorker.WorkerContext workerContext) {
     super(TaskExecutor.class.getName());
-    this.taskManager = taskManager;
-    this.rmEventHandler = rmEventHandler;
+    this.workerContext = workerContext;
     this.allocatedResourceMap = Maps.newConcurrentMap();
     this.runningTasks = new AtomicInteger();
     this.taskQueue = new LinkedBlockingQueue<Task>();
@@ -70,12 +70,8 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
-    }
 
-    this.tajoConf = (TajoConf) conf;
-    this.taskManager.getDispatcher().register(TaskExecutorEvent.EventType.class, this);
+    this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
     super.serviceInit(conf);
   }
 
@@ -85,12 +81,9 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx
     this.threadPool = Executors.newFixedThreadPool(nThreads,
         new ThreadFactoryBuilder().setNameFormat("Task executor #%d").build());
 
-    //TODO move to tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
-    int maxFetcherThreads = Runtime.getRuntime().availableProcessors() * 2;
-    this.fetcherExecutor = new ThreadPoolExecutor(Math.min(nThreads, maxFetcherThreads),
-        maxFetcherThreads,
-        60L, TimeUnit.SECONDS,
-        new SynchronousQueue<Runnable>(true));
+    int maxFetcherThreads = tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
+    this.fetcherThreadPool = Executors.newFixedThreadPool(nThreads,
+        new ThreadFactoryBuilder().setNameFormat("Fetcher executor #%d").build());
 
 
     for (int i = 0; i < nThreads; i++) {
@@ -106,7 +99,7 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx
     isStopped = true;
 
     threadPool.shutdown();
-    fetcherExecutor.shutdown();
+    fetcherThreadPool.shutdown();
     super.serviceStop();
   }
 
@@ -131,19 +124,28 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx
     return task;
   }
 
-  @SuppressWarnings("unchecked")
   protected void stopTask(TaskAttemptId taskId) {
     runningTasks.decrementAndGet();
-    rmEventHandler.handle(new NodeResourceDeallocateEvent(allocatedResourceMap.remove(taskId)));
+    releaseResource(taskId);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void releaseResource(TaskAttemptId taskId) {
+    NodeResource resource =  allocatedResourceMap.remove(taskId);
+
+    if(resource != null) {
+      workerContext.getNodeResourceManager().getDispatcher().getEventHandler().handle(
+          new NodeResourceDeallocateEvent(resource, NodeResourceEvent.ResourceType.TASK));
+    }
   }
 
   protected ExecutorService getFetcherExecutor() {
-    return fetcherExecutor;
+    return fetcherThreadPool;
   }
 
 
   protected Task createTask(ExecutionBlockContext executionBlockContext,
-                            TajoWorkerProtocol.TaskRequestProto taskRequest) throws IOException {
+                            TaskRequestProto taskRequest) throws IOException {
     Task task = null;
     TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
     if (executionBlockContext.getTasks().containsKey(taskAttemptId)) {
@@ -158,37 +160,34 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskEx
   }
 
   @Override
-  public void handle(TaskExecutorEvent event) {
-
-    if (event instanceof TaskStartEvent) {
-      TaskStartEvent startEvent = (TaskStartEvent) event;
-      allocatedResourceMap.put(startEvent.getTaskId(), startEvent.getAllocatedResource());
-
-      ExecutionBlockContext context = taskManager.getExecutionBlockContext(
-          startEvent.getTaskId().getTaskId().getExecutionBlockId());
-
-      try {
-        Task task = createTask(context, startEvent.getTaskRequest());
-        if (task != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Arrival task: " + task.getTaskContext().getTaskId() +
-                ", allocated resource: " + startEvent.getAllocatedResource());
-          }
-          taskQueue.put(task);
-          runningTasks.incrementAndGet();
-          context.getWorkerContext().getWorkerSystemMetrics()
-              .histogram("tasks", "running").update(runningTasks.get());
-        } else {
-          LOG.warn("Release duplicate task resource: " + startEvent.getAllocatedResource());
-          stopTask(startEvent.getTaskId());
-        }
-      } catch (InterruptedException e) {
-        if (!isStopped) {
-          LOG.fatal(e.getMessage(), e);
+  public void handle(TaskStartEvent event) {
+
+    allocatedResourceMap.put(event.getTaskAttemptId(), event.getAllocatedResource());
+
+    ExecutionBlockContext context = workerContext.getTaskManager().getExecutionBlockContext(
+        event.getTaskAttemptId().getTaskId().getExecutionBlockId());
+
+    try {
+      Task task = createTask(context, event.getTaskRequest());
+      if (task != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Arrival task: " + task.getTaskContext().getTaskId() +
+              ", allocated resource: " + event.getAllocatedResource());
         }
-      } catch (IOException e) {
-        stopTask(startEvent.getTaskId());
+        taskQueue.put(task);
+        runningTasks.incrementAndGet();
+        context.getWorkerContext().getWorkerSystemMetrics()
+            .histogram("tasks", "running").update(runningTasks.get());
+      } else {
+        LOG.warn("Release duplicate task resource: " + event.getAllocatedResource());
+        stopTask(event.getTaskAttemptId());
+      }
+    } catch (InterruptedException e) {
+      if (!isStopped) {
+        LOG.fatal(e.getMessage(), e);
       }
+    } catch (Exception e) {
+      stopTask(event.getTaskAttemptId());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
index c2432eb..52b0d0b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
@@ -29,8 +29,8 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.tajo.TajoProtos.TaskAttemptState;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.FetcherHistoryProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
+import static org.apache.tajo.ResourceProtos.FetcherHistoryProto;
+import static org.apache.tajo.ResourceProtos.TaskHistoryProto;
 
 /**
  * The history class for Task processing.

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 7697458..d77c583 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -37,15 +37,17 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequest;
 import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
 import org.apache.tajo.plan.function.python.TajoScriptEngine;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
@@ -65,8 +67,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+import static org.apache.tajo.ResourceProtos.*;
 
 public class TaskImpl implements Task {
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
@@ -466,6 +467,7 @@ public class TaskImpl implements Task {
 
   @Override
   public void cleanup() {
+    // history store in memory while running stage
     TaskHistory taskHistory = createTaskHistory();
     executionBlockContext.addTaskHistory(getId().getTaskId(), taskHistory);
     executionBlockContext.getTasks().remove(getId());
@@ -485,6 +487,7 @@ public class TaskImpl implements Task {
     stopScriptExecutors();
   }
 
+  @Override
   public TaskHistory createTaskHistory() {
     TaskHistory taskHistory = null;
     try {
@@ -508,16 +511,12 @@ public class TaskImpl implements Task {
         int i = 0;
         FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
         for (Fetcher fetcher : fetcherRunners) {
-          // TODO store the fetcher histories
-          if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
-            builder.setStartTime(fetcher.getStartTime());
-            builder.setFinishTime(fetcher.getFinishTime());
-            builder.setFileLength(fetcher.getFileLen());
-            builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
-            builder.setState(fetcher.getState());
-
-            taskHistory.addFetcherHistory(builder.build());
-          }
+          builder.setStartTime(fetcher.getStartTime());
+          builder.setFinishTime(fetcher.getFinishTime());
+          builder.setFileLength(fetcher.getFileLen());
+          builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
+          builder.setState(fetcher.getState());
+          taskHistory.addFetcherHistory(builder.build());
           if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
         }
         taskHistory.setFinishedFetchCount(i);
@@ -529,6 +528,10 @@ public class TaskImpl implements Task {
     return taskHistory;
   }
 
+  public List<Fetcher> getFetchers() {
+    return fetcherRunners;
+  }
+
   public int hashCode() {
     return context.hashCode();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
index 7990a72..f518fd3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -29,12 +29,24 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.event.*;
 
 import java.io.IOException;
-import java.util.*;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.tajo.ResourceProtos.ExecutionBlockListProto;
+import static org.apache.tajo.ResourceProtos.ExecutionBlockContextRequest;
+import static org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
 
 /**
  * A TaskManager is responsible for managing executionBlock resource and tasks.
@@ -45,26 +57,23 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
   private final TajoWorker.WorkerContext workerContext;
   private final Map<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap;
   private final Dispatcher dispatcher;
-  private final EventHandler rmEventHandler;
+  private TaskExecutor executor;
 
-  private TajoConf tajoConf;
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext){
+    this(dispatcher, workerContext, null);
+  }
 
-  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) {
+  public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, TaskExecutor executor) {
     super(TaskManager.class.getName());
 
     this.dispatcher = dispatcher;
     this.workerContext = workerContext;
     this.executionBlockContextMap = Maps.newHashMap();
-    this.rmEventHandler = rmEventHandler;
+    this.executor = executor;
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
-    }
-
-    this.tajoConf = (TajoConf)conf;
     dispatcher.register(TaskManagerEvent.EventType.class, this);
     super.serviceInit(conf);
   }
@@ -87,22 +96,51 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
     return workerContext;
   }
 
-  protected ExecutionBlockContext createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+  protected TaskExecutor getTaskExecutor() {
+    if (executor == null) {
+      executor = workerContext.getTaskExecuor();
+    }
+    return executor;
+  }
+
+  public int getRunningTasks() {
+    return workerContext.getTaskExecuor().getRunningTasks();
+  }
+
+  protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId executionBlockId,
+                                                       String queryMasterHostAndPort) {
+
+    LOG.info("QueryMaster Address:" + queryMasterHostAndPort);
+
+    AsyncRpcClient client = null;
     try {
-      ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), null, request);
+      InetSocketAddress address = NetUtils.createSocketAddr(queryMasterHostAndPort);
+      ExecutionBlockContextRequest.Builder request = ExecutionBlockContextRequest.newBuilder();
+      request.setExecutionBlockId(executionBlockId.getProto())
+          .setWorker(getWorkerContext().getConnectionInfo().getProto());
+
+      client = RpcClientManager.getInstance().newClient(address, QueryMasterProtocol.class, true);
+      QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+      CallFuture<ExecutionBlockContextResponse> callback = new CallFuture<ExecutionBlockContextResponse>();
+      stub.getExecutionBlockContext(callback.getController(), request.build(), callback);
+
+      ExecutionBlockContextResponse contextProto =
+          callback.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client);
 
       context.init();
       return context;
     } catch (Throwable e) {
+      RpcClientManager.cleanup(client);
       LOG.fatal(e.getMessage(), e);
       throw new RuntimeException(e);
     }
   }
 
   protected void stopExecutionBlock(ExecutionBlockContext context,
-                                    TajoWorkerProtocol.ExecutionBlockListProto cleanupList) {
+                                    ExecutionBlockListProto cleanupList) {
 
-    if(context != null){
+    if (context != null) {
       try {
         context.getSharedResource().releaseBroadcastCache(context.getExecutionBlockId());
         context.sendShuffleReport();
@@ -127,22 +165,72 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
 
   @Override
   public void handle(TaskManagerEvent event) {
-    LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType());
 
-    if (event instanceof ExecutionBlockStartEvent) {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("======================== Processing " + event + " of type " + event.getType());
+    }
+
+    switch (event.getType()) {
+      case TASK_START: {
+        //receive event from NodeResourceManager
+        TaskStartEvent taskStartEvent = TUtil.checkTypeAndGet(event, TaskStartEvent.class);
+        try {
+          if (!executionBlockContextMap.containsKey(taskStartEvent.getExecutionBlockId())) {
+            ExecutionBlockContext context = createExecutionBlock(taskStartEvent.getExecutionBlockId(),
+                taskStartEvent.getTaskRequest().getQueryMasterHostAndPort());
 
-      //receive event from NodeResourceManager
-      if(!executionBlockContextMap.containsKey(event.getExecutionBlockId())) {
-        ExecutionBlockContext context = createExecutionBlock(((ExecutionBlockStartEvent) event).getRequestProto());
-        executionBlockContextMap.put(context.getExecutionBlockId(), context);
-      } else {
-        LOG.warn("Already initialized ExecutionBlock: " + event.getExecutionBlockId());
+            executionBlockContextMap.put(context.getExecutionBlockId(), context);
+            LOG.info("Running ExecutionBlocks: " + executionBlockContextMap.size()
+                + ", running tasks:" + getRunningTasks() + ", availableResource: "
+                + workerContext.getNodeResourceManager().getAvailableResource());
+          }
+          getTaskExecutor().handle(taskStartEvent);
+        } catch (Exception e) {
+          getTaskExecutor().releaseResource(taskStartEvent.getTaskAttemptId());
+          getWorkerContext().getTaskManager().getDispatcher().getEventHandler()
+              .handle(new ExecutionBlockErrorEvent(taskStartEvent.getExecutionBlockId(), e));
+        }
+        break;
+      }
+      case EB_STOP: {
+        //receive event from QueryMaster
+        ExecutionBlockStopEvent executionBlockStopEvent = TUtil.checkTypeAndGet(event, ExecutionBlockStopEvent.class);
+        workerContext.getNodeResourceManager().getDispatcher().getEventHandler()
+            .handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
+        stopExecutionBlock(executionBlockContextMap.remove(executionBlockStopEvent.getExecutionBlockId()),
+            executionBlockStopEvent.getCleanupList());
+        break;
+      }
+      case QUERY_STOP: {
+        QueryStopEvent queryStopEvent = TUtil.checkTypeAndGet(event, QueryStopEvent.class);
+
+        //cleanup failure ExecutionBlock
+        for (ExecutionBlockId ebId : executionBlockContextMap.keySet()) {
+          if (ebId.getQueryId().equals(queryStopEvent.getQueryId())) {
+            try {
+              executionBlockContextMap.remove(ebId).stop();
+            } catch (Exception e) {
+              LOG.fatal(e.getMessage(), e);
+            }
+          }
+        }
+        workerContext.cleanup(queryStopEvent.getQueryId().toString());
+        break;
       }
-    } else if (event instanceof ExecutionBlockStopEvent) {
-      //receive event from QueryMaster
-      rmEventHandler.handle(new NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
-      stopExecutionBlock(executionBlockContextMap.remove(event.getExecutionBlockId()),
-          ((ExecutionBlockStopEvent) event).getCleanupList());
+      case EB_FAIL: {
+        ExecutionBlockErrorEvent errorEvent = TUtil.checkTypeAndGet(event, ExecutionBlockErrorEvent.class);
+        LOG.error(errorEvent.getError().getMessage(), errorEvent.getError());
+        ExecutionBlockContext context = executionBlockContextMap.remove(errorEvent.getExecutionBlockId());
+
+        if (context != null) {
+          context.getSharedResource().releaseBroadcastCache(context.getExecutionBlockId());
+          getWorkerContext().getTaskHistoryWriter().flushTaskHistories();
+          context.stop();
+        }
+        break;
+      }
+      default:
+        break;
     }
   }
 
@@ -158,17 +246,14 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
     return null;
   }
 
-  public List<TaskHistory> getTaskHistories(ExecutionBlockId executionblockId) throws IOException {
-    List<TaskHistory> histories = new ArrayList<TaskHistory>();
-    ExecutionBlockContext context = executionBlockContextMap.get(executionblockId);
-    if (context != null) {
-      histories.addAll(context.getTaskHistories().values());
-    }
-    //TODO get List<TaskHistory> from HistoryReader
-    return histories;
+  public List<org.apache.tajo.util.history.TaskHistory> getTaskHistories(ExecutionBlockId executionblockId)
+      throws IOException {
+
+    return getWorkerContext().getHistoryReader().getTaskHistory(executionblockId.getQueryId().toString(),
+        executionblockId.toString());
   }
 
-  public TaskHistory getTaskHistory(TaskId taskId) {
+  public TaskHistory getTaskHistory(TaskId taskId) throws IOException {
     TaskHistory history = null;
     ExecutionBlockContext context = executionBlockContextMap.get(taskId.getExecutionBlockId());
     if (context != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
deleted file mode 100644
index 207b47e..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.master.container.TajoContainerIdPBImpl;
-import org.apache.tajo.master.container.TajoConverterUtils;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NullCallback;
-
-import java.util.concurrent.*;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
-
-/**
- * The driver class for Tajo Task processing.
- */
-@Deprecated
-public class TaskRunner extends AbstractService {
-  /** class logger */
-  private static final Log LOG = LogFactory.getLog(TaskRunner.class);
-
-  private TajoConf systemConf;
-
-  private volatile boolean stopped = false;
-  private Path baseDirPath;
-
-  private TajoContainerId containerId;
-
-  // for Fetcher
-  private ExecutorService fetchLauncher;
-
-  // A thread to receive each assigned query unit and execute the query unit
-  private Thread taskLauncher;
-
-  // Contains the object references related for TaskRunner
-  private ExecutionBlockContext executionBlockContext;
-
-  private long finishTime;
-
-  private TaskRunnerHistory history;
-
-  public TaskRunner(ExecutionBlockContext executionBlockContext, String containerId) {
-    super(TaskRunner.class.getName());
-
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    ThreadFactory fetcherFactory = builder.setNameFormat("Fetcher executor #%d").build();
-    this.systemConf = executionBlockContext.getConf();
-    this.fetchLauncher = Executors.newFixedThreadPool(
-        systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory);
-    try {
-      this.containerId = TajoConverterUtils.toTajoContainerId(containerId);
-      this.executionBlockContext = executionBlockContext;
-      this.history = executionBlockContext.createTaskRunnerHistory(this);
-      this.history.setState(getServiceState());
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  // TODO this is expensive. we should change to unique id
-  public String getId() {
-    return getId(getContext().getExecutionBlockId(), containerId);
-  }
-
-  public TajoContainerId getContainerId(){
-    return containerId;
-  }
-
-  public static String getId(ExecutionBlockId executionBlockId, TajoContainerId containerId) {
-    return executionBlockId + "," + containerId;
-  }
-
-  public TaskRunnerHistory getHistory(){
-    return history;
-  }
-
-  public Path getTaskBaseDir(){
-    return baseDirPath;
-  }
-
-  public ExecutorService getFetchLauncher() {
-    return fetchLauncher;
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("conf should be a TajoConf Type.");
-    }
-    this.systemConf = (TajoConf)conf;
-
-    try {
-      // the base dir for an output dir
-      baseDirPath = getContext().createBaseDir();
-      LOG.info("TaskRunner basedir is created (" + baseDirPath +")");
-    } catch (Throwable t) {
-      t.printStackTrace();
-      LOG.error(t, t);
-    }
-    super.init(conf);
-    this.history.setState(getServiceState());
-  }
-
-  @Override
-  public void start() {
-    super.start();
-    history.setStartTime(getStartTime());
-    this.history.setState(getServiceState());
-    run();
-  }
-
-  @Override
-  public void stop() {
-    if(isStopped()) {
-      return;
-    }
-    this.finishTime = System.currentTimeMillis();
-    this.history.setFinishTime(finishTime);
-    // If this flag become true, taskLauncher will be terminated.
-
-    LOG.info("Stop TaskRunner: " + getId());
-    synchronized (this) {
-      this.stopped = true;
-
-      fetchLauncher.shutdown();
-      fetchLauncher = null;
-
-      notifyAll();
-    }
-
-    super.stop();
-    this.history.setState(getServiceState());
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public ExecutionBlockContext getContext() {
-    return executionBlockContext;
-  }
-
-  static void fatalError(QueryMasterProtocolService.Interface qmClientService,
-                         TaskAttemptId taskAttemptId, String message) {
-    if (message == null) {
-       message = "No error message";
-    }
-    TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
-        .setId(taskAttemptId.getProto())
-        .setErrorMessage(message);
-
-    qmClientService.fatalError(null, builder.build(), NullCallback.get());
-  }
-
-  public void run() {
-    LOG.info("TaskRunner startup");
-    try {
-
-      taskLauncher = new Thread(new Runnable() {
-
-        @Override
-        public void run() {
-          int receivedNum = 0;
-          CallFuture<TaskRequestProto> callFuture = null;
-          TaskRequestProto taskRequest = null;
-
-          while(!stopped && !executionBlockContext.isStopped()) {
-            QueryMasterProtocolService.Interface qmClientService = executionBlockContext.getStub();
-
-            try {
-              if (callFuture == null) {
-                callFuture = new CallFuture<TaskRequestProto>();
-                LOG.info("Request GetTask: " + getId());
-                GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
-                    .setExecutionBlockId(getExecutionBlockId().getProto())
-                    .setContainerId(((TajoContainerIdPBImpl) containerId).getProto())
-                    .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId())
-                    .build();
-
-                qmClientService.getTask(callFuture.getController(), request, callFuture);
-              }
-              try {
-                // wait for an assigning task for 3 seconds
-                taskRequest = callFuture.get(3, TimeUnit.SECONDS);
-              } catch (InterruptedException e) {
-                if(stopped) {
-                  break;
-                }
-              } catch (TimeoutException te) {
-                if(stopped) {
-                  break;
-                }
-                // if there has been no assigning task for a given period,
-                // TaskRunner will retry to request an assigning task.
-                if (LOG.isDebugEnabled()) {
-                  LOG.info("Retry assigning task:" + getId());
-                }
-                continue;
-              } catch (ExecutionException ee) {
-                if(!getContext().isStopped()){
-                  LOG.error(ee.getMessage(), ee);
-                } else {
-                  /* EB is stopped */
-                  break;
-                }
-              }
-
-              if (taskRequest != null) {
-                // QueryMaster can send the terminal signal to TaskRunner.
-                // If TaskRunner receives the terminal signal, TaskRunner will be terminated
-                // immediately.
-                if (taskRequest.getShouldDie()) {
-                  LOG.info("Received ShouldDie flag:" + getId());
-                  break;
-                } else {
-                  getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
-                  LOG.info("Accumulated Received Task: " + (++receivedNum));
-
-                  TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
-                  if (getContext().getTasks().containsKey(taskAttemptId)) {
-                    LOG.error("Duplicate Task Attempt: " + taskAttemptId);
-                    fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
-                    continue;
-                  }
-
-                  LOG.info("Initializing: " + taskAttemptId);
-                  Task task = null;
-                  try {
-                    task = new LegacyTaskImpl(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
-                        new TaskRequestImpl(taskRequest));
-                    getContext().getTasks().put(taskAttemptId, task);
-
-                    task.init();
-                    if (task.hasFetchPhase()) {
-                      task.fetch(); // The fetch is performed in an asynchronous way.
-                    }
-                    // task.run() is a blocking call.
-                    task.run();
-                  } catch (Throwable t) {
-                    LOG.error(t.getMessage(), t);
-                    fatalError(qmClientService, taskAttemptId, t.getMessage());
-                  } finally {
-                    if(task != null) {
-                      task.cleanup();
-                    }
-
-                    callFuture = null;
-                    taskRequest = null;
-                  }
-                }
-              }
-            } catch (Throwable t) {
-              LOG.fatal(t.getMessage(), t);
-            }
-          }
-          stop();
-          //notify to TaskRunnerManager
-          getContext().stopTaskRunner(getId());
-        }
-      });
-      taskLauncher.start();
-    } catch (Throwable t) {
-      LOG.fatal("Unhandled exception. Starting shutdown.", t);
-    }
-  }
-
-  /**
-   * @return true if a stop has been requested.
-   */
-  public boolean isStopped() {
-    return this.stopped;
-  }
-
-  public ExecutionBlockId getExecutionBlockId() {
-    return getContext().getExecutionBlockId();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
deleted file mode 100644
index 16d32d4..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.service.Service;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.master.container.TajoConverterUtils;
-
-import java.util.Collections;
-import java.util.Map;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto;
-
-/**
- * The history class for TaskRunner processing.
- */
-@Deprecated
-public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
-
-  private Service.STATE state;
-  private TajoContainerId containerId;
-  private long startTime;
-  private long finishTime;
-  private ExecutionBlockId executionBlockId;
-  private Map<TaskAttemptId, TaskHistory> taskHistoryMap = null;
-
-  public TaskRunnerHistory(TajoContainerId containerId, ExecutionBlockId executionBlockId) {
-    init();
-    this.containerId = containerId;
-    this.executionBlockId = executionBlockId;
-  }
-
-  public TaskRunnerHistory(TaskRunnerHistoryProto proto) {
-    this.state = Service.STATE.valueOf(proto.getState());
-    this.containerId = TajoConverterUtils.toTajoContainerId(proto.getContainerId());
-    this.startTime = proto.getStartTime();
-    this.finishTime = proto.getFinishTime();
-    this.executionBlockId = new ExecutionBlockId(proto.getExecutionBlockId());
-    this.taskHistoryMap = Maps.newTreeMap();
-    for (TaskHistoryProto taskHistoryProto : proto.getTaskHistoriesList()) {
-      TaskHistory taskHistory = new TaskHistory(taskHistoryProto);
-      taskHistoryMap.put(taskHistory.getTaskAttemptId(), taskHistory);
-    }
-  }
-
-  private void init() {
-    this.taskHistoryMap = Maps.newHashMap();
-  }
-
-  public int size() {
-    return this.taskHistoryMap.size();
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(containerId, executionBlockId, state, startTime,
-        finishTime, taskHistoryMap.size());
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof TaskRunnerHistory) {
-      TaskRunnerHistory other = (TaskRunnerHistory) o;
-      return getProto().equals(other.getProto());
-    }
-    return false;
-  }
-
-  @Override
-  public TaskRunnerHistoryProto getProto() {
-    TaskRunnerHistoryProto.Builder builder = TaskRunnerHistoryProto.newBuilder();
-    builder.setContainerId(containerId.toString());
-    builder.setState(state.toString());
-    builder.setExecutionBlockId(executionBlockId.getProto());
-    builder.setStartTime(startTime);
-    builder.setFinishTime(finishTime);
-    for (TaskHistory taskHistory : taskHistoryMap.values()){
-      builder.addTaskHistories(taskHistory.getProto());
-    }
-    return builder.build();
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public void setFinishTime(long finishTime) {
-    this.finishTime = finishTime;
-  }
-
-  public ExecutionBlockId getExecutionBlockId() {
-    return executionBlockId;
-  }
-
-  public Service.STATE getState() {
-    return state;
-  }
-
-  public void setState(Service.STATE state) {
-    this.state = state;
-  }
-
-  public TajoContainerId getContainerId() {
-    return containerId;
-  }
-
-  public void setContainerId(TajoContainerId containerId) {
-    this.containerId = containerId;
-  }
-
-  public TaskHistory getTaskHistory(TaskAttemptId taskAttemptId) {
-    return taskHistoryMap.get(taskAttemptId);
-  }
-
-  public Map<TaskAttemptId, TaskHistory> getTaskHistoryMap() {
-    return Collections.unmodifiableMap(taskHistoryMap);
-  }
-
-  public void addTaskHistory(TaskAttemptId taskAttemptId, TaskHistory taskHistory) {
-    taskHistoryMap.put(taskAttemptId, taskHistory);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
deleted file mode 100644
index d18a262..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.worker.event.TaskRunnerEvent;
-import org.apache.tajo.worker.event.TaskRunnerStartEvent;
-import org.apache.tajo.worker.event.TaskRunnerStopEvent;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-@Deprecated
-public class TaskRunnerManager extends CompositeService implements EventHandler<TaskRunnerEvent> {
-  private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
-
-  private final ConcurrentMap<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap = Maps.newConcurrentMap();
-  private final ConcurrentMap<String, TaskRunner> taskRunnerMap = Maps.newConcurrentMap();
-  private final ConcurrentMap<String, TaskRunnerHistory> taskRunnerHistoryMap = Maps.newConcurrentMap();
-  private TajoWorker.WorkerContext workerContext;
-  private TajoConf tajoConf;
-  private AtomicBoolean stop = new AtomicBoolean(false);
-  private FinishedTaskCleanThread finishedTaskCleanThread;
-  private Dispatcher dispatcher;
-
-  public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher) {
-    super(TaskRunnerManager.class.getName());
-
-    this.workerContext = workerContext;
-    this.dispatcher = dispatcher;
-  }
-
-  public TajoWorker.WorkerContext getWorkerContext() {
-    return workerContext;
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
-    }
-    tajoConf = (TajoConf)conf;
-    dispatcher.register(TaskRunnerEvent.EventType.class, this);
-    super.init(tajoConf);
-  }
-
-  @Override
-  public void start() {
-    finishedTaskCleanThread = new FinishedTaskCleanThread();
-    finishedTaskCleanThread.start();
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    if(stop.getAndSet(true)) {
-      return;
-    }
-
-    synchronized(taskRunnerMap) {
-      for(TaskRunner eachTaskRunner: taskRunnerMap.values()) {
-        if(!eachTaskRunner.isStopped()) {
-          eachTaskRunner.stop();
-        }
-      }
-    }
-    for(ExecutionBlockContext context: executionBlockContextMap.values()) {
-      context.stop();
-    }
-
-    if(finishedTaskCleanThread != null) {
-      finishedTaskCleanThread.interrupt();
-    }
-
-    super.stop();
-  }
-
-  public void stopTaskRunner(String id) {
-    LOG.info("Stop Task:" + id);
-    TaskRunner taskRunner = taskRunnerMap.remove(id);
-    taskRunner.stop();
-  }
-
-  public Collection<TaskRunner> getTaskRunners() {
-    return Collections.unmodifiableCollection(taskRunnerMap.values());
-  }
-
-  public Collection<TaskRunnerHistory> getExecutionBlockHistories() {
-    return Collections.unmodifiableCollection(taskRunnerHistoryMap.values());
-  }
-
-  public TaskRunnerHistory getExcutionBlockHistoryByTaskRunnerId(String taskRunnerId) {
-    return taskRunnerHistoryMap.get(taskRunnerId);
-  }
-
-  public TaskRunner getTaskRunner(String taskRunnerId) {
-    return taskRunnerMap.get(taskRunnerId);
-  }
-
-  public Task getTaskByTaskAttemptId(TaskAttemptId taskAttemptId) {
-    ExecutionBlockContext context = executionBlockContextMap.get(taskAttemptId.getTaskId().getExecutionBlockId());
-    if (context != null) {
-      return context.getTask(taskAttemptId);
-    }
-    return null;
-  }
-
-  public TaskHistory getTaskHistoryByTaskAttemptId(TaskAttemptId quAttemptId) {
-    synchronized (taskRunnerHistoryMap) {
-      for (TaskRunnerHistory history : taskRunnerHistoryMap.values()) {
-        TaskHistory taskHistory = history.getTaskHistory(quAttemptId);
-        if (taskHistory != null) return taskHistory;
-      }
-    }
-
-    return null;
-  }
-
-  public int getNumTasks() {
-    return taskRunnerMap.size();
-  }
-
-  @Override
-  public void handle(TaskRunnerEvent event) {
-    LOG.info("======================== Processing " + event.getExecutionBlockId() + " of type " + event.getType());
-    if (event instanceof TaskRunnerStartEvent) {
-      TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event;
-      ExecutionBlockContext context = executionBlockContextMap.get(event.getExecutionBlockId());
-
-      if(context == null){
-        try {
-          context = new ExecutionBlockContext(getWorkerContext(), this, startEvent.getRequest());
-          context.init();
-        } catch (Throwable e) {
-          LOG.fatal(e.getMessage(), e);
-          throw new RuntimeException(e);
-        }
-        executionBlockContextMap.put(event.getExecutionBlockId(), context);
-      }
-
-      TaskRunner taskRunner = new TaskRunner(context, startEvent.getRequest().getContainerId());
-      LOG.info("Start TaskRunner:" + taskRunner.getId());
-      taskRunnerMap.put(taskRunner.getId(), taskRunner);
-      taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());
-
-      taskRunner.init(context.getConf());
-      taskRunner.start();
-
-    } else if (event instanceof TaskRunnerStopEvent) {
-      ExecutionBlockContext executionBlockContext =  executionBlockContextMap.remove(event.getExecutionBlockId());
-      if(executionBlockContext != null){
-        try {
-          executionBlockContext.getSharedResource().releaseBroadcastCache(event.getExecutionBlockId());
-          executionBlockContext.sendShuffleReport();
-          workerContext.getTaskHistoryWriter().flushTaskHistories();
-        } catch (Exception e) {
-          LOG.fatal(e.getMessage(), e);
-          throw new RuntimeException(e);
-        } finally {
-          executionBlockContext.stop();
-        }
-      }
-      LOG.info("Stopped execution block:" + event.getExecutionBlockId());
-    }
-  }
-
-  public EventHandler getEventHandler(){
-    return dispatcher.getEventHandler();
-  }
-
-  public TajoConf getTajoConf() {
-    return tajoConf;
-  }
-
-  class FinishedTaskCleanThread extends Thread {
-    //TODO if history size is large, the historyMap should remove immediately
-    public void run() {
-      int expireIntervalTime = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
-      LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
-      while(!stop.get()) {
-        try {
-          Thread.sleep(60 * 1000);
-        } catch (InterruptedException e) {
-          break;
-        }
-        try {
-          long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000l;
-          cleanExpiredFinishedQueryMasterTask(expireTime);
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
-    }
-
-    private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
-      synchronized(taskRunnerHistoryMap) {
-        List<String> expiredIds = new ArrayList<String>();
-        for(Map.Entry<String, TaskRunnerHistory> entry: taskRunnerHistoryMap.entrySet()) {
-           /* If a task runner are abnormal termination, the finished time will be zero. */
-          long finishedTime = Math.max(entry.getValue().getStartTime(), entry.getValue().getFinishTime());
-          if(finishedTime < expireTime) {
-            expiredIds.add(entry.getKey());
-          }
-        }
-
-        for(String eachId: expiredIds) {
-          taskRunnerHistoryMap.remove(eachId);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
deleted file mode 100644
index 050e2b5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
-import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.storage.DiskDeviceInfo;
-import org.apache.tajo.storage.DiskMountInfo;
-import org.apache.tajo.storage.DiskUtil;
-
-import java.io.File;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
-
-/**
- * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
- */
-@Deprecated
-public class WorkerHeartbeatService extends AbstractService {
-  /** class logger */
-  private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class);
-
-  private final TajoWorker.WorkerContext context;
-  private TajoConf systemConf;
-  private RpcClientManager connectionManager;
-  private WorkerHeartbeatThread thread;
-  private static final float HDFS_DATANODE_STORAGE_SIZE;
-
-  static {
-    HDFS_DATANODE_STORAGE_SIZE = DiskUtil.getDataNodeStorageSize();
-  }
-
-  public WorkerHeartbeatService(TajoWorker.WorkerContext context) {
-    super(WorkerHeartbeatService.class.getSimpleName());
-    this.context = context;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
-    }
-    this.systemConf = (TajoConf) conf;
-
-    this.connectionManager = RpcClientManager.getInstance();
-    thread = new WorkerHeartbeatThread();
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public void serviceStart() throws Exception {
-    thread.start();
-    super.serviceStart();
-  }
-
-  @Override
-  public void serviceStop() throws Exception {
-    if(thread.stopped.getAndSet(true)){
-      return;
-    }
-
-    synchronized (thread) {
-      thread.notifyAll();
-    }
-
-    super.serviceStop();
-  }
-
-  class WorkerHeartbeatThread extends Thread {
-    private volatile AtomicBoolean stopped = new AtomicBoolean(false);
-    ServerStatusProto.System systemInfo;
-    List<ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
-    float workerDiskSlots;
-    int workerMemoryMB;
-    List<DiskDeviceInfo> diskDeviceInfos;
-
-    public WorkerHeartbeatThread() {
-      int workerCpuCoreNum;
-
-      boolean dedicatedResource = systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED);
-
-      try {
-        diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-      }
-
-      if(dedicatedResource) {
-        float dedicatedMemoryRatio = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED_MEMORY_RATIO);
-        int totalMemory = getTotalMemoryMB();
-        workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
-        workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
-
-        if(diskDeviceInfos == null) {
-          workerDiskSlots = TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
-        } else {
-          workerDiskSlots = diskDeviceInfos.size();
-        }
-      } else {
-        workerMemoryMB = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
-        workerCpuCoreNum = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
-        workerDiskSlots = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
-
-        if (systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && HDFS_DATANODE_STORAGE_SIZE > 0) {
-          workerDiskSlots = HDFS_DATANODE_STORAGE_SIZE;
-        }
-      }
-
-      systemInfo = ServerStatusProto.System.newBuilder()
-          .setAvailableProcessors(workerCpuCoreNum)
-          .setFreeMemoryMB(0)
-          .setMaxMemoryMB(0)
-          .setTotalMemoryMB(getTotalMemoryMB())
-          .build();
-    }
-
-    public void run() {
-      LOG.info("Worker Resource Heartbeat Thread start.");
-      int sendDiskInfoCount = 0;
-
-      while(!stopped.get()) {
-        if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
-          getDiskUsageInfos();
-        }
-        ServerStatusProto.JvmHeap jvmHeap =
-            ServerStatusProto.JvmHeap.newBuilder()
-                .setMaxHeap(Runtime.getRuntime().maxMemory())
-                .setFreeHeap(Runtime.getRuntime().freeMemory())
-                .setTotalHeap(Runtime.getRuntime().totalMemory())
-                .build();
-
-        ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
-            .addAllDisk(diskInfos)
-            .setRunningTaskNum(
-                context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
-            .setSystem(systemInfo)
-            .setDiskSlots(workerDiskSlots)
-            .setMemoryResourceMB(workerMemoryMB)
-            .setJvmHeap(jvmHeap)
-            .build();
-
-        NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
-            .setConnectionInfo(context.getConnectionInfo().getProto())
-            .setServerStatus(serverStatus)
-            .build();
-
-        NettyClientBase rmClient = null;
-        try {
-          CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
-
-          ServiceTracker serviceTracker = context.getServiceTracker();
-          rmClient = connectionManager.getClient(serviceTracker.getResourceTrackerAddress(),
-              TajoResourceTrackerProtocol.class, true);
-          TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
-          resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
-
-          TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
-
-          QueryCoordinatorProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
-          if(clusterResourceSummary.getNumWorkers() > 0) {
-            context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
-          }
-          context.setClusterResource(clusterResourceSummary);
-
-        } catch (InterruptedException e) {
-          break;
-        } catch (TimeoutException te) {
-          LOG.warn("Heartbeat response is being delayed.", te);
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-
-        try {
-          if(!stopped.get()){
-            synchronized (thread){
-              thread.wait(10 * 1000);
-            }
-          }
-        } catch (InterruptedException e) {
-          break;
-        }
-        sendDiskInfoCount++;
-
-        if(sendDiskInfoCount > 10) {
-          sendDiskInfoCount = 0;
-        }
-      }
-
-      LOG.info("Worker Resource Heartbeat Thread stopped.");
-    }
-
-    private void getDiskUsageInfos() {
-      diskInfos.clear();
-      for(DiskDeviceInfo eachDevice: diskDeviceInfos) {
-        List<DiskMountInfo> mountInfos = eachDevice.getMountInfos();
-        if(mountInfos != null) {
-          for(DiskMountInfo eachMount: mountInfos) {
-            File eachFile = new File(eachMount.getMountPath());
-            diskInfos.add(ServerStatusProto.Disk.newBuilder()
-                .setAbsolutePath(eachFile.getAbsolutePath())
-                .setTotalSpace(eachFile.getTotalSpace())
-                .setFreeSpace(eachFile.getFreeSpace())
-                .setUsableSpace(eachFile.getUsableSpace())
-                .build());
-          }
-        }
-      }
-    }
-  }
-
-  public static int getTotalMemoryMB() {
-    javax.management.MBeanServer mBeanServer = java.lang.management.ManagementFactory.getPlatformMBeanServer();
-    long max = 0;
-    Object maxObject = null;
-    try {
-      javax.management.ObjectName osName = new javax.management.ObjectName("java.lang:type=OperatingSystem");
-      if (!System.getProperty("java.vendor").startsWith("IBM")) {
-        maxObject = mBeanServer.getAttribute(osName, "TotalPhysicalMemorySize");
-      } else {
-        maxObject = mBeanServer.getAttribute(osName, "TotalPhysicalMemory");
-      }
-    } catch (Throwable t) {
-      LOG.error(t.getMessage(), t);
-    }
-    if (maxObject != null) {
-      max = ((Long)maxObject).longValue();
-    }
-    return ((int) (max / (1024 * 1024)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockErrorEvent.java
new file mode 100644
index 0000000..dfc54ab
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockErrorEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+
+public class ExecutionBlockErrorEvent extends TaskManagerEvent {
+
+  private ExecutionBlockId executionBlockId;
+  private Throwable error;
+
+  public ExecutionBlockErrorEvent(ExecutionBlockId executionBlockId, Throwable e) {
+    super(EventType.EB_FAIL);
+    this.executionBlockId = executionBlockId;
+    this.error = e;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public Throwable getError() {
+    return error;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
deleted file mode 100644
index 85d74e2..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.event;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-
-public class ExecutionBlockStartEvent extends TaskManagerEvent {
-  private TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto;
-
-  public ExecutionBlockStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto) {
-    super(EventType.EB_START, new ExecutionBlockId(requestProto.getExecutionBlockId()));
-    this.requestProto = requestProto;
-  }
-
-  public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequestProto() {
-    return requestProto;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
index 2b967ab..a1dfe50 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
@@ -20,18 +20,24 @@ package org.apache.tajo.worker.event;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ResourceProtos.ExecutionBlockListProto;
 
 public class ExecutionBlockStopEvent extends TaskManagerEvent {
-  private TajoWorkerProtocol.ExecutionBlockListProto cleanupList;
+  private ExecutionBlockListProto cleanupList;
 
+  private ExecutionBlockId executionBlockId;
   public ExecutionBlockStopEvent(TajoIdProtos.ExecutionBlockIdProto executionBlockId,
-                                 TajoWorkerProtocol.ExecutionBlockListProto cleanupList) {
-    super(EventType.EB_STOP, new ExecutionBlockId(executionBlockId));
+                                 ExecutionBlockListProto cleanupList) {
+    super(EventType.EB_STOP);
     this.cleanupList = cleanupList;
+    this.executionBlockId = new ExecutionBlockId(executionBlockId);
   }
 
-  public TajoWorkerProtocol.ExecutionBlockListProto getCleanupList() {
+  public ExecutionBlockListProto getCleanupList() {
     return cleanupList;
   }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
index 9a1c106..0ee0836 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
@@ -21,26 +21,26 @@ package org.apache.tajo.worker.event;
 
 import com.google.protobuf.RpcCallback;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto;
+import static org.apache.tajo.ResourceProtos.BatchAllocationRequest;
+import static org.apache.tajo.ResourceProtos.BatchAllocationResponse;
 
 public class NodeResourceAllocateEvent extends NodeResourceEvent {
 
-  private BatchAllocationRequestProto request;
-  private RpcCallback<BatchAllocationResponseProto> callback;
+  private BatchAllocationRequest request;
+  private RpcCallback<BatchAllocationResponse> callback;
 
-  public NodeResourceAllocateEvent(BatchAllocationRequestProto request,
-                                   RpcCallback<BatchAllocationResponseProto> callback) {
-    super(EventType.ALLOCATE);
+  public NodeResourceAllocateEvent(BatchAllocationRequest request,
+                                   RpcCallback<BatchAllocationResponse> callback) {
+    super(EventType.ALLOCATE, ResourceType.TASK);
     this.callback = callback;
     this.request = request;
   }
 
-  public BatchAllocationRequestProto getRequest() {
+  public BatchAllocationRequest getRequest() {
     return request;
   }
 
-  public RpcCallback<BatchAllocationResponseProto> getCallback() {
+  public RpcCallback<BatchAllocationResponse> getCallback() {
     return callback;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
index 31d9229..d8841a2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
@@ -25,12 +25,12 @@ public class NodeResourceDeallocateEvent extends NodeResourceEvent {
 
   private NodeResource resource;
 
-  public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto) {
-    this(new NodeResource(proto));
+  public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto, ResourceType resourceType) {
+    this(new NodeResource(proto), resourceType);
   }
 
-  public NodeResourceDeallocateEvent(NodeResource resource) {
-    super(EventType.DEALLOCATE);
+  public NodeResourceDeallocateEvent(NodeResource resource, ResourceType resourceType) {
+    super(EventType.DEALLOCATE, resourceType);
     this.resource = resource;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
index 6fd2e0d..c12551f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
@@ -29,7 +29,19 @@ public class NodeResourceEvent extends AbstractEvent<NodeResourceEvent.EventType
     DEALLOCATE
   }
 
-  public NodeResourceEvent(EventType eventType) {
+  public enum ResourceType {
+    QUERY_MASTER,
+    TASK
+  }
+
+  private ResourceType resourceType;
+
+  public NodeResourceEvent(EventType eventType, ResourceType resourceType) {
     super(eventType);
+    this.resourceType = resourceType;
+  }
+
+  public ResourceType getResourceType() {
+    return resourceType;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/QMResourceAllocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/QMResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/QMResourceAllocateEvent.java
new file mode 100644
index 0000000..4422d4d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/QMResourceAllocateEvent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+
+import com.google.protobuf.RpcCallback;
+import org.apache.tajo.ResourceProtos.AllocationResourceProto;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+
+public class QMResourceAllocateEvent extends NodeResourceEvent {
+
+  private AllocationResourceProto request;
+  private RpcCallback<PrimitiveProtos.BoolProto> callback;
+
+  public QMResourceAllocateEvent(AllocationResourceProto request,
+                                 RpcCallback<PrimitiveProtos.BoolProto> callback) {
+    super(EventType.ALLOCATE, ResourceType.QUERY_MASTER);
+    this.callback = callback;
+    this.request = request;
+  }
+
+  public AllocationResourceProto getRequest() {
+    return request;
+  }
+
+  public RpcCallback<PrimitiveProtos.BoolProto> getCallback() {
+    return callback;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/QueryStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/QueryStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/QueryStopEvent.java
new file mode 100644
index 0000000..892db92
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/QueryStopEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.QueryId;
+
+public class QueryStopEvent extends TaskManagerEvent {
+
+
+  private QueryId queryId;
+  public QueryStopEvent(QueryId queryId) {
+    super(EventType.QUERY_STOP);
+    this.queryId = queryId;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
deleted file mode 100644
index c609c67..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.TaskAttemptId;
-
-public class TaskExecutorEvent extends AbstractEvent<TaskExecutorEvent.EventType> {
-
-  // producer: NodeResourceManager, consumer: TaskExecutorEvent
-  public enum EventType {
-    START,
-    KILL,
-    ABORT
-  }
-
-  private TaskAttemptId taskAttemptId;
-
-  public TaskExecutorEvent(EventType eventType,
-                           TaskAttemptId taskAttemptId) {
-    super(eventType);
-    this.taskAttemptId = taskAttemptId;
-  }
-
-  public TaskAttemptId getTaskId() {
-    return taskAttemptId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
index 39b541b..7225e70 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
@@ -19,25 +19,23 @@
 package org.apache.tajo.worker.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
 
 public class TaskManagerEvent extends AbstractEvent<TaskManagerEvent.EventType> {
   // producer: NodeResourceManager, consumer: TaskManager
   public enum EventType {
-    EB_START,
-    EB_STOP
+    TASK_START,
+    TASK_KILL,
+    TASK_ABORT,
+
+    //cleanup events
+    EB_STOP,
+    EB_FAIL,
+    QUERY_STOP
   }
 
-  private ExecutionBlockId executionBlockId;
 
-  public TaskManagerEvent(EventType eventType,
-                          ExecutionBlockId executionBlockId) {
-    super(eventType);
-    this.executionBlockId = executionBlockId;
-  }
 
-  public ExecutionBlockId getExecutionBlockId() {
-    return executionBlockId;
+  public TaskManagerEvent(EventType eventType) {
+    super(eventType);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
deleted file mode 100644
index 7175251..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ExecutionBlockId;
-
-@Deprecated
-public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> {
-  public enum EventType {
-    START,
-    STOP
-  }
-
-  protected final ExecutionBlockId executionBlockId;
-
-  public TaskRunnerEvent(EventType eventType,
-                         ExecutionBlockId executionBlockId) {
-    super(eventType);
-    this.executionBlockId = executionBlockId;
-  }
-
-  public ExecutionBlockId getExecutionBlockId() {
-    return executionBlockId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
deleted file mode 100644
index 9406794..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.event;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.serder.PlanProto;
-@Deprecated
-public class TaskRunnerStartEvent extends TaskRunnerEvent {
-
-  private final TajoWorkerProtocol.RunExecutionBlockRequestProto request;
-
-  public TaskRunnerStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
-    super(EventType.START, new ExecutionBlockId(request.getExecutionBlockId()));
-    this.request = request;
-  }
-
-  public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequest() {
-    return request;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
deleted file mode 100644
index 297f30c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.event;
-
-import org.apache.tajo.ExecutionBlockId;
-
-@Deprecated
-public class TaskRunnerStopEvent extends TaskRunnerEvent {
-
-  public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) {
-    super(EventType.STOP, executionBlockId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
index f60e7c4..1fb0c49 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
@@ -18,20 +18,24 @@
 
 package org.apache.tajo.worker.event;
 
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.resource.NodeResource;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
 
-public class TaskStartEvent extends TaskExecutorEvent {
+import static org.apache.tajo.ResourceProtos.TaskRequestProto;
+
+public class TaskStartEvent extends TaskManagerEvent {
 
   private NodeResource allocatedResource;
   private TaskRequestProto taskRequest;
+  private TaskAttemptId taskAttemptId;
 
   public TaskStartEvent(TaskRequestProto taskRequest,
                         NodeResource allocatedResource) {
-    super(EventType.START, new TaskAttemptId(taskRequest.getId()));
+    super(EventType.TASK_START);
     this.taskRequest = taskRequest;
     this.allocatedResource = allocatedResource;
+    this.taskAttemptId = new TaskAttemptId(taskRequest.getId());
   }
 
   public NodeResource getAllocatedResource() {
@@ -41,4 +45,12 @@ public class TaskStartEvent extends TaskExecutorEvent {
   public TaskRequestProto getTaskRequest() {
     return taskRequest;
   }
+
+  public TaskAttemptId getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return taskAttemptId.getTaskId().getExecutionBlockId();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java
index 349fa4c..19d8b28 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java
@@ -35,7 +35,7 @@ import javax.ws.rs.core.UriInfo;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.NodeStatus;
 import org.apache.tajo.ws.rs.JerseyResourceDelegate;
 import org.apache.tajo.ws.rs.JerseyResourceDelegateContext;
 import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey;
@@ -99,11 +99,11 @@ public class ClusterResource {
           JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class);
       MasterContext masterContext = context.get(masterContextKey);
       
-      Map<Integer, Worker> workerMap = masterContext.getResourceManager().getWorkers();
+      Map<Integer, NodeStatus> workerMap = masterContext.getResourceManager().getNodes();
       List<WorkerResponse> workerList = new ArrayList<WorkerResponse>();
       
-      for (Worker worker: workerMap.values()) {
-        workerList.add(new WorkerResponse(worker));
+      for (NodeStatus nodeStatus : workerMap.values()) {
+        workerList.add(new WorkerResponse(nodeStatus));
       }
       
       Map<String, List<WorkerResponse>> workerResponseMap = new HashMap<String, List<WorkerResponse>>();


Mime
View raw message