tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [11/11] tajo git commit: TAJO-1397: Resource allocation should be fine grained. (jinho)
Date Mon, 20 Jul 2015 08:34:29 GMT
TAJO-1397: Resource allocation should be fine grained. (jinho)

Closes #608


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5a02873d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5a02873d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5a02873d

Branch: refs/heads/master
Commit: 5a02873d5f0bd8fedd8527808bd3d4ecbe7d8af3
Parents: bedce3a
Author: Jinho Kim <jhkim@apache.org>
Authored: Mon Jul 20 17:31:22 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Mon Jul 20 17:31:22 2015 +0900

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 CHANGES                                         |   2 +
 .../org/apache/tajo/catalog/FunctionDesc.java   |   7 +-
 .../org/apache/tajo/catalog/CatalogServer.java  |  16 +-
 .../dictionary/ClusterTableDescriptor.java      |   6 +-
 .../org/apache/tajo/cli/tools/TajoAdmin.java    |  23 +-
 tajo-client/src/main/proto/ClientProtos.proto   |  19 +-
 .../main/java/org/apache/tajo/SessionVars.java  |   2 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |  55 +-
 .../java/org/apache/tajo/util/NumberUtil.java   |   4 +
 .../java/org/apache/tajo/util/StringUtils.java  |   9 +-
 .../main/java/org/apache/tajo/util/TUtil.java   |   9 +
 tajo-core/pom.xml                               |  13 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |  16 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  24 +-
 .../tajo/engine/planner/enforce/Enforcer.java   |  13 +-
 .../tajo/engine/planner/global/DataChannel.java |   3 +-
 .../tajo/engine/planner/global/MasterPlan.java  |  11 +-
 .../global/builder/DistinctGroupbyBuilder.java  |   6 +-
 .../tajo/engine/planner/physical/ScanExec.java  |  10 +-
 .../apache/tajo/engine/query/TaskRequest.java   |  27 +-
 .../tajo/engine/query/TaskRequestImpl.java      |  87 +-
 .../org/apache/tajo/master/ContainerProxy.java  |  85 --
 .../tajo/master/LaunchTaskRunnersEvent.java     |  46 -
 .../tajo/master/QueryCoordinatorService.java    |  77 +-
 .../org/apache/tajo/master/QueryInProgress.java | 102 ++-
 .../org/apache/tajo/master/QueryManager.java    | 104 +--
 .../apache/tajo/master/TajoContainerProxy.java  | 177 ----
 .../java/org/apache/tajo/master/TajoMaster.java | 120 +--
 .../tajo/master/TajoMasterClientService.java    |  62 +-
 .../tajo/master/TaskRunnerGroupEvent.java       |  51 --
 .../apache/tajo/master/TaskRunnerLauncher.java  |  25 -
 .../master/cluster/WorkerConnectionInfo.java    |   4 +
 .../tajo/master/container/TajoContainer.java    | 173 ----
 .../tajo/master/container/TajoContainerId.java  | 171 ----
 .../master/container/TajoContainerIdPBImpl.java | 100 ---
 .../master/container/TajoConverterUtils.java    |  87 --
 .../master/event/ContainerAllocationEvent.java  |  77 --
 .../event/ContainerAllocatorEventType.java      |  26 -
 .../tajo/master/event/ContainerEvent.java       |  37 -
 .../event/GrouppedContainerAllocatorEvent.java  |  45 -
 .../tajo/master/event/LocalTaskEvent.java       |  11 +-
 .../tajo/master/event/QueryStartEvent.java      |   9 +-
 .../event/StageContainerAllocationEvent.java    |  38 -
 .../tajo/master/event/StageEventType.java       |   2 -
 .../master/event/StageShuffleReportEvent.java   |   8 +-
 .../master/event/TaskAttemptAssignedEvent.java  |   9 +-
 .../tajo/master/event/TaskAttemptEventType.java |   1 +
 .../event/TaskAttemptStatusUpdateEvent.java     |   2 +-
 .../event/TaskAttemptToSchedulerEvent.java      |  26 +-
 .../tajo/master/event/TaskCompletionEvent.java  |   2 +-
 .../tajo/master/event/TaskFatalErrorEvent.java  |   2 +-
 .../tajo/master/event/TaskRequestEvent.java     |  35 +-
 .../NonForwardQueryResultSystemScanner.java     |  94 +--
 .../org/apache/tajo/master/rm/NodeEvent.java    |  37 +
 .../apache/tajo/master/rm/NodeEventType.java    |  30 +
 .../tajo/master/rm/NodeLivelinessMonitor.java   |  56 ++
 .../tajo/master/rm/NodeReconnectEvent.java      |  35 +
 .../org/apache/tajo/master/rm/NodeState.java    |  44 +
 .../org/apache/tajo/master/rm/NodeStatus.java   | 295 +++++++
 .../apache/tajo/master/rm/NodeStatusEvent.java  |  58 ++
 .../apache/tajo/master/rm/TajoRMContext.java    |  42 +-
 .../tajo/master/rm/TajoResourceManager.java     | 185 ++++
 .../tajo/master/rm/TajoResourceTracker.java     | 197 +++--
 .../tajo/master/rm/TajoWorkerContainer.java     | 125 ---
 .../tajo/master/rm/TajoWorkerContainerId.java   |  94 ---
 .../master/rm/TajoWorkerResourceManager.java    | 605 -------------
 .../java/org/apache/tajo/master/rm/Worker.java  | 290 -------
 .../org/apache/tajo/master/rm/WorkerEvent.java  |  37 -
 .../apache/tajo/master/rm/WorkerEventType.java  |  30 -
 .../tajo/master/rm/WorkerLivelinessMonitor.java |  56 --
 .../tajo/master/rm/WorkerReconnectEvent.java    |  35 -
 .../tajo/master/rm/WorkerResourceManager.java   | 115 ---
 .../org/apache/tajo/master/rm/WorkerState.java  |  44 -
 .../tajo/master/rm/WorkerStatusEvent.java       |  54 --
 .../scheduler/AbstractQueryScheduler.java       |  73 ++
 .../master/scheduler/QuerySchedulingInfo.java   |  60 +-
 .../apache/tajo/master/scheduler/QueueInfo.java | 101 +++
 .../tajo/master/scheduler/QueueState.java       |  50 ++
 .../master/scheduler/SchedulingAlgorithms.java  |   6 +-
 .../master/scheduler/SimpleFifoScheduler.java   | 148 ----
 .../tajo/master/scheduler/SimpleScheduler.java  | 388 +++++++++
 .../master/scheduler/TajoResourceScheduler.java |  75 ++
 .../event/ResourceReserveSchedulerEvent.java    |  45 +
 .../master/scheduler/event/SchedulerEvent.java  |  27 +
 .../scheduler/event/SchedulerEventType.java     |  26 +
 .../metrics/WorkerResourceMetricsGaugeSet.java  |  16 +-
 .../tajo/querymaster/AbstractTaskScheduler.java |  17 +-
 .../tajo/querymaster/DefaultTaskScheduler.java  | 500 ++++++-----
 .../java/org/apache/tajo/querymaster/Query.java |  25 +-
 .../apache/tajo/querymaster/QueryMaster.java    | 249 +++---
 .../querymaster/QueryMasterManagerService.java  | 180 ++--
 .../tajo/querymaster/QueryMasterTask.java       | 218 +++--
 .../apache/tajo/querymaster/Repartitioner.java  |   4 +-
 .../java/org/apache/tajo/querymaster/Stage.java | 247 ++----
 .../java/org/apache/tajo/querymaster/Task.java  |  29 +-
 .../apache/tajo/querymaster/TaskAttempt.java    |  34 +-
 .../resource/DefaultResourceCalculator.java     |   4 +-
 .../org/apache/tajo/resource/NodeResource.java  |   2 +-
 .../java/org/apache/tajo/session/Session.java   |   2 +-
 .../main/java/org/apache/tajo/util/JSPUtil.java |  33 +-
 .../apache/tajo/util/history/HistoryReader.java |   8 +-
 .../apache/tajo/util/history/HistoryWriter.java |  78 +-
 .../tajo/worker/AbstractResourceAllocator.java  |  69 --
 .../tajo/worker/ExecutionBlockContext.java      |  70 +-
 .../java/org/apache/tajo/worker/FetchImpl.java  |  12 +-
 .../org/apache/tajo/worker/LegacyTaskImpl.java  | 844 -------------------
 .../apache/tajo/worker/NodeResourceManager.java | 140 +--
 .../apache/tajo/worker/NodeStatusUpdater.java   | 120 ++-
 .../apache/tajo/worker/ResourceAllocator.java   |  29 -
 .../tajo/worker/TajoResourceAllocator.java      | 415 ---------
 .../java/org/apache/tajo/worker/TajoWorker.java |  96 +--
 .../tajo/worker/TajoWorkerManagerService.java   |  79 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   9 +-
 .../org/apache/tajo/worker/TaskContainer.java   |   1 -
 .../org/apache/tajo/worker/TaskExecutor.java    | 117 ++-
 .../org/apache/tajo/worker/TaskHistory.java     |   4 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   |  31 +-
 .../org/apache/tajo/worker/TaskManager.java     | 161 +++-
 .../java/org/apache/tajo/worker/TaskRunner.java | 306 -------
 .../apache/tajo/worker/TaskRunnerHistory.java   | 152 ----
 .../apache/tajo/worker/TaskRunnerManager.java   | 238 ------
 .../tajo/worker/WorkerHeartbeatService.java     | 262 ------
 .../worker/event/ExecutionBlockErrorEvent.java  |  41 +
 .../worker/event/ExecutionBlockStartEvent.java  |  35 -
 .../worker/event/ExecutionBlockStopEvent.java   |  16 +-
 .../worker/event/NodeResourceAllocateEvent.java |  18 +-
 .../event/NodeResourceDeallocateEvent.java      |   8 +-
 .../tajo/worker/event/NodeResourceEvent.java    |  14 +-
 .../worker/event/QMResourceAllocateEvent.java   |  45 +
 .../tajo/worker/event/QueryStopEvent.java       |  35 +
 .../tajo/worker/event/TaskExecutorEvent.java    |  44 -
 .../tajo/worker/event/TaskManagerEvent.java     |  22 +-
 .../tajo/worker/event/TaskRunnerEvent.java      |  42 -
 .../tajo/worker/event/TaskRunnerStartEvent.java |  39 -
 .../tajo/worker/event/TaskRunnerStopEvent.java  |  29 -
 .../tajo/worker/event/TaskStartEvent.java       |  18 +-
 .../tajo/ws/rs/resources/ClusterResource.java   |   8 +-
 .../tajo/ws/rs/responses/WorkerResponse.java    |  53 +-
 .../src/main/proto/ContainerProtocol.proto      |  48 --
 .../main/proto/QueryCoordinatorProtocol.proto   | 117 +--
 .../src/main/proto/QueryMasterProtocol.proto    |  11 +-
 tajo-core/src/main/proto/ResourceProtos.proto   | 311 +++++++
 .../main/proto/ResourceTrackerProtocol.proto    |  39 +-
 .../src/main/proto/TajoWorkerProtocol.proto     | 348 +-------
 .../main/resources/webapps/admin/cluster.jsp    |  99 +--
 .../src/main/resources/webapps/admin/index.jsp  |  34 +-
 .../src/main/resources/webapps/admin/query.jsp  |  57 +-
 .../resources/webapps/admin/query_executor.jsp  |   3 +-
 .../resources/webapps/admin/querydetail.jsp     |   4 +-
 .../main/resources/webapps/admin/querytasks.jsp |  34 +-
 .../src/main/resources/webapps/admin/task.jsp   |  18 +-
 .../src/main/resources/webapps/worker/index.jsp |  58 +-
 .../resources/webapps/worker/querydetail.jsp    |  21 +-
 .../main/resources/webapps/worker/queryplan.jsp |   2 +-
 .../resources/webapps/worker/querytasks.jsp     |  44 +-
 .../src/main/resources/webapps/worker/task.jsp  |  16 +-
 .../resources/webapps/worker/taskcontainers.jsp |  93 --
 .../resources/webapps/worker/taskdetail.jsp     |  39 +-
 .../resources/webapps/worker/taskhistory.jsp    |   6 +-
 .../src/main/resources/webapps/worker/tasks.jsp | 107 ---
 .../java/org/apache/tajo/QueryTestCaseBase.java |   2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  24 +-
 .../planner/physical/TestBNLJoinExec.java       |   2 +-
 .../physical/TestFullOuterHashJoinExec.java     |   2 +-
 .../physical/TestFullOuterMergeJoinExec.java    |   2 +-
 .../planner/physical/TestHashJoinExec.java      |   2 +-
 .../physical/TestLeftOuterHashJoinExec.java     |   2 +-
 .../planner/physical/TestMergeJoinExec.java     |   2 +-
 .../planner/physical/TestPhysicalPlanner.java   |   4 +-
 .../physical/TestRightOuterMergeJoinExec.java   |   2 +-
 .../query/TestJoinOnPartitionedTables.java      |   1 +
 .../tajo/engine/query/TestOuterJoinQuery.java   |   2 -
 .../TestNonForwardQueryResultSystemScanner.java |  42 +-
 .../apache/tajo/master/TestRepartitioner.java   |  11 +-
 .../tajo/master/rm/TestTajoResourceManager.java | 454 ----------
 .../master/scheduler/TestFifoScheduler.java     | 115 ---
 .../master/scheduler/TestSimpleScheduler.java   | 301 +++++++
 .../apache/tajo/querymaster/TestKillQuery.java  | 166 +---
 .../apache/tajo/querymaster/TestQueryState.java |  38 +-
 .../tajo/util/metrics/TestSystemMetrics.java    |   1 +
 .../apache/tajo/worker/MockExecutionBlock.java  |   6 +-
 .../tajo/worker/MockNodeResourceManager.java    |  30 +-
 .../tajo/worker/MockNodeStatusUpdater.java      |  26 +-
 .../apache/tajo/worker/MockTaskExecutor.java    |  29 +-
 .../org/apache/tajo/worker/MockTaskManager.java |  24 +-
 .../apache/tajo/worker/MockWorkerContext.java   |  20 -
 .../org/apache/tajo/worker/TestFetcher.java     |  14 +-
 .../org/apache/tajo/worker/TestHistory.java     | 124 ---
 .../tajo/worker/TestNodeResourceManager.java    | 100 ++-
 .../tajo/worker/TestNodeStatusUpdater.java      |  83 +-
 .../apache/tajo/worker/TestTaskExecutor.java    | 108 +--
 .../org/apache/tajo/worker/TestTaskManager.java |  75 +-
 .../configuration/worker_configuration.rst      |  94 ++-
 .../org/apache/tajo/plan/util/PlannerUtil.java  |   8 +-
 tajo-plan/src/main/proto/Plan.proto             | 125 +++
 tajo-project/pom.xml                            |   2 +-
 .../org/apache/tajo/rpc/AsyncRpcServer.java     |   6 +-
 198 files changed, 5011 insertions(+), 9450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index ef8fd97..671a31f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -33,7 +33,7 @@ notifications:
   - issues@tajo.apache.org
   irc: "chat.freenode.net#tajo"
 
-before_install: ulimit -t 514029
+before_install: ulimit -t 514029 -u 2048 -n 3000
 install: ./dev-support/travis-install-dependencies.sh
 
 script: 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 890aecf..1529c08 100644
--- a/CHANGES
+++ b/CHANGES
@@ -29,6 +29,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1397: Resource allocation should be fine grained. (jinho)
+
     TAJO-1352: Improve the join order algorithm to consider missed cases of 
     associative join operators. (jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
index 6ea6ac6..9f71e8e 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java
@@ -38,7 +38,6 @@ import java.lang.reflect.Constructor;
  *
  */
 public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable, GsonObject, Comparable<FunctionDesc> {
-  private FunctionDescProto.Builder builder = FunctionDescProto.newBuilder();
 
   @Expose private FunctionSignature signature;
   @Expose private FunctionInvocation invocation;
@@ -184,11 +183,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable,
 
   @Override
   public FunctionDescProto getProto() {
-    if (builder == null) {
-      builder = FunctionDescProto.newBuilder();
-    } else {
-      builder.clear();
-    }
+    FunctionDescProto.Builder builder = FunctionDescProto.newBuilder();
     builder.setSignature(signature.getProto());
     builder.setSupplement(supplement.getProto());
     builder.setInvocation(invocation.getProto());

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index f2e9795..b1410dd 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -172,7 +172,8 @@ public class CatalogServer extends AbstractService {
     }
   }
 
-  public void start() {
+  @Override
+  public void serviceStart() throws Exception {
     String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
     int workerNum = conf.getIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM);
@@ -189,10 +190,11 @@ public class CatalogServer extends AbstractService {
     }
 
     LOG.info("Catalog Server startup (" + bindAddressStr + ")");
-    super.start();
+    super.serviceStart();
   }
 
-  public void stop() {
+  @Override
+  public void serviceStop() throws Exception {
     LOG.info("Catalog Server (" + bindAddressStr + ") shutdown");
 
     // If CatalogServer shutdowns before it started, rpcServer and store may be NULL.
@@ -201,13 +203,9 @@ public class CatalogServer extends AbstractService {
       this.rpcServer.shutdown();
     }
     if (store != null) {
-      try {
-        store.close();
-      } catch (IOException ioe) {
-        LOG.error(ioe.getMessage(), ioe);
-      }
+      store.close();
     }
-    super.stop();
+    super.serviceStop();
   }
 
   public CatalogProtocolHandler getHandler() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
index e3c830f..69067f4 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java
@@ -31,10 +31,8 @@ class ClusterTableDescriptor extends AbstractTableDescriptor {
       new ColumnDescriptor("total_cpu", Type.INT4, 0),
       new ColumnDescriptor("used_mem", Type.INT8, 0),
       new ColumnDescriptor("total_mem", Type.INT8, 0),
-      new ColumnDescriptor("free_heap", Type.INT8, 0),
-      new ColumnDescriptor("max_heap", Type.INT8, 0),
-      new ColumnDescriptor("used_diskslots", Type.FLOAT4, 0),
-      new ColumnDescriptor("total_diskslots", Type.FLOAT4, 0),
+      new ColumnDescriptor("used_disk", Type.INT4, 0),
+      new ColumnDescriptor("total_disk", Type.INT4, 0),
       new ColumnDescriptor("running_tasks", Type.INT4, 0),
       new ColumnDescriptor("last_heartbeat_ts", Type.TIMESTAMP, 0)
   };

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
index 739cd54..76ba7a9 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
@@ -268,7 +268,7 @@ public class TajoAdmin {
     } else {
       String fmtQueryMasterLine = "%1$-25s %2$-5s %3$-5s %4$-10s %5$-10s%n";
       line = String.format(fmtQueryMasterLine, "QueryMaster", "Port", "Query",
-                           "Heap", "Status");
+                           "Mem", "Status");
       writer.write(line);
       line = String.format(fmtQueryMasterLine, DASHLINE_LEN25, DASHLINE_LEN5,
               DASHLINE_LEN5, DASHLINE_LEN10, DASHLINE_LEN10);
@@ -276,12 +276,12 @@ public class TajoAdmin {
       for (WorkerResourceInfo queryMaster : liveQueryMasters) {
         TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo();
         String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort());
-        String heap = String.format("%d MB", queryMaster.getMaxHeap() / 1024 / 1024);
+        String memory = String.format("%d MB", queryMaster.getAvailableResource().getMemory());
         line = String.format(fmtQueryMasterLine,
             queryMasterHost,
             connInfo.getClientPort(),
             queryMaster.getNumQueryMasterTasks(),
-            heap,
+            memory,
             queryMaster.getWorkerStatus());
         writer.write(line);
       }
@@ -348,7 +348,7 @@ public class TajoAdmin {
     String line = String.format(fmtWorkerLine,
         "Worker", "Port", "Tasks",
         "Mem", "Disk",
-        "Heap", "Status");
+        "Cpu", "Status");
     writer.write(line);
     line = String.format(fmtWorkerLine,
         DASHLINE_LEN25, DASHLINE_LEN5, DASHLINE_LEN5,
@@ -359,17 +359,16 @@ public class TajoAdmin {
     for (WorkerResourceInfo worker : workers) {
       TajoProtos.WorkerConnectionInfoProto connInfo = worker.getConnectionInfo();
       String workerHost = String.format("%s:%d", connInfo.getHost(), connInfo.getPeerRpcPort());
-      String mem = String.format("%d/%d", worker.getUsedMemoryMB(),
-          worker.getMemoryMB());
-      String disk = String.format("%.2f/%.2f", worker.getUsedDiskSlots(),
-          worker.getDiskSlots());
-      String heap = String.format("%d/%d MB", worker.getFreeHeap()/1024/1024,
-          worker.getMaxHeap()/1024/1024);
-
+      String mem = String.format("%d/%d", worker.getAvailableResource().getMemory(),
+          worker.getTotalResource().getMemory());
+      String disk = String.format("%d/%d", worker.getAvailableResource().getDisks(),
+          worker.getTotalResource().getDisks());
+      String cpu = String.format("%d/%d", worker.getAvailableResource().getVirtualCores(),
+          worker.getTotalResource().getVirtualCores());
       line = String.format(fmtWorkerLine, workerHost,
           connInfo.getPullServerPort(),
           worker.getNumRunningTasks(),
-          mem, disk, heap, worker.getWorkerStatus());
+          mem, disk, cpu, worker.getWorkerStatus());
       writer.write(line);
     }
     writer.write("\n\n");

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 5497faa..9c20fd8 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -177,19 +177,12 @@ message GetClusterInfoRequest {
 
 message WorkerResourceInfo {
   required WorkerConnectionInfoProto connectionInfo = 1;
-  required float diskSlots = 2;
-  required int32 cpuCoreSlots = 3;
-  required int32 memoryMB = 4;
-  required float usedDiskSlots = 5;
-  required int32 usedMemoryMB = 6;
-  required int32 usedCpuCoreSlots = 7;
-  required int64 maxHeap = 8;
-  required int64 freeHeap = 9;
-  required int64 totalHeap = 10;
-  required int32 numRunningTasks = 11;
-  required string workerStatus = 12;
-  required int64 lastHeartbeat = 13;
-  required int32 numQueryMasterTasks = 14;
+  required NodeResourceProto totalResource = 2;
+  required NodeResourceProto availableResource = 3;
+  required int32 numRunningTasks = 4;
+  required string workerStatus = 5;
+  required int64 lastHeartbeat = 6;
+  required int32 numQueryMasterTasks = 7;
 }
 
 message GetClusterInfoResponse {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 28fdb0b..832a5b4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -108,7 +108,7 @@ public enum SessionVars implements ConfigKey {
       Boolean.class, Validators.bool()),
 
   QUERY_EXECUTE_PARALLEL(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, "Maximum parallel running of execution blocks for a query",
-      DEFAULT, Integer.class, Validators.min("0")),
+      DEFAULT, Integer.class, Validators.min("1")),
 
   // for physical Executors
   EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT,

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 14cfb11..0436116 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -152,12 +152,13 @@ public class TajoConf extends Configuration {
     // Resource tracker service
     RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003",
         Validators.networkAddr()),
-    RESOURCE_TRACKER_HEARTBEAT_TIMEOUT("tajo.resource-tracker.heartbeat.timeout-secs", 120 * 1000), // seconds
+    RESOURCE_TRACKER_HEARTBEAT_TIMEOUT("tajo.resource-tracker.heartbeat.timeout-secs", 120), // seconds
 
     // QueryMaster resource
-    TAJO_QUERYMASTER_DISK_SLOT("tajo.qm.resource.disk.slots", 0.0f, Validators.min("0.0f")),
-    TAJO_QUERYMASTER_MEMORY_MB("tajo.qm.resource.memory-mb", 512, Validators.min("64")),
-    TAJO_QUERYMASTER_ALLOCATION_TIMEOUT("tajo.qm.resource.allocation.timeout", "3 sec"),
+    QUERYMASTER_MINIMUM_MEMORY("tajo.qm.resource.min.memory-mb", 500, Validators.min("64")),
+
+    // Worker task resource
+    TASK_RESOURCE_MINIMUM_MEMORY("tajo.task.resource.min.memory-mb", 500, Validators.min("64")),
 
     // Tajo Worker Service Addresses
     WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080", Validators.networkAddr()),
@@ -172,42 +173,32 @@ public class TajoConf extends Configuration {
 
     // Tajo Worker Resources
     WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores",
-        Runtime.getRuntime().availableProcessors(), Validators.min("1")),
-    WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024, Validators.min("64")),
-    @Deprecated
-    WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f),
-    WORKER_RESOURCE_AVAILABLE_DISKS_NUM("tajo.worker.resource.disks.num", 1, Validators.min("1")),
+        Runtime.getRuntime().availableProcessors(), Validators.min("2")), // 1qm + 1task
+    WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1000, Validators.min("64")),
+
+    WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1, Validators.min("1")),
+
     WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2,
         Validators.min("1")),
-    WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
-    WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()),
 
-    // Tajo Worker Dedicated Resources
-    WORKER_RESOURCE_DEDICATED("tajo.worker.resource.dedicated", false, Validators.bool()),
-    WORKER_RESOURCE_DEDICATED_MEMORY_RATIO("tajo.worker.resource.dedicated-memory-ratio", 0.8f, 
-        Validators.range("0.0f", "1.0f")),
+    WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()),
 
     // Tajo History
     WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 60), // 1 hours
-    QUERYMASTER_HISTORY_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 6 * 60), // 6 hours
+    QUERYMASTER_CACHE_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 10), // 10 mins
 
-    WORKER_HEARTBEAT_INTERVAL("tajo.worker.heartbeat.interval", 10 * 1000),  // 10 sec
+    WORKER_HEARTBEAT_IDLE_INTERVAL("tajo.worker.heartbeat.idle.interval", 10 * 1000),  // 10 sec
+    WORKER_HEARTBEAT_ACTIVE_INTERVAL("tajo.worker.heartbeat.active.interval", 1000),  // 1 sec
 
-    // Resource Manager
-    RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager",
+    //Default query scheduler
+    RESOURCE_SCHEDULER_CLASS("tajo.resource.scheduler", "org.apache.tajo.master.scheduler.SimpleScheduler",
         Validators.groups(Validators.notNull(), Validators.clazz())),
 
+    QUERYMASTER_TASK_SCHEDULER_DELAY("tajo.qm.task-scheduler.delay", 50),  // 50 ms
+
     // Catalog
     CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "localhost:26005", Validators.networkAddr()),
 
-
-    // for Yarn Resource Manager ----------------------------------------------
-
-    /** how many launching TaskRunners in parallel */
-    @Deprecated
-    YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num",
-        Runtime.getRuntime().availableProcessors() * 2),
-
     // Query Configuration
     QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")),
     QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 1024, Validators.min("0")),
@@ -216,7 +207,8 @@ public class TajoConf extends Configuration {
     PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()),
     SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
-    SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2),
+    SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
+        Runtime.getRuntime().availableProcessors() * 2, Validators.min("1")),
     SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size",  8192),
     SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120),
     SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 20),
@@ -262,8 +254,6 @@ public class TajoConf extends Configuration {
         Runtime.getRuntime().availableProcessors() * 1),
 
     // Task Configuration -----------------------------------------------------
-    TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
-    TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 0.5f),
     TASK_DEFAULT_SIZE("tajo.task.size-mb", 128),
 
     // Query and Optimization -------------------------------------------------
@@ -285,8 +275,6 @@ public class TajoConf extends Configuration {
     HISTORY_QUERY_DIR("tajo.history.query.dir", STAGING_ROOT_DIR.defaultVal + "/history"),
     HISTORY_TASK_DIR("tajo.history.task.dir", "file:///tmp/tajo-${user.name}/history"),
     HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7),
-    HISTORY_QUERY_REPLICATION("tajo.history.query.replication", 1, Validators.min("1")),
-    HISTORY_TASK_REPLICATION("tajo.history.task.replication", 1, Validators.min("1")),
 
     // Misc -------------------------------------------------------------------
     // Fragment
@@ -329,8 +317,7 @@ public class TajoConf extends Configuration {
 
     $GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true),
 
-    // WARN "tajo.yarn-rm.parallel-task-runner-launcher-num" should be set enough to avoid deadlock
-    $QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 1),
+    $QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 10),
 
     // for physical Executors
     $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
index 0d70cc2..32e086c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java
@@ -1050,4 +1050,8 @@ public class NumberUtil {
 
     return returnNumber;
   }
+
+  public static int compare(long x, long y) {
+    return (x < y) ? -1 : ((x == y) ? 0 : 1);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
index 0a16072..018c62a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.util;
 
+import io.netty.util.CharsetUtil;
 import org.apache.commons.lang.CharUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.SystemUtils;
@@ -77,10 +78,12 @@ public class StringUtils {
     }
     return buf.toString();
   }
-
-  static CharsetEncoder asciiEncoder = Charset.forName("US-ASCII").newEncoder(); // or "ISO-8859-1" for ISO Latin 1
-
+  /**
+   * Check Seven-bit ASCII
+   */
   public static boolean isPureAscii(String v) {
+    // get thread-safe encoder
+    CharsetEncoder asciiEncoder = CharsetUtil.getEncoder(CharsetUtil.US_ASCII);
     return asciiEncoder.canEncode(v);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 2293ef5..66e8acc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -267,4 +267,13 @@ public class TUtil {
     StackTraceElement element = ste[2 + depth];
     return element.getClassName() + ":" + element.getMethodName() + "(" + element.getLineNumber() +")";
   }
+
+  public static <T> T checkTypeAndGet(Object instance, Class<T> type) {
+    if (!type.isInstance(instance)) {
+      throw new IllegalArgumentException(instance.getClass().getSimpleName()
+          + " must be a " + type.getSimpleName() + " type");
+
+    }
+    return (T) instance;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 33e37b6..bd52e12 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -163,12 +163,12 @@
                 <argument>--proto_path=../tajo-client/src/main/proto</argument>
                 <argument>--proto_path=../tajo-plan/src/main/proto</argument>
                 <argument>--java_out=target/generated-sources/proto</argument>
-                <argument>src/main/proto/ContainerProtocol.proto</argument>
                 <argument>src/main/proto/ResourceTrackerProtocol.proto</argument>
                 <argument>src/main/proto/QueryMasterProtocol.proto</argument>
                 <argument>src/main/proto/QueryCoordinatorProtocol.proto</argument>
                 <argument>src/main/proto/TajoWorkerProtocol.proto</argument>
                 <argument>src/main/proto/InternalTypes.proto</argument>
+                <argument>src/main/proto/ResourceProtos.proto</argument>
               </arguments>
             </configuration>
             <goals>
@@ -390,11 +390,6 @@
       <version>1.2.3</version>
     </dependency>
     <dependency>
-      <groupId>org.eclipse.jdt</groupId>
-      <artifactId>core</artifactId>
-      <version>3.1.1</version>
-    </dependency>
-    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-codec-http</artifactId>
     </dependency>
@@ -407,6 +402,12 @@
       <groupId>org.mortbay.jetty</groupId>
       <artifactId>jsp-2.1</artifactId>
       <version>6.1.14</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>core</artifactId>
+          <groupId>org.eclipse.jdt</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.codahale.metrics</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 9f850f9..5c62654 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -28,9 +28,7 @@ import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.algebra.Aggregation.GroupType;
 import org.apache.tajo.algebra.LiteralValue.LiteralType;
-import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.engine.parser.SQLParser.*;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.StringUtils;
 
@@ -44,7 +42,6 @@ import static org.apache.tajo.common.TajoDataTypes.Type;
 import static org.apache.tajo.engine.parser.SQLParser.*;
 
 public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
-  private SQLParser parser;
 
   public SQLAnalyzer() {
   }
@@ -53,15 +50,14 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     ANTLRInputStream input = new ANTLRInputStream(sql);
     SQLLexer lexer = new SQLLexer(input);
     CommonTokenStream tokens = new CommonTokenStream(lexer);
-    this.parser = new SQLParser(tokens);
-    parser.setBuildParseTree(true);
-    parser.removeErrorListeners();
-
-    parser.setErrorHandler(new SQLErrorStrategy());
-    parser.addErrorListener(new SQLErrorListener());
-
     SqlContext context;
     try {
+      SQLParser parser = new SQLParser(tokens);
+      parser.setBuildParseTree(true);
+      parser.removeErrorListeners();
+
+      parser.setErrorHandler(new SQLErrorStrategy());
+      parser.addErrorListener(new SQLErrorListener());
       context = parser.sql();
     } catch (SQLParseError e) {
       e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index c6b9b41..377aebe 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -40,11 +40,10 @@ import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
+import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer;
+import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
@@ -66,12 +65,13 @@ import java.util.Stack;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
+import static org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
+import static org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
+import static org.apache.tajo.plan.serder.PlanProto.GroupbyEnforce.GroupbyAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.SortedInputEnforce;
+import static org.apache.tajo.plan.serder.PlanProto.SortEnforce;
 
 public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
@@ -885,7 +885,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     List<EnforceProperty> property = enforcer.getEnforceProperties(EnforceType.SORTED_INPUT);
     if (property != null && property.size() > 0 && node.peek().getType() == NodeType.SORT) {
       SortNode sortNode = (SortNode) node.peek();
-      TajoWorkerProtocol.SortedInputEnforce sortEnforcer = property.get(0).getSortedInput();
+      SortedInputEnforce sortEnforcer = property.get(0).getSortedInput();
 
       boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName());
       SortSpec [] sortSpecs = LogicalNodeDeserializer.convertSortSpecs(sortEnforcer.getSortSpecsList());

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 8128390..92ecadd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -18,24 +18,21 @@
 
 package org.apache.tajo.engine.planner.enforce;
 
-
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.util.TUtil;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.*;
+import static org.apache.tajo.plan.serder.PlanProto.GroupbyEnforce.GroupbyAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.*;
+import static org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
+import static org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
 
 public class Enforcer implements ProtoObject<EnforcerProto> {
   Map<EnforceType, List<EnforceProperty>> properties;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 3adc0a3..e09684a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -25,8 +25,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.util.StringUtils;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.plan.serder.PlanProto.DataChannelProto;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import static org.apache.tajo.plan.serder.PlanProto.TransmitType;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index f8cd1e9..80317b0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -23,13 +23,14 @@ package org.apache.tajo.engine.planner.global;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
+import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.util.graph.DirectedGraphVisitor;
 import org.apache.tajo.util.graph.SimpleDirectedGraph;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -37,8 +38,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-
 public class MasterPlan {
   private final QueryId queryId;
   private final QueryContext context;
@@ -288,7 +287,7 @@ public class MasterPlan {
       if (block.getEnforcer().getProperties().size() > 0) {
         sb.append("\n[Enforcers]\n");
         int i = 0;
-        for (TajoWorkerProtocol.EnforceProperty enforce : block.getEnforcer().getProperties()) {
+        for (EnforceProperty enforce : block.getEnforcer().getProperties()) {
           sb.append(" ").append(i++).append(": ");
           sb.append(Enforcer.toString(enforce));
           sb.append("\n");

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index 8095458..f181193 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -30,9 +30,9 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
+import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
+import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.SortSpecArray;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.PlanningException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
index 86874ba..5cca4c5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
@@ -21,7 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -57,11 +57,11 @@ public abstract class ScanExec extends PhysicalExec {
   private boolean checkIfBroadcast() {
     Enforcer enforcer = context.getEnforcer();
 
-    if (enforcer != null && enforcer.hasEnforceProperty(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST)) {
-      List<TajoWorkerProtocol.EnforceProperty> properties =
-          enforcer.getEnforceProperties(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST);
+    if (enforcer != null && enforcer.hasEnforceProperty(EnforceProperty.EnforceType.BROADCAST)) {
+      List<EnforceProperty> properties =
+          enforcer.getEnforceProperties(EnforceProperty.EnforceType.BROADCAST);
 
-      for (TajoWorkerProtocol.EnforceProperty property : properties) {
+      for (EnforceProperty property : properties) {
         if (getCanonicalName().equals(property.getBroadcast().getTableName())) {
           return true;
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
index 2fa272a..48d4780 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
@@ -21,32 +21,27 @@
  */
 package org.apache.tajo.engine.query;
 
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.util.List;
 
-public interface TaskRequest extends ProtoObject<TajoWorkerProtocol.TaskRequestProto> {
+public interface TaskRequest extends ProtoObject<TaskRequestProto> {
 
-	public TaskAttemptId getId();
-	public List<CatalogProtos.FragmentProto> getFragments();
-	public String getOutputTableId();
-	public boolean isClusteredOutput();
-	public PlanProto.LogicalNodeTree getPlan();
-	public boolean isInterQuery();
-	public void setInterQuery();
-	public void addFetch(String name, FetchImpl fetch);
-	public List<FetchImpl> getFetches();
-  public boolean shouldDie();
-  public void setShouldDie();
-  public QueryContext getQueryContext(TajoConf conf);
-  public DataChannel getDataChannel();
-  public Enforcer getEnforcer();
+	TaskAttemptId getId();
+	List<CatalogProtos.FragmentProto> getFragments();
+	PlanProto.LogicalNodeTree getPlan();
+	void setInterQuery();
+	void addFetch(String name, FetchImpl fetch);
+	List<FetchImpl> getFetches();
+  QueryContext getQueryContext(TajoConf conf);
+  DataChannel getDataChannel();
+  Enforcer getEnforcer();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
index b4727dc..f97d005 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
@@ -22,9 +22,9 @@ import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProtoOrBuilder;
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
+import org.apache.tajo.ResourceProtos.FetchProto;
+import org.apache.tajo.ResourceProtos.TaskRequestProtoOrBuilder;
 import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.worker.FetchImpl;
 
@@ -34,22 +34,22 @@ import java.util.List;
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
 public class TaskRequestImpl implements TaskRequest {
-	
-  private TaskAttemptId id;
-  private List<FragmentProto> fragments;
-  private String outputTable;
+
+	private TaskAttemptId id;
+	private List<FragmentProto> fragments;
+	private String outputTable;
 	private boolean isUpdated;
 	private boolean clusteredOutput;
 	private PlanProto.LogicalNodeTree plan;     // logical node
 	private Boolean interQuery;
 	private List<FetchImpl> fetches;
-  private Boolean shouldDie;
-  private QueryContext queryContext;
-  private DataChannel dataChannel;
-  private Enforcer enforcer;
+	private QueryContext queryContext;
+	private DataChannel dataChannel;
+	private Enforcer enforcer;
+	private String queryMasterHostAndPort;
 	
-	private TaskRequestProto proto = TajoWorkerProtocol.TaskRequestProto.getDefaultInstance();
-	private TajoWorkerProtocol.TaskRequestProto.Builder builder = null;
+	private TaskRequestProto proto = TaskRequestProto.getDefaultInstance();
+	private TaskRequestProto.Builder builder = null;
 	private boolean viaProto = false;
 	
 	public TaskRequestImpl() {
@@ -61,9 +61,9 @@ public class TaskRequestImpl implements TaskRequest {
 	public TaskRequestImpl(TaskAttemptId id, List<FragmentProto> fragments,
 												 String outputTable, boolean clusteredOutput,
 												 PlanProto.LogicalNodeTree plan, QueryContext queryContext, DataChannel channel,
-												 Enforcer enforcer) {
+												 Enforcer enforcer, String queryMasterHostAndPort) {
 		this();
-		this.set(id, fragments, outputTable, clusteredOutput, plan, queryContext, channel, enforcer);
+		this.set(id, fragments, outputTable, clusteredOutput, plan, queryContext, channel, enforcer, queryMasterHostAndPort);
 	}
 	
 	public TaskRequestImpl(TaskRequestProto proto) {
@@ -75,7 +75,8 @@ public class TaskRequestImpl implements TaskRequest {
 	
 	public void set(TaskAttemptId id, List<FragmentProto> fragments,
 			String outputTable, boolean clusteredOutput,
-			PlanProto.LogicalNodeTree plan, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
+			PlanProto.LogicalNodeTree plan, QueryContext queryContext,
+									DataChannel dataChannel, Enforcer enforcer, String queryMasterHostAndPort) {
 		this.id = id;
 		this.fragments = fragments;
 		this.outputTable = outputTable;
@@ -86,6 +87,7 @@ public class TaskRequestImpl implements TaskRequest {
     this.queryContext = queryContext;
     this.dataChannel = dataChannel;
     this.enforcer = enforcer;
+		this.queryMasterHostAndPort = queryMasterHostAndPort;
 	}
 
 	@Override
@@ -124,32 +126,6 @@ public class TaskRequestImpl implements TaskRequest {
 		return this.fragments;
 	}
 
-	@Override
-	public String getOutputTableId() {
-		TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
-		if (outputTable != null) {
-			return this.outputTable;
-		}
-		if (!p.hasOutputTable()) {
-			return null;
-		}
-		this.outputTable = p.getOutputTable();
-		return this.outputTable;
-	}
-
-	@Override
-	public boolean isClusteredOutput() {
-		TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
-		if (isUpdated) {
-			return this.clusteredOutput;
-		}
-		if (!p.hasClusteredOutput()) {
-			return false;
-		}
-		this.clusteredOutput = p.getClusteredOutput();
-		this.isUpdated = true;
-		return this.clusteredOutput;
-	}
 
 	@Override
 	public PlanProto.LogicalNodeTree getPlan() {
@@ -248,33 +224,14 @@ public class TaskRequestImpl implements TaskRequest {
     }
     TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
     this.fetches = new ArrayList<FetchImpl>();
-    for(TajoWorkerProtocol.FetchProto fetch : p.getFetchesList()) {
+    for(FetchProto fetch : p.getFetchesList()) {
       fetches.add(new FetchImpl(fetch));
     }
 	}
 
-  @Override
-  public boolean shouldDie() {
-    TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
-    if (shouldDie != null) {
-      return shouldDie;
-    }
-    if (!p.hasShouldDie()) {
-      return false;
-    }
-    this.shouldDie = p.getShouldDie();
-    return this.shouldDie;
-  }
-
-  @Override
-  public void setShouldDie() {
-    maybeInitBuilder();
-    shouldDie = true;
-  }
-
   private void maybeInitBuilder() {
 		if (viaProto || builder == null) {
-			builder = TajoWorkerProtocol.TaskRequestProto.newBuilder(proto);
+			builder = TaskRequestProto.newBuilder(proto);
 		}
 		viaProto = true;
 	}
@@ -305,8 +262,8 @@ public class TaskRequestImpl implements TaskRequest {
         builder.addFetches(fetches.get(i).getProto());
       }
     }
-    if (this.shouldDie != null) {
-      builder.setShouldDie(this.shouldDie);
+    if (this.queryMasterHostAndPort != null) {
+      builder.setQueryMasterHostAndPort(this.queryMasterHostAndPort);
     }
     if (this.queryContext != null) {
       builder.setQueryContext(queryContext.getProto());

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
deleted file mode 100644
index cad63a0..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.querymaster.QueryMasterTask;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
-
-public abstract class ContainerProxy {
-  protected static final Log LOG = LogFactory.getLog(ContainerProxy.class);
-
-  final public static FsPermission QUERYCONF_FILE_PERMISSION =
-          FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-
-  protected static enum ContainerState {
-    PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
-  }
-
-  protected final ExecutionBlockId executionBlockId;
-  protected Configuration conf;
-  protected QueryMasterTask.QueryMasterTaskContext context;
-
-  protected ContainerState state;
-  // store enough information to be able to cleanup the container
-  protected TajoContainer container;
-  protected TajoContainerId containerId;
-  protected String hostName;
-  protected int port = -1;
-
-  public abstract void launch(ContainerLaunchContext containerLaunchContext);
-  public abstract void stopContainer();
-
-  public ContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf,
-                        ExecutionBlockId executionBlockId, TajoContainer container) {
-    this.context = context;
-    this.conf = conf;
-    this.state = ContainerState.PREP;
-    this.container = container;
-    this.executionBlockId = executionBlockId;
-    this.containerId = container.getId();
-  }
-
-  public synchronized boolean isCompletelyDone() {
-    return state == ContainerState.DONE || state == ContainerState.FAILED;
-  }
-
-  public String getTaskHostName() {
-    return this.hostName;
-  }
-
-  public int getTaskPort() {
-    return this.port;
-  }
-
-  public TajoContainerId getContainerId() {
-    return containerId;
-  }
-
-  public ExecutionBlockId getBlockId() {
-    return executionBlockId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
deleted file mode 100644
index e620afa..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.container.TajoContainer;
-
-import java.util.Collection;
-
-public class LaunchTaskRunnersEvent extends TaskRunnerGroupEvent {
-  private final QueryContext queryContext;
-  private final String planJson;
-
-  public LaunchTaskRunnersEvent(ExecutionBlockId executionBlockId,
-                                Collection<TajoContainer> containers, QueryContext queryContext,
-                                String planJson) {
-    super(EventType.CONTAINER_REMOTE_LAUNCH, executionBlockId, containers);
-    this.queryContext = queryContext;
-    this.planJson = planJson;
-  }
-
-  public QueryContext getQueryContext() {
-    return queryContext;
-  }
-
-  public String getPlanJson() {
-    return planJson;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
index 1b1d49e..3b04fc5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
@@ -23,13 +23,13 @@ import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.NodeStatus;
+import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
@@ -37,7 +37,8 @@ import org.apache.tajo.util.NetUtils;
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
-import java.util.List;
+
+import static org.apache.tajo.ResourceProtos.*;
 
 public class QueryCoordinatorService extends AbstractService {
   private final static Log LOG = LogFactory.getLog(QueryCoordinatorService.class);
@@ -59,30 +60,27 @@ public class QueryCoordinatorService extends AbstractService {
   }
 
   @Override
-  public void start() {
+  public void serviceStart() throws Exception {
     String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
     int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
-    try {
-      server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum);
-    } catch (Exception e) {
-      LOG.error(e, e);
-    }
+
+    server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum);
     server.start();
     bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
     this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
         NetUtils.normalizeInetSocketAddress(bindAddress));
     LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
-    super.start();
+    super.serviceStart();
   }
 
   @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     if(server != null) {
       server.shutdown();
       server = null;
     }
-    super.stop();
+    super.serviceStop();
   }
 
   public InetSocketAddress getBindAddress() {
@@ -97,62 +95,47 @@ public class QueryCoordinatorService extends AbstractService {
     @Override
     public void heartbeat(
         RpcController controller,
-        TajoHeartbeat request, RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) {
+        TajoHeartbeatRequest request, RpcCallback<TajoHeartbeatResponse> done) {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
       }
 
-      QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
+      TajoHeartbeatResponse.ResponseCommand command;
 
       QueryManager queryManager = context.getQueryJobManager();
       command = queryManager.queryHeartbeat(request);
 
-      QueryCoordinatorProtocol.TajoHeartbeatResponse.Builder builder = QueryCoordinatorProtocol.TajoHeartbeatResponse.newBuilder();
+      TajoHeartbeatResponse.Builder builder = TajoHeartbeatResponse.newBuilder();
       builder.setHeartbeatResult(BOOL_TRUE);
       if(command != null) {
         builder.setResponseCommand(command);
       }
 
-      builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
       done.run(builder.build());
     }
 
+    /**
+     * Reserve a node resources to TajoMaster
+     */
     @Override
-    public void allocateWorkerResources(
-        RpcController controller,
-        QueryCoordinatorProtocol.WorkerResourceAllocationRequest request,
-        RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> done) {
-      context.getResourceManager().allocateWorkerResources(request, done);
-    }
-
-    @Override
-    public void releaseWorkerResource(RpcController controller, WorkerResourceReleaseRequest request,
-                                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-      List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
-
-      for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
-        context.getResourceManager().releaseWorkerResource(eachContainer);
-      }
-      done.run(BOOL_TRUE);
+    public void reserveNodeResources(RpcController controller, NodeResourceRequest request,
+                                     RpcCallback<NodeResourceResponse> done) {
+      Dispatcher dispatcher = context.getResourceManager().getRMContext().getDispatcher();
+      dispatcher.getEventHandler().handle(new ResourceReserveSchedulerEvent(request, done));
     }
 
+    /**
+     * Get all worker connection information
+     */
     @Override
-    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
-                                     RpcCallback<WorkerResourcesRequest> done) {
-
-      WorkerResourcesRequest.Builder builder = WorkerResourcesRequest.newBuilder();
-      Collection<Worker> workers = context.getResourceManager().getWorkers().values();
-
-      for(Worker worker: workers) {
-        WorkerResource resource = worker.getResource();
-
-        WorkerResourceProto.Builder workerResource = WorkerResourceProto.newBuilder();
+    public void getAllWorkers(RpcController controller, PrimitiveProtos.NullProto request,
+                                     RpcCallback<WorkerConnectionsResponse> done) {
 
-        workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
-        workerResource.setMemoryMB(resource.getMemoryMB());
-        workerResource.setDiskSlots(resource.getDiskSlots());
+      WorkerConnectionsResponse.Builder builder = WorkerConnectionsResponse.newBuilder();
+      Collection<NodeStatus> nodeStatuses = context.getResourceManager().getRMContext().getNodes().values();
 
-        builder.addWorkerResources(workerResource);
+      for(NodeStatus nodeStatus : nodeStatuses) {
+        builder.addWorker(nodeStatus.getConnectionInfo().getProto());
       }
       done.run(builder.build());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index ece42f7..e22663a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -22,24 +22,26 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.ResourceProtos.AllocationResourceProto;
+import org.apache.tajo.ResourceProtos.QueryExecutionRequest;
 import org.apache.tajo.TajoProtos;
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.TajoResourceManager;
 import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.*;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.util.NetUtils;
 
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -53,9 +55,9 @@ public class QueryInProgress {
 
   private LogicalRootNode plan;
 
-  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+  private volatile boolean querySubmitted;
 
-  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private volatile boolean isStopped;
 
   private QueryInfo queryInfo;
 
@@ -65,6 +67,8 @@ public class QueryInProgress {
 
   private QueryMasterProtocolService queryMasterRpcClient;
 
+  private AllocationResourceProto allocationResource;
+
   private final Lock readLock;
   private final Lock writeLock;
 
@@ -104,14 +108,16 @@ public class QueryInProgress {
   }
 
   public void stopProgress() {
-    if(stopped.getAndSet(true)) {
+    if (isStopped) {
       return;
+    } else {
+      isStopped = true;
     }
 
     LOG.info("=========================================================");
     LOG.info("Stop query:" + queryId);
 
-    masterContext.getResourceManager().releaseQueryMaster(queryId);
+    masterContext.getResourceManager().getScheduler().stopQuery(queryId);
 
     RpcClientManager.cleanup(queryMasterRpc);
 
@@ -122,7 +128,13 @@ public class QueryInProgress {
     }
   }
 
-  public boolean startQueryMaster() {
+  /**
+   * Connect to QueryMaster and allocate QM resource.
+   *
+   * @param allocation QM resource
+   * @return If there is no available resource, It returns false
+   */
+  protected boolean allocateToQueryMaster(AllocationResourceProto allocation) {
     try {
       writeLock.lockInterruptibly();
     } catch (Exception e) {
@@ -130,19 +142,29 @@ public class QueryInProgress {
       return false;
     }
     try {
-      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
-      WorkerResourceManager resourceManager = masterContext.getResourceManager();
-      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+      TajoResourceManager resourceManager = masterContext.getResourceManager();
+      WorkerConnectionInfo connectionInfo =
+          resourceManager.getRMContext().getNodes().get(allocation.getWorkerId()).getConnectionInfo();
+      try {
+        if(queryMasterRpcClient == null) {
+          connectQueryMaster(connectionInfo);
+        }
+
+        CallFuture<PrimitiveProtos.BoolProto> callFuture = new CallFuture<PrimitiveProtos.BoolProto>();
+        queryMasterRpcClient.allocateQueryMaster(callFuture.getController(), allocation, callFuture);
 
-      // if no resource to allocate a query master
-      if(resource == null) {
-        throw new RuntimeException("No Available Resources for QueryMaster");
+        if(!callFuture.get().getValue()) return false;
+
+      } catch (ConnectException ce) {
+        return false;
       }
 
-      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
-      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
-      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
-      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
+      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+      this.allocationResource = allocation;
+      this.queryInfo.setQueryMaster(connectionInfo.getHost());
+      this.queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
+      this.queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
+      this.queryInfo.setQueryMasterInfoPort(connectionInfo.getHttpInfoPort());
 
       return true;
     } catch (Exception e) {
@@ -153,54 +175,58 @@ public class QueryInProgress {
     }
   }
 
-  private void connectQueryMaster() throws Exception {
-    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
-    LOG.info("Connect to QueryMaster:" + addr);
-
+  private void connectQueryMaster(WorkerConnectionInfo connectionInfo)
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
     RpcClientManager.cleanup(queryMasterRpc);
+
+    InetSocketAddress addr = NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getQueryMasterPort());
+    LOG.info("Try to connect to QueryMaster:" + addr);
     queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true);
     queryMasterRpcClient = queryMasterRpc.getStub();
   }
 
-  public void submitQueryToMaster() {
-    if(querySubmitted.get()) {
-      return;
+  /**
+   * Launch the allocated query to QueryMaster
+   */
+  public boolean submitToQueryMaster() {
+    if(querySubmitted) {
+      return false;
     }
 
     try {
       writeLock.lockInterruptibly();
     } catch (Exception e) {
       LOG.error("Failed to lock by exception " + e.getMessage(), e);
-      return;
+      return false;
     }
 
     try {
-      if(queryMasterRpcClient == null) {
-        connectQueryMaster();
-      }
 
       LOG.info("Call executeQuery to :" +
           queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
 
-      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
+      QueryExecutionRequest.Builder builder = QueryExecutionRequest.newBuilder();
       builder.setQueryId(queryId.getProto())
           .setQueryContext(queryInfo.getQueryContext().getProto())
           .setSession(session.getProto())
           .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
-          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
+          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
+          .setAllocation(allocationResource);
 
       CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
       queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture);
       callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
-      querySubmitted.set(true);
+      querySubmitted = true;
       getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
+      return true;
     } catch (Exception e) {
       LOG.error("Failed to submit query " + queryId + " to master by exception " + e, e);
       catchException(e.getMessage(), e);
     } finally {
       writeLock.unlock();
     }
+    return false;
   }
 
   public void catchException(String message, Throwable e) {
@@ -222,10 +248,6 @@ public class QueryInProgress {
     }
   }
 
-  public boolean isStarted() {
-    return !stopped.get() && this.querySubmitted.get();
-  }
-
   public void heartbeat(QueryInfo queryInfo) {
     LOG.info("Received QueryMaster heartbeat:" + queryInfo);
 


Mime
View raw message