tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [1/2] TAJO-602: WorkerResourceManager should be broke down into 3 parts. (hyunsik)
Date Fri, 28 Feb 2014 06:55:25 GMT
Repository: incubator-tajo
Updated Branches:
  refs/heads/master cf27e0572 -> 18aaa68e5


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/18aaa68e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 222d355..440887a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -43,6 +43,7 @@ import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.master.querymaster.SubQueryState;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
@@ -223,7 +224,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
               .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
               .setMinDiskSlotPerContainer(requiredDiskSlots)
               .setMaxDiskSlotPerContainer(requiredDiskSlots)
-              .setExecutionBlockId(event.getExecutionBlockId().getProto())
+              .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
               .build();
 
       RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
@@ -276,15 +277,18 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           container.setId(containerId);
           container.setNodeId(nodeId);
 
+
           WorkerResource workerResource = new WorkerResource();
-          workerResource.setAllocatedHost(nodeId.getHost());
-          workerResource.setPeerRpcPort(nodeId.getPort());
-          workerResource.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort());
-          workerResource.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort());
           workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB());
           workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
 
-          container.setWorkerResource(workerResource);
+          Worker worker = new Worker(null, workerResource);
+          worker.setHostName(nodeId.getHost());
+          worker.setPeerRpcPort(nodeId.getPort());
+          worker.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort());
+          worker.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort());
+
+          container.setWorkerResource(worker);
 
           containers.add(container);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/18aaa68e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 2f763e3..395758f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.worker;
 
 import com.codahale.metrics.Gauge;
-import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,14 +40,9 @@ import org.apache.tajo.master.querymaster.QueryMaster;
 import org.apache.tajo.master.querymaster.QueryMasterManagerService;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.v2.DiskDeviceInfo;
-import org.apache.tajo.storage.v2.DiskMountInfo;
-import org.apache.tajo.storage.v2.DiskUtil;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TajoIdUtils;
@@ -62,7 +56,6 @@ import java.lang.management.ThreadMXBean;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -92,6 +85,8 @@ public class TajoWorker extends CompositeService {
 
   private InetSocketAddress tajoMasterAddress;
 
+  private InetSocketAddress workerResourceTrackerAddr;
+
   private CatalogClient catalogClient;
 
   private WorkerContext workerContext;
@@ -106,7 +101,7 @@ public class TajoWorker extends CompositeService {
 
   private boolean taskRunnerMode;
 
-  private WorkerHeartbeatThread workerHeartbeatThread;
+  private WorkerHeartbeatService workerHeartbeatThread;
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
@@ -259,11 +254,13 @@ public class TajoWorker extends CompositeService {
       taskRunnerManager.startTask(cmdArgs);
     } else {
       tajoMasterAddress = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+      workerResourceTrackerAddr = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
       connectToCatalog();
-      workerHeartbeatThread = new WorkerHeartbeatThread();
-      workerHeartbeatThread.start();
     }
 
+    workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
+    workerHeartbeatThread.init(conf);
+    addIfService(workerHeartbeatThread);
   }
 
   private void initWorkerMetrics() {
@@ -316,11 +313,6 @@ public class TajoWorker extends CompositeService {
         LOG.error(e.getMessage(), e);
       }
     }
-    if(workerHeartbeatThread != null) {
-      synchronized (workerHeartbeatThread){
-        workerHeartbeatThread.notifyAll();
-      }
-    }
 
     if (catalogClient != null) {
       catalogClient.close();
@@ -379,6 +371,10 @@ public class TajoWorker extends CompositeService {
       return pullService;
     }
 
+    public int getHttpPort() {
+      return httpPort;
+    }
+
     public String getWorkerName() {
       if(queryMasterMode) {
         return getQueryMasterManagerService().getHostAndPort();
@@ -461,6 +457,10 @@ public class TajoWorker extends CompositeService {
       return tajoMasterAddress;
     }
 
+    public InetSocketAddress getResourceTrackerAddress() {
+      return workerResourceTrackerAddr;
+    }
+
     public int getPeerRpcPort() {
       return getTajoWorkerManagerService() == null ? 0 : getTajoWorkerManagerService().getBindAddr().getPort();
     }
@@ -490,187 +490,6 @@ public class TajoWorker extends CompositeService {
     }
   }
 
-  class WorkerHeartbeatThread extends Thread {
-    TajoMasterProtocol.ServerStatusProto.System systemInfo;
-    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
-        new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
-    float workerDiskSlots;
-    int workerMemoryMB;
-    List<DiskDeviceInfo> diskDeviceInfos;
-
-    public WorkerHeartbeatThread() {
-      int workerCpuCoreNum;
-
-      boolean dedicatedResource = systemConf.getBoolVar(ConfVars.WORKER_RESOURCE_DEDICATED);
-      int workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
-
-      try {
-        diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-      }
-
-      if(dedicatedResource) {
-        float dedicatedMemoryRatio = systemConf.getFloatVar(ConfVars.WORKER_RESOURCE_DEDICATED_MEMORY_RATIO);
-        int totalMemory = getTotalMemoryMB();
-        workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
-        workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
-
-        if(diskDeviceInfos == null) {
-          workerDiskSlots = ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
-        } else {
-          workerDiskSlots = diskDeviceInfos.size();
-        }
-      } else {
-        workerMemoryMB = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
-        workerCpuCoreNum = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
-        workerDiskSlots = systemConf.getFloatVar(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
-      }
-
-      systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
-          .setAvailableProcessors(workerCpuCoreNum)
-          .setFreeMemoryMB(0)
-          .setMaxMemoryMB(0)
-          .setTotalMemoryMB(getTotalMemoryMB())
-          .build();
-    }
-
-    public void run() {
-      LOG.info("Worker Resource Heartbeat Thread start.");
-      int sendDiskInfoCount = 0;
-      int pullServerPort = 0;
-      if(pullService != null) {
-        long startTime = System.currentTimeMillis();
-        while(true) {
-          pullServerPort = pullService.getPort();
-          if(pullServerPort > 0) {
-            break;
-          }
-          //waiting while pull server init
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException e) {
-          }
-          if(System.currentTimeMillis() - startTime > 30 * 1000) {
-            LOG.fatal("Too long push server init.");
-            System.exit(0);
-          }
-        }
-      }
-
-      String hostName = null;
-      int peerRpcPort = 0;
-      int queryMasterPort = 0;
-      int clientPort = 0;
-      if(workerContext.getTajoWorkerManagerService() != null) {
-        hostName = workerContext.getTajoWorkerManagerService().getBindAddr().getHostName();
-        peerRpcPort = workerContext.getTajoWorkerManagerService().getBindAddr().getPort();
-      }
-      if(workerContext.getQueryMasterManagerService() != null) {
-        hostName = workerContext.getQueryMasterManagerService().getBindAddr().getHostName();
-        queryMasterPort = workerContext.getQueryMasterManagerService().getBindAddr().getPort();
-      }
-      if(workerContext.getTajoWorkerClientService() != null) {
-        clientPort = workerContext.getTajoWorkerClientService().getBindAddr().getPort();
-      }
-
-      while(!stopped.get()) {
-        if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
-          getDiskUsageInfos();
-        }
-        TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
-          TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
-            .setMaxHeap(Runtime.getRuntime().maxMemory())
-            .setFreeHeap(Runtime.getRuntime().freeMemory())
-            .setTotalHeap(Runtime.getRuntime().totalMemory())
-            .build();
-
-        TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
-            .addAllDisk(diskInfos)
-            .setRunningTaskNum(taskRunnerManager == null ? 1 : taskRunnerManager.getNumTasks())   //TODO
-            .setSystem(systemInfo)
-            .setDiskSlots(workerDiskSlots)
-            .setMemoryResourceMB(workerMemoryMB)
-            .setJvmHeap(jvmHeap)
-            .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(queryMasterMode))
-            .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(taskRunnerMode))
-            .build();
-
-        TajoMasterProtocol.TajoHeartbeat heartbeatProto = TajoMasterProtocol.TajoHeartbeat.newBuilder()
-            .setTajoWorkerHost(hostName)
-            .setTajoQueryMasterPort(queryMasterPort)
-            .setPeerRpcPort(peerRpcPort)
-            .setTajoWorkerClientPort(clientPort)
-            .setTajoWorkerHttpPort(httpPort)
-            .setTajoWorkerPullServerPort(pullServerPort)
-            .setServerStatus(serverStatus)
-            .build();
-
-        NettyClientBase tmClient = null;
-        try {
-          CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
-              new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
-
-          tmClient = connPool.getConnection(tajoMasterAddress, TajoMasterProtocol.class, true);
-          TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-          masterClientService.heartbeat(callBack.getController(), heartbeatProto, callBack);
-
-          TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
-          if(response != null) {
-            TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
-            if(clusterResourceSummary.getNumWorkers() > 0) {
-              workerContext.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
-            }
-            workerContext.setClusterResource(clusterResourceSummary);
-          } else {
-            if(callBack.getController().failed()) {
-              throw new ServiceException(callBack.getController().errorText());
-            }
-          }
-        } catch (InterruptedException e) {
-          break;
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        } finally {
-          connPool.releaseConnection(tmClient);
-        }
-
-        try {
-          synchronized (workerHeartbeatThread){
-            wait(10 * 1000);
-          }
-        } catch (InterruptedException e) {
-          break;
-        }
-        sendDiskInfoCount++;
-
-        if(sendDiskInfoCount > 10) {
-          sendDiskInfoCount = 0;
-        }
-      }
-
-      LOG.info("Worker Resource Heartbeat Thread stopped.");
-    }
-
-    private void getDiskUsageInfos() {
-      diskInfos.clear();
-      for(DiskDeviceInfo eachDevice: diskDeviceInfos) {
-        List<DiskMountInfo> mountInfos = eachDevice.getMountInfos();
-        if(mountInfos != null) {
-          for(DiskMountInfo eachMount: mountInfos) {
-            File eachFile = new File(eachMount.getMountPath());
-            diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
-                .setAbsolutePath(eachFile.getAbsolutePath())
-                .setTotalSpace(eachFile.getTotalSpace())
-                .setFreeSpace(eachFile.getFreeSpace())
-                .setUsableSpace(eachFile.getUsableSpace())
-                .build());
-          }
-        }
-      }
-    }
-  }
-
   private class ShutdownHook implements Runnable {
     @Override
     public void run() {
@@ -752,14 +571,6 @@ public class TajoWorker extends CompositeService {
     }
   }
 
-  public static int getTotalMemoryMB() {
-    com.sun.management.OperatingSystemMXBean bean =
-        (com.sun.management.OperatingSystemMXBean)
-            java.lang.management.ManagementFactory.getOperatingSystemMXBean();
-    long max = bean.getTotalPhysicalMemorySize();
-    return ((int) (max / (1024 * 1024)));
-  }
-
   public static void main(String[] args) throws Exception {
     StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/18aaa68e/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index 5e4088e..8fccbaf 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -61,17 +61,13 @@ message ServerStatusProto {
 
 message TajoHeartbeat {
   required string tajoWorkerHost = 1;
-  required int32 peerRpcPort = 2;
-  required int32 tajoQueryMasterPort = 3;
-  optional ServerStatusProto serverStatus = 4;
-  optional int32 tajoWorkerClientPort = 5;
-  optional QueryIdProto queryId = 6;
-  optional QueryState state = 7;
-  optional string statusMessage = 8;
-  optional int32 tajoWorkerPullServerPort = 9;
-  optional int32 tajoWorkerHttpPort = 10;
-  optional float queryProgress = 11;
-  optional int64 queryFinishTime = 12;
+  required int32 tajoQueryMasterPort = 2;
+  optional int32 tajoWorkerClientPort = 3;
+  optional QueryIdProto queryId = 4;
+  optional QueryState state = 5;
+  optional string statusMessage = 6;
+  optional float queryProgress = 7;
+  optional int64 queryFinishTime = 8;
 }
 
 message TajoHeartbeatResponse {
@@ -101,7 +97,7 @@ enum ResourceRequestPriority {
 }
 
 message WorkerResourceAllocationRequest {
-    required ExecutionBlockIdProto executionBlockId = 1;
+    required QueryIdProto queryId = 1;
     required ResourceRequestPriority resourceRequestPriority = 2;
 
     required int32 numContainers = 3;
@@ -137,14 +133,15 @@ message WorkerAllocatedResource {
     required string workerHost = 3;
     required int32 peerRpcPort = 4;
     required int32 queryMasterPort = 5;
-    required int32 workerPullServerPort = 6;
+    required int32 clientPort = 6;
+    required int32 workerPullServerPort = 7;
 
-    required int32 allocatedMemoryMB = 7;
-    required float allocatedDiskSlots = 8;
+    required int32 allocatedMemoryMB = 8;
+    required float allocatedDiskSlots = 9;
 }
 
 message WorkerResourceAllocationResponse {
-    required ExecutionBlockIdProto executionBlockId = 1;
+    required QueryIdProto queryId = 1;
     repeated WorkerAllocatedResource workerAllocatedResource = 2;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/18aaa68e/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
index 0600145..88a9f04 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
@@ -19,46 +19,51 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.master.*" %>
-<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.master.TajoMaster" %>
+<%@ page import="org.apache.tajo.master.rm.Worker" %>
+<%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
+<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="java.util.*" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
-  Map<String, WorkerResource> workers = master.getContext().getResourceManager().getWorkers();
+  Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
   List<String> wokerKeys = new ArrayList<String>(workers.keySet());
   Collections.sort(wokerKeys);
 
   int runningQueryMasterTasks = 0;
 
-  Set<WorkerResource> liveWorkers = new TreeSet<WorkerResource>();
-  Set<WorkerResource> deadWorkers = new TreeSet<WorkerResource>();
-  Set<WorkerResource> decommissionWorkers = new TreeSet<WorkerResource>();
+  Set<Worker> liveWorkers = new TreeSet<Worker>();
+  Set<Worker> deadWorkers = new TreeSet<Worker>();
+  Set<Worker> decommissionWorkers = new TreeSet<Worker>();
 
-  Set<WorkerResource> liveQueryMasters = new TreeSet<WorkerResource>();
-  Set<WorkerResource> deadQueryMasters = new TreeSet<WorkerResource>();
+  Set<Worker> liveQueryMasters = new TreeSet<Worker>();
+  Set<Worker> deadQueryMasters = new TreeSet<Worker>();
 
-  for(WorkerResource eachWorker: workers.values()) {
-    if(eachWorker.isQueryMasterMode()) {
-      if(eachWorker.getWorkerStatus() == WorkerStatus.LIVE) {
-        liveQueryMasters.add(eachWorker);
-        runningQueryMasterTasks += eachWorker.getNumQueryMasterTasks();
-      }
-      if(eachWorker.getWorkerStatus() == WorkerStatus.DEAD) {
-        deadQueryMasters.add(eachWorker);
-      }
+  for(Worker eachWorker: workers.values()) {
+    if(eachWorker.getResource().isQueryMasterMode()) {
+      liveQueryMasters.add(eachWorker);
+      runningQueryMasterTasks += eachWorker.getResource().getNumQueryMasterTasks();
+    }
+
+    if(eachWorker.getResource().isTaskRunnerMode()) {
+      liveWorkers.add(eachWorker);
     }
+  }
+
+  for (Worker inactiveWorker : master.getContext().getResourceManager().getInactiveWorkers().values()) {
+    WorkerState state = inactiveWorker.getState();
 
-    if(eachWorker.isTaskRunnerMode()) {
-      if(eachWorker.getWorkerStatus() == WorkerStatus.LIVE) {
-        liveWorkers.add(eachWorker);
-      } else if(eachWorker.getWorkerStatus() == WorkerStatus.DEAD) {
-        deadWorkers.add(eachWorker);
-      } else if(eachWorker.getWorkerStatus() == WorkerStatus.DECOMMISSION) {
-        decommissionWorkers.add(eachWorker);
+    if (state == WorkerState.LOST) {
+      if (inactiveWorker.getResource().isQueryMasterMode()) {
+        deadQueryMasters.add(inactiveWorker);
+      } else {
+        deadWorkers.add(inactiveWorker);
       }
+    } else if (state == WorkerState.DECOMMISSIONED) {
+      decommissionWorkers.add(inactiveWorker);
     }
   }
 
@@ -91,17 +96,18 @@
 
 <%
     int no = 1;
-    for(WorkerResource queryMaster: liveQueryMasters) {
-          String queryMasterHttp = "http://" + queryMaster.getAllocatedHost() + ":" + queryMaster.getHttpPort() + "/index.jsp";
+    for(Worker queryMaster: liveQueryMasters) {
+      WorkerResource resource = queryMaster.getResource();
+          String queryMasterHttp = "http://" + queryMaster.getHostName() + ":" + queryMaster.getHttpPort() + "/index.jsp";
 %>
     <tr>
       <td width='30' align='right'><%=no++%></td>
-      <td><a href='<%=queryMasterHttp%>'><%=queryMaster.getAllocatedHost() + ":" + queryMaster.getQueryMasterPort()%></a></td>
+      <td><a href='<%=queryMasterHttp%>'><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></a></td>
       <td width='100' align='center'><%=queryMaster.getClientPort()%></td>
-      <td width='200' align='right'><%=queryMaster.getNumQueryMasterTasks()%></td>
-      <td width='200' align='center'><%=queryMaster.getFreeHeap()/1024/1024%>/<%=queryMaster.getMaxHeap()/1024/1024%> MB</td>
-      <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeat(), System.currentTimeMillis())%></td>
-      <td width='100' align='center'><%=queryMaster.getWorkerStatus()%></td>
+      <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td>
+      <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
+      <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
+      <td width='100' align='center'><%=queryMaster.getState()%></td>
     </tr>
 <%
     } //end fo for
@@ -122,13 +128,14 @@
     <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Status</th></tr>
 <%
       int no = 1;
-      for(WorkerResource queryMaster: deadQueryMasters) {
+      for(Worker queryMaster: deadQueryMasters) {
+        WorkerResource resource = queryMaster.getResource();
 %>
     <tr>
       <td width='30' align='right'><%=no++%></td>
-      <td><%=queryMaster.getAllocatedHost() + ":" + queryMaster.getQueryMasterPort()%></td>
+      <td><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></td>
       <td><%=queryMaster.getClientPort()%></td>
-      <td align='center'><%=queryMaster.getWorkerStatus()%></td>
+      <td align='center'><%=queryMaster.getState()%></td>
     </tr>
 <%
       } //end fo for
@@ -153,19 +160,20 @@
     <tr><th>No</th><th>Worker</th><th>PullServer<br/>Port</th><th>Running Tasks</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th></th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
 <%
     int no = 1;
-    for(WorkerResource worker: liveWorkers) {
-          String workerHttp = "http://" + worker.getAllocatedHost() + ":" + worker.getHttpPort() + "/index.jsp";
+    for(Worker worker: liveWorkers) {
+      WorkerResource resource = worker.getResource();
+          String workerHttp = "http://" + worker.getHostName() + ":" + worker.getHttpPort() + "/index.jsp";
 %>
     <tr>
       <td width='30' align='right'><%=no++%></td>
-      <td><a href='<%=workerHttp%>'><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></a></td>
+      <td><a href='<%=workerHttp%>'><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></a></td>
       <td width='80' align='center'><%=worker.getPullServerPort()%></td>
-      <td width='100' align='right'><%=worker.getNumRunningTasks()%></td>
-      <td width='150' align='center'><%=worker.getUsedMemoryMB()%>/<%=worker.getMemoryMB()%></td>
-      <td width='100' align='center'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
-      <td width='100' align='center'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
-      <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeat(), System.currentTimeMillis())%></td>
-      <td width='100' align='center'><%=worker.getWorkerStatus()%></td>
+      <td width='100' align='right'><%=resource.getNumRunningTasks()%></td>
+      <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td>
+      <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td>
+      <td width='100' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
+      <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
+      <td width='100' align='center'><%=worker.getState()%></td>
     </tr>
 <%
     } //end fo for
@@ -191,16 +199,17 @@
     <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Running Tasks</th><th>Memory Resource</th><th>Disk Resource</th></th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
 <%
       int no = 1;
-      for(WorkerResource worker: deadWorkers) {
+      for(Worker worker: deadWorkers) {
+        WorkerResource resource = worker.getResource();
 %>
     <tr>
       <td width='30' align='right'><%=no++%></td>
-      <td><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></td>
+      <td><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></td>
       <td width='150' align='center'><%=worker.getPullServerPort()%></td>
-      <td width='100' align='right'><%=worker.getUsedMemoryMB()%>/<%=worker.getMemoryMB()%></td>
-      <td width='100' align='right'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
-      <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
-      <td width='100' align='center'><%=worker.getWorkerStatus()%></td>
+      <td width='100' align='right'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td>
+      <td width='100' align='right'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td>
+      <td width='100' align='left'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
+      <td width='100' align='center'><%=worker.getState()%></td>
     </tr>
 <%
       } //end fo for

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/18aaa68e/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
index f652ea5..ebd017d 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
@@ -19,21 +19,23 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.master.*" %>
-<%@ page import="org.apache.tajo.master.rm.*" %>
-<%@ page import="org.apache.tajo.catalog.*" %>
-<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
-<%@ page import="org.apache.tajo.util.NetUtils" %>
-<%@ page import="org.apache.hadoop.util.StringUtils" %>
 <%@ page import="org.apache.hadoop.fs.FileSystem" %>
 <%@ page import="org.apache.tajo.conf.TajoConf" %>
 <%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
+<%@ page import="org.apache.tajo.master.TajoMaster" %>
+<%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.rm.Worker" %>
+<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
+<%@ page import="org.apache.tajo.util.NetUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="java.util.Collection" %>
+<%@ page import="java.util.Date" %>
+<%@ page import="java.util.Map" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
-  Map<String, WorkerResource> workers = master.getContext().getResourceManager().getWorkers();
+  Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
+  Map<String, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers();
 
   int numWorkers = 0;
   int numLiveWorkers = 0;
@@ -49,27 +51,29 @@
   TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary =
           master.getContext().getResourceManager().getClusterResourceSummary();
 
-  for(WorkerResource eachWorker: workers.values()) {
-    if(eachWorker.getWorkerStatus() == WorkerStatus.LIVE) {
-      if(eachWorker.isQueryMasterMode()) {
-        numQueryMasters++;
-        numLiveQueryMasters++;
-        runningQueryMasterTask += eachWorker.getNumQueryMasterTasks();
-      }
-      if(eachWorker.isTaskRunnerMode()) {
-        numWorkers++;
-        numLiveWorkers++;
-      }
-    } else if(eachWorker.getWorkerStatus() == WorkerStatus.DEAD) {
-      if(eachWorker.isQueryMasterMode()) {
+  for(Worker eachWorker: workers.values()) {
+    if(eachWorker.getResource().isQueryMasterMode()) {
+      numQueryMasters++;
+      numLiveQueryMasters++;
+      runningQueryMasterTask += eachWorker.getResource().getNumQueryMasterTasks();
+    }
+    if(eachWorker.getResource().isTaskRunnerMode()) {
+      numWorkers++;
+      numLiveWorkers++;
+    }
+  }
+
+  for (Worker eachWorker : inactiveWorkers.values()) {
+    if (eachWorker.getState() == WorkerState.LOST) {
+      if(eachWorker.getResource().isQueryMasterMode()) {
         numQueryMasters++;
         numDeadQueryMasters++;
       }
-      if(eachWorker.isTaskRunnerMode()) {
+      if(eachWorker.getResource().isTaskRunnerMode()) {
         numWorkers++;
         numDeadWorkers++;
       }
-    } else if(eachWorker.getWorkerStatus() == WorkerStatus.DECOMMISSION) {
+    } else if(eachWorker.getState() == WorkerState.DECOMMISSIONED) {
       numDecommissionWorkers++;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/18aaa68e/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
index e3a356d..4dfc18a 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
@@ -19,13 +19,17 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
-<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.master.*" %>
-<%@ page import="org.apache.tajo.util.*" %>
+<%@ page import="org.apache.tajo.master.TajoMaster" %>
 <%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.rm.Worker" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.util.StringUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
+<%@ page import="java.util.Collection" %>
+<%@ page import="java.util.HashMap" %>
+<%@ page import="java.util.List" %>
+<%@ page import="java.util.Map" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -38,14 +42,14 @@
 
   SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-  Map<String, WorkerResource> workers = master.getContext().getResourceManager().getWorkers();
+  Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
   Map<String, Integer> portMap = new HashMap<String, Integer>();
 
   Collection<String> queryMasters = master.getContext().getResourceManager().getQueryMasters();
   for(String eachQueryMasterKey: queryMasters) {
-    WorkerResource queryMaster = workers.get(eachQueryMasterKey);
+    Worker queryMaster = workers.get(eachQueryMasterKey);
     if(queryMaster != null) {
-      portMap.put(queryMaster.getAllocatedHost(), queryMaster.getHttpPort());
+      portMap.put(queryMaster.getHostName(), queryMaster.getHttpPort());
     }
   }
 %>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/18aaa68e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 9c96e0e..aa72b06 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -269,6 +269,7 @@ public class TajoTestingCluster {
     TajoConf c = getConfiguration();
     c.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:0");
     c.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:0");
+    c.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0");
     c.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
     c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
     c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
@@ -296,7 +297,7 @@ public class TajoTestingCluster {
 
     this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
         tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort());
-
+    this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
     this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
 
     if(standbyWorkerMode) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/18aaa68e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
deleted file mode 100644
index 9504927..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestTajoResourceManager.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol.*;
-import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestTajoResourceManager {
-  private final PrimitiveProtos.BoolProto BOOL_TRUE = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
-  private final PrimitiveProtos.BoolProto BOOL_FALSE = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
-
-  TajoConf tajoConf;
-  long queryIdTime = System.currentTimeMillis();
-  int numWorkers = 5;
-  float workerDiskSlots = 5.0f;
-  int workerMemoryMB = 512 * 10;
-  WorkerResourceAllocationResponse response;
-
-  private TajoWorkerResourceManager initResourceManager(boolean queryMasterMode) throws Exception {
-    tajoConf = new org.apache.tajo.conf.TajoConf();
-
-    tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f);
-    tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512);
-
-    TajoWorkerResourceManager tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf);
-
-    for(int i = 0; i < numWorkers; i++) {
-      ServerStatusProto.System system = ServerStatusProto.System.newBuilder()
-          .setAvailableProcessors(1)
-          .setFreeMemoryMB(workerMemoryMB)
-          .setMaxMemoryMB(workerMemoryMB)
-          .setTotalMemoryMB(workerMemoryMB)
-          .build();
-
-      ServerStatusProto.JvmHeap jvmHeap = ServerStatusProto.JvmHeap.newBuilder()
-          .setFreeHeap(workerMemoryMB)
-          .setMaxHeap(workerMemoryMB)
-          .setTotalHeap(workerMemoryMB)
-          .build();
-
-      ServerStatusProto.Disk disk = ServerStatusProto.Disk.newBuilder()
-          .setAbsolutePath("/")
-          .setFreeSpace(0)
-          .setTotalSpace(0)
-          .setUsableSpace(0)
-          .build();
-
-      List<ServerStatusProto.Disk> disks = new ArrayList<ServerStatusProto.Disk>();
-
-      disks.add(disk);
-
-      ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
-          .setQueryMasterMode(queryMasterMode ? BOOL_TRUE : BOOL_FALSE)
-          .setTaskRunnerMode(BOOL_TRUE)
-          .setDiskSlots(workerDiskSlots)
-          .setMemoryResourceMB(workerMemoryMB)
-          .setJvmHeap(jvmHeap)
-          .setSystem(system)
-          .addAllDisk(disks)
-          .setRunningTaskNum(0)
-          .build();
-
-      TajoHeartbeat tajoHeartbeat = TajoHeartbeat.newBuilder()
-          .setTajoWorkerHost("host" + (i + 1))
-          .setQueryId(QueryIdFactory.newQueryId(queryIdTime, i + 1).getProto())
-          .setTajoQueryMasterPort(21000)
-          .setPeerRpcPort(29000 + i)
-          .setTajoWorkerHttpPort(28080 + i)
-          .setServerStatus(serverStatus)
-          .build();
-
-      tajoWorkerResourceManager.workerHeartbeat(tajoHeartbeat);
-    }
-
-    return tajoWorkerResourceManager;
-  }
-
-
-  @Test
-  public void testHeartbeat() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
-    assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size());
-    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
-      assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
-      assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
-    }
-    tajoWorkerResourceManager.stop();
-  }
-
-  @Test
-  public void testMemoryResource() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
-
-    final int minMemory = 256;
-    final int maxMemory = 512;
-    float diskSlots = 1.0f;
-
-    QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 1);
-    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId);
-
-    WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
-        .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
-        .setNumContainers(60)
-        .setExecutionBlockId(ebId.getProto())
-        .setMaxDiskSlotPerContainer(diskSlots)
-        .setMinDiskSlotPerContainer(diskSlots)
-        .setMinMemoryMBPerContainer(minMemory)
-        .setMaxMemoryMBPerContainer(maxMemory)
-        .build();
-
-    final Object monitor = new Object();
-    final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
-
-
-    RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
-
-      @Override
-      public void run(WorkerResourceAllocationResponse response) {
-        TestTajoResourceManager.this.response = response;
-        synchronized(monitor) {
-          monitor.notifyAll();
-        }
-      }
-    };
-
-    tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
-    synchronized(monitor) {
-      monitor.wait();
-    }
-
-
-    // assert after callback
-    int totalUsedMemory = 0;
-    int totalUsedDisks = 0;
-    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
-      assertEquals(0, eachWorker.getAvailableMemoryMB());
-      assertEquals(0, eachWorker.getAvailableDiskSlots(), 0);
-      assertEquals(5.0f, eachWorker.getUsedDiskSlots(), 0);
-
-      totalUsedMemory += eachWorker.getUsedMemoryMB();
-      totalUsedDisks += eachWorker.getUsedDiskSlots();
-    }
-
-    assertEquals(workerMemoryMB * numWorkers, totalUsedMemory);
-    assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
-
-    assertEquals(numWorkers * 10, response.getWorkerAllocatedResourceList().size());
-
-    for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) {
-      assertTrue(
-          eachResource.getAllocatedMemoryMB() >= minMemory &&  eachResource.getAllocatedMemoryMB() <= maxMemory);
-      containerIds.add(eachResource.getContainerId());
-    }
-
-    for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
-      tajoWorkerResourceManager.releaseWorkerResource(ebId, eachContainerId);
-    }
-
-    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
-      assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
-      assertEquals(0, eachWorker.getUsedMemoryMB());
-
-      assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
-      assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
-    }
-
-    tajoWorkerResourceManager.stop();
-  }
-
-  @Test
-  public void testMemoryNotCommensurable() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
-
-    final int minMemory = 200;
-    final int maxMemory = 500;
-    float diskSlots = 1.0f;
-
-    QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 2);
-    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId);
-
-    int requiredContainers = 60;
-
-    int numAllocatedContainers = 0;
-
-    int loopCount = 0;
-    while(true) {
-      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
-          .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
-          .setNumContainers(requiredContainers - numAllocatedContainers)
-          .setExecutionBlockId(ebId.getProto())
-          .setMaxDiskSlotPerContainer(diskSlots)
-          .setMinDiskSlotPerContainer(diskSlots)
-          .setMinMemoryMBPerContainer(minMemory)
-          .setMaxMemoryMBPerContainer(maxMemory)
-          .build();
-
-      final Object monitor = new Object();
-
-      RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
-        @Override
-        public void run(WorkerResourceAllocationResponse response) {
-          TestTajoResourceManager.this.response = response;
-          synchronized(monitor) {
-            monitor.notifyAll();
-          }
-        }
-      };
-
-      tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
-      synchronized(monitor) {
-        monitor.wait();
-      }
-
-      numAllocatedContainers += TestTajoResourceManager.this.response.getWorkerAllocatedResourceList().size();
-
-      //release resource
-      for(WorkerAllocatedResource eachResource: TestTajoResourceManager.this.response.getWorkerAllocatedResourceList()) {
-        assertTrue(
-            eachResource.getAllocatedMemoryMB() >= minMemory &&  eachResource.getAllocatedMemoryMB() <= maxMemory);
-        tajoWorkerResourceManager.releaseWorkerResource(ebId, eachResource.getContainerId());
-      }
-
-      for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
-        assertEquals(0, eachWorker.getUsedMemoryMB());
-        assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
-
-        assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
-        assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
-      }
-
-      loopCount++;
-
-      if(loopCount == 2) {
-        assertEquals(requiredContainers, numAllocatedContainers);
-        break;
-      }
-    }
-
-    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
-      assertEquals(0, eachWorker.getUsedMemoryMB());
-      assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
-
-      assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
-      assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
-    }
-
-    tajoWorkerResourceManager.stop();
-  }
-
-  @Test
-  public void testDiskResource() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(false);
-    final float minDiskSlots = 1.0f;
-    final float maxDiskSlots = 2.0f;
-    int memoryMB = 256;
-
-    QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3);
-    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId);
-
-    WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
-        .setResourceRequestPriority(ResourceRequestPriority.DISK)
-        .setNumContainers(60)
-        .setExecutionBlockId(ebId.getProto())
-        .setMaxDiskSlotPerContainer(maxDiskSlots)
-        .setMinDiskSlotPerContainer(minDiskSlots)
-        .setMinMemoryMBPerContainer(memoryMB)
-        .setMaxMemoryMBPerContainer(memoryMB)
-        .build();
-
-    final Object monitor = new Object();
-    final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
-
-
-    RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
-
-      @Override
-      public void run(WorkerResourceAllocationResponse response) {
-        TestTajoResourceManager.this.response = response;
-        synchronized(monitor) {
-          monitor.notifyAll();
-        }
-      }
-    };
-
-    tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
-    synchronized(monitor) {
-      monitor.wait();
-    }
-    for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) {
-      assertTrue("AllocatedDiskSlot:" + eachResource.getAllocatedDiskSlots(),
-          eachResource.getAllocatedDiskSlots() >= minDiskSlots &&
-              eachResource.getAllocatedDiskSlots() <= maxDiskSlots);
-      containerIds.add(eachResource.getContainerId());
-    }
-
-    // assert after callback
-    int totalUsedDisks = 0;
-    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
-      //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1)
-      assertEquals(0, eachWorker.getAvailableDiskSlots(), 0);
-      assertEquals(5.0f, eachWorker.getUsedDiskSlots(), 0);
-      assertEquals(256 * 3, eachWorker.getUsedMemoryMB());
-
-      totalUsedDisks += eachWorker.getUsedDiskSlots();
-    }
-
-    assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
-
-    assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size());
-
-    for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
-      tajoWorkerResourceManager.releaseWorkerResource(ebId, eachContainerId);
-    }
-
-    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
-      assertEquals(workerMemoryMB, eachWorker.getAvailableMemoryMB());
-      assertEquals(0, eachWorker.getUsedMemoryMB());
-
-      assertEquals(workerDiskSlots, eachWorker.getAvailableDiskSlots(), 0);
-      assertEquals(0.0f, eachWorker.getUsedDiskSlots(), 0);
-    }
-
-    tajoWorkerResourceManager.stop();
-  }
-
-  @Test
-  public void testQueryMasterResource() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = initResourceManager(true);
-
-    int qmDefaultMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
-    float qmDefaultDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
-
-    QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 4);
-
-    tajoWorkerResourceManager.allocateQueryMaster(queryId);
-
-    // assert after callback
-    int totalUsedMemory = 0;
-    int totalUsedDisks = 0;
-    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
-      if(eachWorker.getUsedMemoryMB() > 0) {
-        //worker which allocated querymaster
-        assertEquals(qmDefaultMemoryMB, eachWorker.getUsedMemoryMB());
-        assertEquals(qmDefaultDiskSlots, eachWorker.getUsedDiskSlots(), 0);
-      } else {
-        assertEquals(0, eachWorker.getUsedMemoryMB());
-        assertEquals(0, eachWorker.getUsedDiskSlots(), 0);
-      }
-
-      totalUsedMemory += eachWorker.getUsedMemoryMB();
-      totalUsedDisks += eachWorker.getUsedDiskSlots();
-    }
-
-    assertEquals(qmDefaultMemoryMB, totalUsedMemory);
-    assertEquals(qmDefaultDiskSlots, totalUsedDisks, 0);
-
-    //release
-    tajoWorkerResourceManager.stopQueryMaster(queryId);
-    for(WorkerResource eachWorker: tajoWorkerResourceManager.getWorkers().values()) {
-      assertEquals(0, eachWorker.getUsedMemoryMB());
-      assertEquals(0, eachWorker.getUsedDiskSlots(), 0);
-      totalUsedMemory += eachWorker.getUsedMemoryMB();
-      totalUsedDisks += eachWorker.getUsedDiskSlots();
-    }
-
-    tajoWorkerResourceManager.stop();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/18aaa68e/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index 371f879..8f98d3a 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -123,8 +123,10 @@ public class NettyServerBase {
       bootstrap.releaseExternalResources();
     }
 
-    LOG.info("Rpc (" + serviceName + ") listened on "
-        + NetUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
+    if (bindAddress != null) {
+      LOG.info("Rpc (" + serviceName + ") listened on "
+          + NetUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
+    }
   }
 
   private static String getNextDefaultServiceName() {


Mime
View raw message