tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [01/52] [abbrv] tajo git commit: TAJO-1397: Resource allocation should be fine grained. (jinho)
Date Wed, 22 Jul 2015 13:00:34 GMT
Repository: tajo
Updated Branches:
  refs/heads/index_support 15749c5a6 -> 49367117b


http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
index f62733f..2340551 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -18,33 +18,34 @@
 
 package org.apache.tajo.worker;
 
-import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
+import org.apache.tajo.ResourceProtos.TaskStatusProto;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.worker.event.TaskExecutorEvent;
+import org.apache.tajo.worker.event.TaskStartEvent;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.Semaphore;
 
 public class MockTaskExecutor extends TaskExecutor {
 
   protected final Semaphore barrier;
 
-  public MockTaskExecutor(Semaphore barrier, TaskManager taskManager, EventHandler rmEventHandler) {
-    super(taskManager, rmEventHandler);
+  public MockTaskExecutor(Semaphore barrier, TajoWorker.WorkerContext workerContext) {
+    super(workerContext);
     this.barrier = barrier;
   }
 
   @Override
-  public void handle(TaskExecutorEvent event) {
+  public void handle(TaskStartEvent event) {
     super.handle(event);
     barrier.release();
   }
 
   @Override
-  protected Task createTask(final ExecutionBlockContext context, TajoWorkerProtocol.TaskRequestProto taskRequest) {
+  protected Task createTask(final ExecutionBlockContext context, TaskRequestProto taskRequest) {
     final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
 
     //ignore status changed log
@@ -126,8 +127,8 @@ public class MockTaskExecutor extends TaskExecutor {
       }
 
       @Override
-      public TajoWorkerProtocol.TaskStatusProto getReport() {
-        TajoWorkerProtocol.TaskStatusProto.Builder builder = TajoWorkerProtocol.TaskStatusProto.newBuilder();
+      public TaskStatusProto getReport() {
+        TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
       builder.setWorkerName("localhost:0");
       builder.setId(taskAttemptContext.getTaskId().getProto())
           .setProgress(taskAttemptContext.getProgress())
@@ -136,6 +137,16 @@ public class MockTaskExecutor extends TaskExecutor {
       builder.setInputStats(new TableStats().getProto());
       return builder.build();
       }
+
+      @Override
+      public TaskHistory createTaskHistory() {
+        return null;
+      }
+
+      @Override
+      public List<Fetcher> getFetchers() {
+        return null;
+      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
index 678b063..76ce9f7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -19,9 +19,13 @@
 package org.apache.tajo.worker;
 
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
+import org.apache.tajo.ResourceProtos.ExecutionBlockListProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.worker.event.TaskManagerEvent;
 
 import java.io.IOException;
@@ -31,15 +35,21 @@ public class MockTaskManager extends TaskManager {
 
   private final Semaphore barrier;
 
-  public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) {
-    super(dispatcher, workerContext, rmEventHandler);
+  public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
+    super(dispatcher, workerContext);
     this.barrier = barrier;
   }
 
   @Override
-  protected ExecutionBlockContext createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+  protected ExecutionBlockContext createExecutionBlock(ExecutionBlockId executionBlockId, String queryMaster) {
     try {
-      return new MockExecutionBlock(getWorkerContext(), request);
+      ExecutionBlockContextResponse.Builder builder = ExecutionBlockContextResponse.newBuilder();
+      builder.setExecutionBlockId(executionBlockId.getProto())
+          .setPlanJson("test")
+          .setQueryContext(new QueryContext(new TajoConf()).getProto())
+          .setQueryOutputPath("testpath")
+          .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+      return new MockExecutionBlock(getWorkerContext(), builder.build());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -47,7 +57,7 @@ public class MockTaskManager extends TaskManager {
 
   @Override
   protected void stopExecutionBlock(ExecutionBlockContext context,
-                                    TajoWorkerProtocol.ExecutionBlockListProto cleanupList) {
+                                    ExecutionBlockListProto cleanupList) {
     //skip for testing
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
index e8c2b9c..f57fa85 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
@@ -52,11 +52,6 @@ public abstract class MockWorkerContext implements TajoWorker.WorkerContext {
   }
 
   @Override
-  public TaskRunnerManager getTaskRunnerManager() {
-    return null;
-  }
-
-  @Override
   public CatalogService getCatalog() {
     return null;
   }
@@ -77,11 +72,6 @@ public abstract class MockWorkerContext implements TajoWorker.WorkerContext {
   }
 
   @Override
-  public QueryCoordinatorProtocol.ClusterResourceSummary getClusterResource() {
-    return null;
-  }
-
-  @Override
   public TajoSystemMetrics getWorkerSystemMetrics() {
 
     if (tajoSystemMetrics == null) {
@@ -115,15 +105,5 @@ public abstract class MockWorkerContext implements TajoWorker.WorkerContext {
   public void cleanupTemporalDirectories() {
 
   }
-
-  @Override
-  public void setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary clusterResource) {
-
-  }
-
-  @Override
-  public void setNumClusterNodes(int numClusterNodes) {
-
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 65627c1..a91fc30 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -104,13 +104,13 @@ public class TestFetcher {
 
   @Test
   public void testAdjustFetchProcess() {
-    assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(0, 0), 0);
-    assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(10, 10), 0);
-    assertEquals(0.05f, LegacyTaskImpl.adjustFetchProcess(10, 9), 0);
-    assertEquals(0.1f, LegacyTaskImpl.adjustFetchProcess(10, 8), 0);
-    assertEquals(0.25f, LegacyTaskImpl.adjustFetchProcess(10, 5), 0);
-    assertEquals(0.45f, LegacyTaskImpl.adjustFetchProcess(10, 1), 0);
-    assertEquals(0.5f, LegacyTaskImpl.adjustFetchProcess(10, 0), 0);
+    assertEquals(0.0f, TaskImpl.adjustFetchProcess(0, 0), 0);
+    assertEquals(0.0f, TaskImpl.adjustFetchProcess(10, 10), 0);
+    assertEquals(0.05f, TaskImpl.adjustFetchProcess(10, 9), 0);
+    assertEquals(0.1f, TaskImpl.adjustFetchProcess(10, 8), 0);
+    assertEquals(0.25f, TaskImpl.adjustFetchProcess(10, 5), 0);
+    assertEquals(0.45f, TaskImpl.adjustFetchProcess(10, 1), 0);
+    assertEquals(0.5f, TaskImpl.adjustFetchProcess(10, 0), 0);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
deleted file mode 100644
index df6d714..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.service.Service;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.benchmark.TPCH;
-import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.QueryInfo;
-import org.apache.tajo.master.TajoMaster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-
-public class TestHistory {
-  private static TajoTestingCluster cluster;
-  private static TajoMaster master;
-  private static  TajoConf conf;
-  private static TajoClient client;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    cluster = new TajoTestingCluster();
-    cluster.startMiniClusterInLocal(1);
-    master = cluster.getMaster();
-    conf = cluster.getConfiguration();
-    client = cluster.newTajoClient();
-    File file = TPCH.getDataFile("lineitem");
-    client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
-        + "using text location 'file://" + file.getAbsolutePath() + "'");
-    assertTrue(client.existTable("default.lineitem"));
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    if (client != null) client.close();
-    if (cluster != null) cluster.shutdownMiniCluster();
-  }
-
-  @Test
-  public final void testTaskRunnerHistory() throws IOException, ServiceException, InterruptedException {
-    int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
-    client.executeQueryAndGetResult("select count(*) from lineitem");
-
-    Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
-    assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);
-
-    TajoWorker worker = cluster.getTajoWorkers().get(0);
-    TaskRunnerManager taskRunnerManager = worker.getWorkerContext().getTaskRunnerManager();
-    assertNotNull(taskRunnerManager);
-
-
-    Collection<TaskRunnerHistory> histories = taskRunnerManager.getExecutionBlockHistories();
-    assertTrue(histories.size() > 0);
-
-    TaskRunnerHistory history = histories.iterator().next();
-    assertEquals(Service.STATE.STOPPED, history.getState());
-    TaskRunnerHistory fromProto = new TaskRunnerHistory(history.getProto());
-    assertEquals(history.getExecutionBlockId(), fromProto.getExecutionBlockId());
-    assertEquals(history.getFinishTime(), fromProto.getFinishTime());
-    assertEquals(history.getStartTime(), fromProto.getStartTime());
-    assertEquals(history.getState(), fromProto.getState());
-    assertEquals(history.getContainerId(), fromProto.getContainerId());
-    assertEquals(history.getProto().getTaskHistoriesCount(), fromProto.getProto().getTaskHistoriesCount());
-  }
-
-  @Test
-  public final void testTaskHistory() throws IOException, ServiceException, InterruptedException {
-    int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size();
-    client.executeQueryAndGetResult("select count(*) from lineitem");
-
-    Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries();
-    assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);
-
-    TajoWorker worker = cluster.getTajoWorkers().get(0);
-    TaskRunnerManager taskRunnerManager = worker.getWorkerContext().getTaskRunnerManager();
-    assertNotNull(taskRunnerManager);
-
-
-    Collection<TaskRunnerHistory> histories = taskRunnerManager.getExecutionBlockHistories();
-    assertTrue(histories.size() > 0);
-
-    TaskRunnerHistory history = histories.iterator().next();
-
-    assertTrue(history.size() > 0);
-    assertEquals(Service.STATE.STOPPED, history.getState());
-
-    Map.Entry<TaskAttemptId, TaskHistory> entry =
-        history.getTaskHistoryMap().entrySet().iterator().next();
-
-    TaskAttemptId taskAttemptId = entry.getKey();
-    TaskHistory taskHistory = entry.getValue();
-
-    assertEquals(TajoProtos.TaskAttemptState.TA_SUCCEEDED, taskHistory.getState());
-    assertEquals(taskAttemptId, taskHistory.getTaskAttemptId());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
index 0f58854..7539beb 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -22,26 +22,26 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.tajo.*;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
 import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
-import org.junit.*;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.tajo.ResourceProtos.*;
 import static org.junit.Assert.*;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 public class TestNodeResourceManager {
 
   private MockNodeResourceManager resourceManager;
@@ -65,8 +65,9 @@ public class TestNodeResourceManager {
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
         taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
-    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS, 4);
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
 
     dispatcher = new AsyncDispatcher();
     taskDispatcher = new AsyncDispatcher();
@@ -79,6 +80,21 @@ public class TestNodeResourceManager {
       }
 
       @Override
+      public TaskManager getTaskManager() {
+        return taskManager;
+      }
+
+      @Override
+      public TaskExecutor getTaskExecuor() {
+        return taskExecutor;
+      }
+
+      @Override
+      public NodeResourceManager getNodeResourceManager() {
+        return resourceManager;
+      }
+
+      @Override
       public WorkerConnectionInfo getConnectionInfo() {
         if (workerConnectionInfo == null) {
           workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
@@ -87,10 +103,10 @@ public class TestNodeResourceManager {
       }
     };
 
-    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext, dispatcher.getEventHandler());
-    taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, dispatcher.getEventHandler());
-    resourceManager = new MockNodeResourceManager(new Semaphore(0), dispatcher, taskDispatcher.getEventHandler());
-    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager);
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext);
+    taskExecutor = new MockTaskExecutor(new Semaphore(0), workerContext);
+    resourceManager = new MockNodeResourceManager(new Semaphore(0), dispatcher, workerContext);
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext);
 
     service = new CompositeService("MockService") {
       @Override
@@ -103,6 +119,12 @@ public class TestNodeResourceManager {
         addIfService(statusUpdater);
         super.serviceInit(conf);
       }
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getWorkerSystemMetrics().stop();
+        super.serviceStop();
+      }
     };
 
     service.init(conf);
@@ -119,8 +141,8 @@ public class TestNodeResourceManager {
     int requestSize = 4;
     resourceManager.setTaskHandlerEvent(false); //skip task execution
 
-    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
-    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    CallFuture<BatchAllocationResponse> callFuture  = new CallFuture<BatchAllocationResponse>();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
     ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     requestProto.setExecutionBlockId(ebId.getProto());
 
@@ -129,7 +151,7 @@ public class TestNodeResourceManager {
 
     dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
 
-    BatchAllocationResponseProto responseProto = callFuture.get();
+    BatchAllocationResponse responseProto = callFuture.get();
     assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
     // allocated all
     assertEquals(0, responseProto.getCancellationTaskCount());
@@ -142,8 +164,8 @@ public class TestNodeResourceManager {
     int overSize = 10;
     resourceManager.setTaskHandlerEvent(false); //skip task execution
 
-    CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
-    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
     ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     requestProto.setExecutionBlockId(ebId.getProto());
 
@@ -152,7 +174,7 @@ public class TestNodeResourceManager {
         MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize + overSize));
 
     dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
-    BatchAllocationResponseProto responseProto = callFuture.get();
+    BatchAllocationResponse responseProto = callFuture.get();
 
     assertEquals(overSize, responseProto.getCancellationTaskCount());
   }
@@ -162,8 +184,8 @@ public class TestNodeResourceManager {
     int requestSize = 4;
     resourceManager.setTaskHandlerEvent(false); //skip task execution
 
-    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
-    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    CallFuture<BatchAllocationResponse> callFuture  = new CallFuture<BatchAllocationResponse>();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
     ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
     requestProto.setExecutionBlockId(ebId.getProto());
 
@@ -172,14 +194,15 @@ public class TestNodeResourceManager {
 
     dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
 
-    BatchAllocationResponseProto responseProto = callFuture.get();
+    BatchAllocationResponse responseProto = callFuture.get();
     assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
     assertEquals(0, responseProto.getCancellationTaskCount());
 
     //deallocate
-    for(TaskAllocationRequestProto allocationRequestProto : requestProto.getTaskRequestList()) {
+    for(TaskAllocationProto allocationRequestProto : requestProto.getTaskRequestList()) {
       // direct invoke handler for testing
-      resourceManager.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource()));
+      resourceManager.handle(new NodeResourceDeallocateEvent(
+          allocationRequestProto.getResource(), NodeResourceEvent.ResourceType.TASK));
     }
 
     assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
@@ -195,33 +218,22 @@ public class TestNodeResourceManager {
     final AtomicInteger totalCanceled = new AtomicInteger();
 
     final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
-    final Queue<TaskAllocationRequestProto>
+    final Queue<TaskAllocationProto>
         totalTasks = MockNodeResourceManager.createTaskRequests(ebId, taskMemory, taskSize);
 
-    // first request with starting ExecutionBlock
-    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
-        ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
-    ebRequestProto.setExecutionBlockId(ebId.getProto())
-        .setQueryMaster(workerContext.getConnectionInfo().getProto())
-        .setNodeId(workerContext.getConnectionInfo().getHost() + ":" +
-            workerContext.getConnectionInfo().getQueryMasterPort())
-        .setContainerId("test")
-        .setQueryContext(new QueryContext(conf).getProto())
-        .setPlanJson("test")
-        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
-
-    TaskAllocationRequestProto task = totalTasks.poll();
-    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+
+    TaskAllocationProto task = totalTasks.poll();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
     requestProto.addTaskRequest(task);
     requestProto.setExecutionBlockId(ebId.getProto());
-    requestProto.setExecutionBlockRequest(ebRequestProto.build());
-    CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
+    CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
     dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
     assertTrue(callFuture.get().getCancellationTaskCount() == 0);
     totalComplete.incrementAndGet();
 
     // start parallel request
     ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+
     List<Future> futureList = Lists.newArrayList();
 
     long startTime = System.currentTimeMillis();
@@ -231,18 +243,18 @@ public class TestNodeResourceManager {
             public void run() {
               int complete = 0;
               while (true) {
-                TaskAllocationRequestProto task = totalTasks.poll();
+                TaskAllocationProto task = totalTasks.poll();
                 if (task == null) break;
 
 
-                BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+                BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
                 requestProto.addTaskRequest(task);
                 requestProto.setExecutionBlockId(ebId.getProto());
 
-                CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
+                CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
                 dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
                 try {
-                  BatchAllocationResponseProto proto = callFuture.get();
+                  BatchAllocationResponse proto = callFuture.get();
                   if (proto.getCancellationTaskCount() > 0) {
                     totalTasks.addAll(proto.getCancellationTaskList());
                     totalCanceled.addAndGet(proto.getCancellationTaskCount());

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
index af40554..707aa49 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
@@ -18,11 +18,11 @@
 
 package org.apache.tajo.worker;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.event.NodeStatusEvent;
 import org.junit.After;
@@ -30,13 +30,19 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+import static org.apache.tajo.ResourceProtos.NodeHeartbeatRequest;
 import static org.junit.Assert.*;
 
 public class TestNodeStatusUpdater {
 
   private NodeResourceManager resourceManager;
   private MockNodeStatusUpdater statusUpdater;
+  private MockTaskManager taskManager;
   private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private CompositeService service;
   private TajoConf conf;
   private TajoWorker.WorkerContext workerContext;
 
@@ -45,6 +51,9 @@ public class TestNodeStatusUpdater {
   public void setup() {
     conf = new TajoConf();
     conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 2);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
+
     workerContext = new MockWorkerContext() {
       WorkerConnectionInfo workerConnectionInfo;
 
@@ -54,6 +63,21 @@ public class TestNodeStatusUpdater {
       }
 
       @Override
+      public TaskManager getTaskManager() {
+        return taskManager;
+      }
+
+      @Override
+      public TaskExecutor getTaskExecuor() {
+        return null;
+      }
+
+      @Override
+      public NodeResourceManager getNodeResourceManager() {
+        return resourceManager;
+      }
+
+      @Override
       public WorkerConnectionInfo getConnectionInfo() {
         if (workerConnectionInfo == null) {
           workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
@@ -62,27 +86,48 @@ public class TestNodeStatusUpdater {
       }
     };
 
-    conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL, 1000);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL, 1000);
     dispatcher = new AsyncDispatcher();
-    dispatcher.init(conf);
-    dispatcher.start();
+    resourceManager = new NodeResourceManager(dispatcher, workerContext);
+    taskDispatcher = new AsyncDispatcher();
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext) {
+      @Override
+      public int getRunningTasks() {
+        return 0;
+      }
+    };
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getWorkerSystemMetrics().stop();
+        super.serviceStop();
+      }
+    };
 
-    resourceManager = new NodeResourceManager(dispatcher, null);
-    resourceManager.init(conf);
-    resourceManager.start();
+    service.init(conf);
+    service.start();
   }
 
   @After
   public void tearDown() {
-    resourceManager.stop();
-    if (statusUpdater != null) statusUpdater.stop();
-    dispatcher.stop();
+    service.stop();
   }
 
   @Test(timeout = 20000)
   public void testNodeMembership() throws Exception {
     CountDownLatch barrier = new CountDownLatch(1);
-    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
     statusUpdater.init(conf);
     statusUpdater.start();
 
@@ -100,16 +145,18 @@ public class TestNodeStatusUpdater {
   @Test(timeout = 20000)
   public void testPing() throws Exception {
     CountDownLatch barrier = new CountDownLatch(2);
-    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
     statusUpdater.init(conf);
     statusUpdater.start();
 
     MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker();
     barrier.await();
 
-    TajoResourceTrackerProtocol.NodeHeartbeatRequestProto lastRequest = resourceTracker.getLastRequest();
+    NodeHeartbeatRequest lastRequest = resourceTracker.getLastRequest();
     assertTrue(lastRequest.hasWorkerId());
-    assertFalse(lastRequest.hasAvailableResource());
+    assertTrue(lastRequest.hasAvailableResource());
+    assertTrue(lastRequest.hasRunningTasks());
+    assertTrue(lastRequest.hasRunningQueryMasters());
     assertFalse(lastRequest.hasTotalResource());
     assertFalse(lastRequest.hasConnectionInfo());
   }
@@ -117,12 +164,12 @@ public class TestNodeStatusUpdater {
   @Test(timeout = 20000)
   public void testResourceReport() throws Exception {
     CountDownLatch barrier = new CountDownLatch(2);
-    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
     statusUpdater.init(conf);
     statusUpdater.start();
 
     assertEquals(0, statusUpdater.getQueueSize());
-    for (int i = 0; i < statusUpdater.getQueueingLimit(); i++) {
+    for (int i = 0; i < statusUpdater.getQueueingThreshold(); i++) {
       dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
     }
     barrier.await();
@@ -132,7 +179,7 @@ public class TestNodeStatusUpdater {
   @Test(timeout = 20000)
   public void testFlushResourceReport() throws Exception {
     CountDownLatch barrier = new CountDownLatch(2);
-    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext);
     statusUpdater.init(conf);
     statusUpdater.start();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
index 98b187b..9b6af68 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
@@ -19,55 +19,36 @@
 package org.apache.tajo.worker;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.*;
-import org.apache.tajo.annotation.ThreadSafe;
-import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.querymaster.QueryMaster;
-import org.apache.tajo.querymaster.QueryMasterManagerService;
-import org.apache.tajo.resource.NodeResources;
 import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.history.HistoryReader;
-import org.apache.tajo.util.history.HistoryWriter;
-import org.apache.tajo.util.metrics.TajoSystemMetrics;
-import org.apache.tajo.worker.event.ExecutionBlockStartEvent;
-import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
 import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Queue;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import static org.junit.Assert.*;
+import static org.apache.tajo.ResourceProtos.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestTaskExecutor {
 
   private NodeResourceManager resourceManager;
   private NodeStatusUpdater statusUpdater;
   private TaskManager taskManager;
-  private TaskExecutor taskExecutor;
+  private MyTaskExecutor taskExecutor;
   private AsyncDispatcher dispatcher;
   private AsyncDispatcher taskDispatcher;
   private TajoWorker.WorkerContext workerContext;
@@ -81,6 +62,8 @@ public class TestTaskExecutor {
   public void setup() {
     conf = new TajoConf();
     conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 2);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
     dispatcher = new AsyncDispatcher();
     taskDispatcher = new AsyncDispatcher();
 
@@ -93,6 +76,21 @@ public class TestTaskExecutor {
       }
 
       @Override
+      public TaskManager getTaskManager() {
+        return taskManager;
+      }
+
+      @Override
+      public org.apache.tajo.worker.TaskExecutor getTaskExecuor() {
+        return taskExecutor;
+      }
+
+      @Override
+      public NodeResourceManager getNodeResourceManager() {
+        return resourceManager;
+      }
+
+      @Override
       public WorkerConnectionInfo getConnectionInfo() {
         if (workerConnectionInfo == null) {
           workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
@@ -103,10 +101,10 @@ public class TestTaskExecutor {
 
     barrier = new Semaphore(0);
     resourceManagerBarrier = new Semaphore(0);
-    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext, dispatcher.getEventHandler());
-    taskExecutor = new TaskExecutor(barrier, taskManager, dispatcher.getEventHandler());
-    resourceManager = new MockNodeResourceManager(resourceManagerBarrier, dispatcher, taskDispatcher.getEventHandler());
-    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager);
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext);
+    taskExecutor = new MyTaskExecutor(barrier, workerContext);
+    resourceManager = new MockNodeResourceManager(resourceManagerBarrier, dispatcher, workerContext);
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext);
 
     service = new CompositeService("MockService") {
       @Override
@@ -141,24 +139,12 @@ public class TestTaskExecutor {
   public void testTaskRequest() throws Exception {
     int requestSize = 1;
 
-    RunExecutionBlockRequestProto.Builder
-        ebRequestProto = RunExecutionBlockRequestProto.newBuilder();
     QueryId qid = LocalTajoTestingUtility.newQueryId();
     ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
 
-    ebRequestProto.setExecutionBlockId(ebId.getProto())
-        .setQueryMaster(workerContext.getConnectionInfo().getProto())
-        .setNodeId(workerContext.getConnectionInfo().getHost() + ":"
-            + workerContext.getConnectionInfo().getQueryMasterPort())
-        .setContainerId("test")
-        .setQueryContext(new QueryContext(conf).getProto())
-        .setPlanJson("test")
-        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
-
-    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
-    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    CallFuture<BatchAllocationResponse> callFuture  = new CallFuture<BatchAllocationResponse>();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
     requestProto.setExecutionBlockId(ebId.getProto());
-    requestProto.setExecutionBlockRequest(ebRequestProto.build());
 
     assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
     requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 10, requestSize));
@@ -181,24 +167,12 @@ public class TestTaskExecutor {
   public void testTaskException() throws Exception {
     int requestSize = 1;
 
-    RunExecutionBlockRequestProto.Builder
-        ebRequestProto = RunExecutionBlockRequestProto.newBuilder();
     QueryId qid = LocalTajoTestingUtility.newQueryId();
     ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
 
-    ebRequestProto.setExecutionBlockId(ebId.getProto())
-        .setQueryMaster(workerContext.getConnectionInfo().getProto())
-        .setNodeId(workerContext.getConnectionInfo().getHost()+":"
-            + workerContext.getConnectionInfo().getQueryMasterPort())
-        .setContainerId("test")
-        .setQueryContext(new QueryContext(conf).getProto())
-        .setPlanJson("test")
-        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
-
-    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
-    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    CallFuture<BatchAllocationResponse> callFuture  = new CallFuture<BatchAllocationResponse>();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
     requestProto.setExecutionBlockId(ebId.getProto());
-    requestProto.setExecutionBlockRequest(ebRequestProto.build());
 
     assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
     requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 10, requestSize));
@@ -218,12 +192,12 @@ public class TestTaskExecutor {
     assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
   }
 
-  class TaskExecutor extends MockTaskExecutor {
+  class MyTaskExecutor extends MockTaskExecutor {
     int completeTasks;
     AtomicBoolean throwException = new AtomicBoolean();
 
-    public TaskExecutor(Semaphore barrier, TaskManager taskManager, EventHandler rmEventHandler) {
-      super(barrier, taskManager, rmEventHandler);
+    public MyTaskExecutor(Semaphore barrier, TajoWorker.WorkerContext workerContext) {
+      super(barrier, workerContext);
     }
 
     @Override
@@ -233,7 +207,7 @@ public class TestTaskExecutor {
     }
 
     @Override
-    protected Task createTask(final ExecutionBlockContext context, TajoWorkerProtocol.TaskRequestProto taskRequest) {
+    protected Task createTask(final ExecutionBlockContext context, TaskRequestProto taskRequest) {
       final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
       final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(null, context, taskAttemptId, null, null);
 
@@ -314,8 +288,8 @@ public class TestTaskExecutor {
         }
 
         @Override
-        public TajoWorkerProtocol.TaskStatusProto getReport() {
-          TajoWorkerProtocol.TaskStatusProto.Builder builder = TajoWorkerProtocol.TaskStatusProto.newBuilder();
+        public TaskStatusProto getReport() {
+          TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
           builder.setWorkerName("localhost:0");
           builder.setId(taskAttemptContext.getTaskId().getProto())
               .setProgress(taskAttemptContext.getProgress())
@@ -324,6 +298,16 @@ public class TestTaskExecutor {
           builder.setInputStats(new TableStats().getProto());
           return builder.build();
         }
+
+        @Override
+        public TaskHistory createTaskHistory() {
+          return null;
+        }
+
+        @Override
+        public List<Fetcher> getFetchers() {
+          return null;
+        }
       };
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
index 8bca489..630f3f7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
@@ -21,24 +21,27 @@ package org.apache.tajo.worker;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.tajo.*;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.event.ExecutionBlockStartEvent;
 import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
 import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.apache.tajo.worker.event.TaskStartEvent;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.concurrent.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ResourceProtos.*;
 import static org.junit.Assert.*;
 
 public class TestTaskManager {
@@ -65,8 +68,9 @@ public class TestTaskManager {
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
         taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
-    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS, 4);
     conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
 
     dispatcher = new AsyncDispatcher();
     taskDispatcher = new AsyncDispatcher();
@@ -80,6 +84,21 @@ public class TestTaskManager {
       }
 
       @Override
+      public TaskManager getTaskManager() {
+        return taskManager;
+      }
+
+      @Override
+      public TaskExecutor getTaskExecuor() {
+        return taskExecutor;
+      }
+
+      @Override
+      public NodeResourceManager getNodeResourceManager() {
+        return resourceManager;
+      }
+
+      @Override
       public WorkerConnectionInfo getConnectionInfo() {
         if (workerConnectionInfo == null) {
           workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
@@ -88,10 +107,10 @@ public class TestTaskManager {
       }
     };
     barrier = new Semaphore(0);
-    taskManager = new MockTaskManager(barrier, taskDispatcher, workerContext, dispatcher.getEventHandler());
-    taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, dispatcher.getEventHandler());
-    resourceManager = new NodeResourceManager(dispatcher, taskDispatcher.getEventHandler());
-    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext, resourceManager);
+    taskManager = new MockTaskManager(barrier, taskDispatcher, workerContext);
+    taskExecutor = new MockTaskExecutor(new Semaphore(0), workerContext);
+    resourceManager = new NodeResourceManager(dispatcher, workerContext);
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext);
 
     service = new CompositeService("MockService") {
       @Override
@@ -125,25 +144,12 @@ public class TestTaskManager {
   @Test(timeout = 10000)
   public void testExecutionBlockStart() throws Exception {
     int requestSize = 1;
-
-    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
-        ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
     QueryId qid = LocalTajoTestingUtility.newQueryId();
     ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
 
-    ebRequestProto.setExecutionBlockId(ebId.getProto())
-        .setQueryMaster(workerContext.getConnectionInfo().getProto())
-        .setNodeId(workerContext.getConnectionInfo().getHost() + ":"
-            + workerContext.getConnectionInfo().getQueryMasterPort())
-        .setContainerId("test")
-        .setQueryContext(new QueryContext(conf).getProto())
-        .setPlanJson("test")
-        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
-
-    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
-    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    CallFuture<BatchAllocationResponse> callFuture  = new CallFuture<BatchAllocationResponse>();
+    BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
     requestProto.setExecutionBlockId(ebId.getProto());
-    requestProto.setExecutionBlockRequest(ebRequestProto.build());
 
     assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
     requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, taskMemory, requestSize));
@@ -158,21 +164,14 @@ public class TestTaskManager {
   @Test(timeout = 10000)
   public void testExecutionBlockStop() throws Exception {
 
-    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
-        ebRequestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
     QueryId qid = LocalTajoTestingUtility.newQueryId();
     ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+    TaskAllocationProto requestProto =
+        MockNodeResourceManager.createTaskRequests(ebId, taskMemory, 1).poll();
 
-    ebRequestProto.setExecutionBlockId(ebId.getProto())
-        .setQueryMaster(workerContext.getConnectionInfo().getProto())
-        .setNodeId(workerContext.getConnectionInfo().getHost()+":"
-            + workerContext.getConnectionInfo().getQueryMasterPort())
-        .setContainerId("test")
-        .setQueryContext(new QueryContext(conf).getProto())
-        .setPlanJson("test")
-        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+    taskDispatcher.getEventHandler().handle(new TaskStartEvent(requestProto.getTaskRequest(),
+        new NodeResource(requestProto.getResource())));
 
-    taskDispatcher.getEventHandler().handle(new ExecutionBlockStartEvent(ebRequestProto.build()));
     assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
     assertNotNull(taskManager.getExecutionBlockContext(ebId));
     assertEquals(ebId, taskManager.getExecutionBlockContext(ebId).getExecutionBlockId());

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst b/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst
index 2713948..0756c0a 100644
--- a/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst
+++ b/tajo-docs/src/main/sphinx/configuration/worker_configuration.rst
@@ -43,21 +43,26 @@ Worker Resources
 ==========================================================
 
 Each worker can execute multiple tasks simultaneously.
-In Tajo, users can specify the total size of memory and the number of disks for each worker. Available resources affect how many tasks are executed simultaneously.
+
+In Tajo, users can specify the number of cpu cores, the total size of memory and the number of disks for each worker. Available resources affect how many tasks are executed simultaneously.
+CPU cores are a unit for expressing CPU parallelism, the unit for memory is megabytes, and the unit for disks is the number of disks.
 
 In order to specify the resource capacity of each worker, you should add the following configs to ``tajo-site.xml`` :
 
-=================================  ==========================  ===================   =========================
-  property name                     description                value type            default value            
-=================================  ==========================  ===================   =========================
-  tajo.worker.resource.cpu-cores    the number of cpu cores    integer               1                        
-  tajo.worker.resource.memory-mb    memory size (MB)           integer               1024                     
-  tajo.worker.resource.disks        the number of disks        integer               1                        
-=================================  ==========================  ===================   =========================
+===================================  =============   ======================   =================================
+  property name                        value type      default value            description
+===================================  =============   ======================   =================================
+  tajo.worker.resource.cpu-cores       Integer         available cpu-cores      the number of cpu cores
+  tajo.worker.resource.memory-mb       Integer         available jvm heap       memory size (MB)
+  tajo.worker.resource.disks           Integer         1                        the number of disks
+  tajo.task.resource.min.memory-mb     Integer         500                      minimum allocatable memory per task
+  tajo.qm.resource.min.memory-mb       Integer         500                      minimum allocatable memory per query
+===================================  =============   ======================   =================================
 
 .. note:: 
   
-  Currently, QueryMaster requests 512MB memory and 0.5 disk per task for the backward compatibility.
+  Currently, QueryMaster requests 500MB memory and 1 cpu-core per task for backward compatibility.
+  If you want to give more memory, you can set ``tajo.qm.resource.min.memory-mb``.
 
 .. note::
 
@@ -68,7 +73,14 @@ In order to specify the resource capacity of each worker, you should add the fol
  Example
 ------------
 
-Assume that you want to give 5120 MB memory, 4 disks, and 24 cores on each worker. The example configuration is as follows:
+Assume that you want to give 15GB of JVM heap, 1GB of memory per task, 4 disks, and 12 cores on each worker. The example configuration is as follows:
+
+``tajo-env.sh``
+
+.. code-block:: bash
+
+  export TAJO_WORKER_HEAPSIZE=15000
+
 
 ``tajo-site.xml``
 
@@ -76,35 +88,63 @@ Assume that you want to give 5120 MB memory, 4 disks, and 24 cores on each worke
 
   <property>
     <name>tajo.worker.resource.tajo.worker.resource.cpu-cores</name>
-    <value>24</value>
+    <value>12</value>
   </property>
   
    <property>
-    <name>tajo.worker.resource.memory-mb</name>
-    <value>5120</value>
+    <name>tajo.task.resource.min.memory-mb</name>
+    <value>1000</value>
   </property>
   
   <property>
-    <name>tajo.worker.resource.tajo.worker.resource.disks</name>
-    <value>4.0</value>
-  </property>  
+    <name>tajo.worker.resource.disks</name>
+    <value>4</value>
+  </property>
+
 
---------------------
- Dedicated Mode
---------------------
-Tajo provides a dedicated mode that allows each worker in a Tajo cluster to use whole available system resources including cpu-cores, memory, and disks. For this mode, a user should add the following config to ``tajo-site.xml`` : 
+* Example with HDFS
+
+``tajo-env.sh``
+
+.. code-block:: bash
+
+  export TAJO_WORKER_HEAPSIZE=15000
+
+
+``tajo-site.xml``
 
 .. code-block:: xml
 
+   <property>
+    <name>tajo.task.resource.min.memory-mb</name>
+    <value>1000</value>
+  </property>
+
   <property>
-    <name>tajo.worker.resource.dedicated</name>
+    <name>tajo.worker.resource.dfs-dir-aware</name>
     <value>true</value>
   </property>
 
-In addition, it can limit the memory capacity used for Tajo worker as follows:
 
-===============================================  ================================================   ===================   =======================
-  property name                                  description                                        value type            default value           
-===============================================  ================================================   ===================   =======================
-  tajo.worker.resource.dedicated-memory-ratio    how much memory to be used in whole memory         float                 0.8                     
-===============================================  ================================================   ===================   =======================
\ No newline at end of file
+* Example with S3
+
+``tajo-env.sh``
+
+.. code-block:: bash
+
+  export TAJO_WORKER_HEAPSIZE=15000
+
+
+``tajo-site.xml``
+
+.. code-block:: xml
+
+   <property>
+    <name>tajo.task.resource.min.memory-mb</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <name>tajo.worker.resource.disk.parallel-execution.num</name>
+    <value>4</value>
+  </property>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index e314f99..1aee05f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -21,19 +21,19 @@ package org.apache.tajo.plan.util;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.plan.*;
+import org.apache.tajo.plan.InvalidQueryException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 import org.apache.tajo.plan.visitor.ExplainLogicalPlanVisitor;
 import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor;
-import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index 7855acd..537d180 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -24,6 +24,7 @@ option java_generate_equals_and_hash = true;
 import "PrimitiveProtos.proto";
 import "CatalogProtos.proto";
 import "DataTypes.proto";
+import "TajoIdProtos.proto";
 
 enum NodeType {
   SET_SESSION = 0;
@@ -505,4 +506,128 @@ enum TransmitType {
   PUSH_TRANSMIT = 0;
   PULL_TRANSMIT = 1;
   FILE_WRITE = 2;
+}
+
+message DataChannelProto {
+  required ExecutionBlockIdProto srcId = 1;
+  required ExecutionBlockIdProto targetId = 2;
+
+  required TransmitType transmitType = 3 [default = PULL_TRANSMIT];
+  required ShuffleType shuffleType = 4;
+
+  optional SchemaProto schema = 5;
+
+  repeated ColumnProto shuffleKeys = 7;
+  optional int32 numOutputs = 9 [default = 1];
+
+  optional string storeType = 10;
+}
+
+message EnforcerProto {
+  repeated EnforceProperty properties = 1;
+}
+
+message EnforceProperty {
+  enum EnforceType {
+    SORTED_INPUT = 0;
+    OUTPUT_DISTINCT = 1;
+    GROUP_BY = 2;
+    JOIN = 3;
+    SORT = 4;
+    BROADCAST = 5;
+    COLUMN_PARTITION = 6;
+    DISTINCT_GROUP_BY = 7;
+  }
+
+  // Identifies which field is filled in.
+  required EnforceType type = 1;
+
+  // One of the following will be filled in.
+  optional SortedInputEnforce sortedInput = 2;
+  optional OutputDistinctEnforce outputDistinct = 3;
+  optional GroupbyEnforce groupby = 4;
+  optional JoinEnforce join = 5;
+  optional SortEnforce sort = 6;
+  optional BroadcastEnforce broadcast = 7;
+  optional ColumnPartitionEnforcer columnPartition = 8;
+  optional DistinctGroupbyEnforcer distinct = 9;
+}
+
+message SortedInputEnforce {
+  required string tableName = 1;
+  repeated SortSpecProto sortSpecs = 2;
+}
+
+message OutputDistinctEnforce {
+}
+
+message JoinEnforce {
+  enum JoinAlgorithm {
+    NESTED_LOOP_JOIN = 0;
+    BLOCK_NESTED_LOOP_JOIN = 1;
+    IN_MEMORY_HASH_JOIN = 2;
+    HYBRID_HASH_JOIN = 3;
+    MERGE_JOIN = 4;
+  }
+
+  required int32 nodeId = 1;
+  required JoinAlgorithm algorithm = 2;
+}
+
+message GroupbyEnforce {
+  enum GroupbyAlgorithm {
+    HASH_AGGREGATION = 0;
+    SORT_AGGREGATION = 1;
+  }
+
+  required int32 nodeId = 1;
+  required GroupbyAlgorithm algorithm = 2;
+  repeated SortSpecProto sortSpecs = 3;
+}
+
+message SortEnforce {
+  enum SortAlgorithm {
+    IN_MEMORY_SORT = 0;
+    MERGE_SORT = 1;
+  }
+
+  required int32 nodeId = 1;
+  required SortAlgorithm algorithm = 2;
+}
+
+message BroadcastEnforce {
+  required string tableName = 1;
+}
+
+message ColumnPartitionEnforcer {
+  enum ColumnPartitionAlgorithm {
+    HASH_PARTITION = 0;
+    SORT_PARTITION = 1;
+  }
+
+  required int32 nodeId = 1;
+  required ColumnPartitionAlgorithm algorithm = 2;
+}
+
+message DistinctGroupbyEnforcer {
+  enum DistinctAggregationAlgorithm {
+    HASH_AGGREGATION = 0;
+    SORT_AGGREGATION = 1;
+  }
+
+  enum MultipleAggregationStage {
+    FIRST_STAGE = 0;
+    SECOND_STAGE = 1;
+    THRID_STAGE = 3;
+  }
+
+  message SortSpecArray {
+    required int32 nodeId = 1;
+    repeated SortSpecProto sortSpecs = 2;
+  }
+  required int32 nodeId = 1;
+  required DistinctAggregationAlgorithm algorithm = 2;
+  repeated SortSpecArray sortSpecArrays = 3;
+  required bool isMultipleAggregation = 4 [default = false];
+  optional MultipleAggregationStage multipleAggregationStage = 5;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index a98cc13..d7e2f91 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -38,7 +38,7 @@
     <tajo.version>0.11.0-SNAPSHOT</tajo.version>
     <hbase.version>0.98.7-hadoop2</hbase.version>
     <hive.version>1.1.0</hive.version>
-    <netty.version>4.0.25.Final</netty.version>
+    <netty.version>4.0.29.Final</netty.version>
     <jersey.version>2.6</jersey.version>
     <tajo.root>${project.parent.relativePath}/..</tajo.root>
     <extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path>

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index 134b3cf..88ffaf6 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -131,10 +131,8 @@ public class AsyncRpcServer extends NettyServerBase {
         RemoteCallException callException = (RemoteCallException) cause;
         ctx.writeAndFlush(callException.getResponse());
 
-        if(LOG.isDebugEnabled()) {
-          Throwable rootCause = ExceptionUtils.getRootCause(cause);
-          LOG.error(ExceptionUtils.getMessage(rootCause), rootCause);
-        }
+        Throwable rootCause = ExceptionUtils.getRootCause(cause);
+        LOG.error(ExceptionUtils.getMessage(rootCause), rootCause);
       } else {
         /* unhandled exception. */
         if (ctx.channel().isOpen()) {


Mime
View raw message