tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [21/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)
Date Fri, 18 Apr 2014 09:19:43 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
new file mode 100644
index 0000000..15ac6b6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.protobuf.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.util.ApplicationIdUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import static org.apache.tajo.ipc.TajoMasterProtocol.*;
+
+
+/**
+ * Manages the memory and disk-slot resources of all Tajo workers.
+ *
+ * <p>Allocation requests are queued and served asynchronously by a single
+ * {@link WorkerResourceAllocationThread}. A request that cannot be served yet is put back
+ * on the queue and retried after a short pause.
+ */
+public class TajoWorkerResourceManager extends CompositeService implements WorkerResourceManager {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(TajoWorkerResourceManager.class);
+
+  /** Sequence used to number the containers handed out by this manager. */
+  static AtomicInteger containerIdSeq = new AtomicInteger(0);
+
+  /** Master context; stays null when the {@link TajoConf} constructor is used. */
+  private TajoMaster.MasterContext masterContext;
+
+  private TajoRMContext rmContext;
+
+  /** Prefix of query ids, generated in {@link #serviceInit(Configuration)}. */
+  private String queryIdSeed;
+
+  private WorkerResourceAllocationThread workerResourceAllocator;
+
+  /**
+   * Worker Liveliness monitor
+   */
+  private WorkerLivelinessMonitor workerLivelinessMonitor;
+
+  /** Pending allocation requests consumed by {@link WorkerResourceAllocationThread}. */
+  private BlockingQueue<WorkerResourceRequest> requestQueue;
+
+  /** Set once {@link #serviceStop()} has been entered. */
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private TajoConf systemConf;
+
+  /** Resources currently handed out, keyed by the container they were allocated for. */
+  private ConcurrentMap<ContainerIdProto, AllocatedWorkerResource> allocatedResourceMap = Maps.newConcurrentMap();
+
+  /** It receives status messages from workers and their resources. */
+  private TajoResourceTracker resourceTracker;
+
+  /**
+   * Creates a resource manager bound to a running master.
+   *
+   * @param masterContext context of the TajoMaster; its configuration is read when a
+   *                      QueryMaster resource request is built
+   */
+  public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
+    super(TajoWorkerResourceManager.class.getSimpleName());
+    this.masterContext = masterContext;
+  }
+
+  /**
+   * Creates a resource manager without a master context.
+   *
+   * <p>{@link #allocateQueryMaster(QueryInProgress)} dereferences the master context and
+   * therefore cannot be used on an instance created through this constructor.
+   *
+   * @param systemConf system configuration; it is replaced by the configuration passed to
+   *                   {@link #serviceInit(Configuration)}
+   */
+  public TajoWorkerResourceManager(TajoConf systemConf) {
+    super(TajoWorkerResourceManager.class.getSimpleName());
+    // The argument used to be dropped silently; keep it so the field is set before init.
+    this.systemConf = systemConf;
+  }
+
+  /**
+   * Creates the dispatcher, RM context, request queue, liveliness monitor and resource tracker,
+   * and starts the allocation thread.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    this.systemConf = (TajoConf) conf;
+
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    addIfService(dispatcher);
+
+    rmContext = new TajoRMContext(dispatcher);
+
+    this.queryIdSeed = String.valueOf(System.currentTimeMillis());
+
+    requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
+
+    workerResourceAllocator = new WorkerResourceAllocationThread();
+    workerResourceAllocator.start();
+
+    this.workerLivelinessMonitor = new WorkerLivelinessMonitor(this.rmContext.getDispatcher());
+    addIfService(this.workerLivelinessMonitor);
+
+    // Register event handler for Workers
+    rmContext.getDispatcher().register(WorkerEventType.class, new WorkerEventDispatcher(rmContext));
+
+    resourceTracker = new TajoResourceTracker(rmContext, workerLivelinessMonitor);
+    addIfService(resourceTracker);
+
+    super.serviceInit(systemConf);
+  }
+
+  /**
+   * Routes a {@link WorkerEvent} to the live {@link Worker} it is addressed to.
+   * Events for workers that are not in the live worker map are dropped.
+   */
+  @InterfaceAudience.Private
+  public static final class WorkerEventDispatcher implements EventHandler<WorkerEvent> {
+
+    private final TajoRMContext rmContext;
+
+    public WorkerEventDispatcher(TajoRMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    @Override
+    public void handle(WorkerEvent event) {
+      String workerId = event.getWorkerId();
+      Worker node = this.rmContext.getWorkers().get(workerId);
+      if (node != null) {
+        try {
+          node.handle(event);
+        } catch (Throwable t) {
+          // A failing worker handler must not propagate out of this dispatcher callback.
+          LOG.error("Error in handling event type " + event.getType() + " for node " + workerId, t);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return an immutable snapshot of the live workers, keyed by worker id
+   */
+  @Override
+  public Map<String, Worker> getWorkers() {
+    return ImmutableMap.copyOf(rmContext.getWorkers());
+  }
+
+  /**
+   * @return an immutable snapshot of the inactive workers, keyed by worker id
+   */
+  @Override
+  public Map<String, Worker> getInactiveWorkers() {
+    return ImmutableMap.copyOf(rmContext.getInactiveWorkers());
+  }
+
+  /**
+   * @return an unmodifiable view of the ids of workers registered as query masters
+   */
+  public Collection<String> getQueryMasters() {
+    return Collections.unmodifiableSet(rmContext.getQueryMasterWorker());
+  }
+
+  /**
+   * @return the cluster resource summary reported by the resource tracker
+   */
+  @Override
+  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+    return resourceTracker.getClusterResourceSummary();
+  }
+
+  /**
+   * Marks the manager as stopped, interrupts the allocation thread and stops child services.
+   * Only the first call has any effect.
+   */
+  @Override
+  public void serviceStop() throws Exception {
+    // getAndSet makes the check and the update one atomic step, so concurrent callers
+    // cannot both pass the guard.
+    if(stopped.getAndSet(true)) {
+      return;
+    }
+    if(workerResourceAllocator != null) {
+      workerResourceAllocator.interrupt();
+    }
+
+    super.serviceStop();
+  }
+
+  /**
+   *
+   * @return The prefix of queryId. It is generated when a TajoMaster starts up.
+   */
+  @Override
+  public String getSeedQueryId() throws IOException {
+    return queryIdSeed;
+  }
+
+  @VisibleForTesting
+  TajoResourceTracker getResourceTracker() {
+    return resourceTracker;
+  }
+
+  /**
+   * Builds a single-container, memory-priority request sized from the QueryMaster settings
+   * ({@code TAJO_QUERYMASTER_DISK_SLOT}, {@code TAJO_QUERYMASTER_MEMORY_MB}).
+   * Minimum and maximum are set to the same value for both memory and disk slots.
+   *
+   * @param queryId the query the QueryMaster is requested for
+   * @return the allocation request
+   */
+  private WorkerResourceAllocationRequest createQMResourceRequest(QueryId queryId) {
+    float queryMasterDefaultDiskSlot = masterContext.getConf().getFloatVar(
+        TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
+    int queryMasterDefaultMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
+
+    WorkerResourceAllocationRequest.Builder builder = WorkerResourceAllocationRequest.newBuilder();
+    builder.setQueryId(queryId.getProto());
+    builder.setMaxMemoryMBPerContainer(queryMasterDefaultMemoryMB);
+    builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB);
+    builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot);
+    builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot);
+    builder.setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY);
+    builder.setNumContainers(1);
+    return builder.build();
+  }
+
+  /**
+   * Requests one container for the QueryMaster of the given query and waits up to three
+   * seconds for the answer.
+   *
+   * @param queryInProgress the query that needs a QueryMaster
+   * @return the allocated resource, or {@code null} if the wait failed, timed out or the
+   *         response contained no resource
+   */
+  @Override
+  public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress) {
+    // Create a resource request for a query master
+    WorkerResourceAllocationRequest qmResourceRequest = createQMResourceRequest(queryInProgress.getQueryId());
+
+    // call future for async call
+    CallFuture<WorkerResourceAllocationResponse> callFuture = new CallFuture<WorkerResourceAllocationResponse>();
+    allocateWorkerResources(qmResourceRequest, callFuture);
+
+    // Wait for 3 seconds
+    // NOTE(review): on failure the request is not withdrawn from the queue, so the allocation
+    // thread may still serve it later without anyone releasing the result - confirm.
+    WorkerResourceAllocationResponse response = null;
+    try {
+      response = callFuture.get(3, TimeUnit.SECONDS);
+    } catch (Throwable t) {
+      if (t instanceof InterruptedException) {
+        // Keep the interrupt visible to the caller instead of swallowing it.
+        Thread.currentThread().interrupt();
+      }
+      // Pass the throwable as the cause so the stack trace is logged.
+      LOG.error("Failed to allocate a QueryMaster for " + queryInProgress.getQueryId(), t);
+      return null;
+    }
+
+    if (response.getWorkerAllocatedResourceList().isEmpty()) {
+      return null;
+    }
+
+    WorkerAllocatedResource resource = response.getWorkerAllocatedResource(0);
+    registerQueryMaster(queryInProgress.getQueryId(), resource.getContainerId());
+    return resource;
+  }
+
+  /** Remembers the container of a query's QueryMaster unless one is already registered. */
+  private void registerQueryMaster(QueryId queryId, ContainerIdProto containerId) {
+    rmContext.getQueryMasterContainer().putIfAbsent(queryId, containerId);
+  }
+
+  /**
+   * Queues an allocation request; the callback is invoked by the allocation thread once
+   * resources have been chosen.
+   *
+   * @param request  the resources being asked for
+   * @param callBack receives the allocation response
+   */
+  @Override
+  public void allocateWorkerResources(WorkerResourceAllocationRequest request,
+                                      RpcCallback<WorkerResourceAllocationResponse> callBack) {
+    try {
+      //TODO checking queue size
+      requestQueue.put(new WorkerResourceRequest(new QueryId(request.getQueryId()), false, request, callBack));
+    } catch (InterruptedException e) {
+      // The request was not queued, so the callback will never be invoked for it.
+      LOG.error(e.getMessage(), e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** A queued allocation request together with the callback that receives its result. */
+  static class WorkerResourceRequest {
+    boolean queryMasterRequest;
+    QueryId queryId;
+    WorkerResourceAllocationRequest request;
+    RpcCallback<WorkerResourceAllocationResponse> callBack;
+    WorkerResourceRequest(
+        QueryId queryId,
+        boolean queryMasterRequest, WorkerResourceAllocationRequest request,
+        RpcCallback<WorkerResourceAllocationResponse> callBack) {
+      this.queryId = queryId;
+      this.queryMasterRequest = queryMasterRequest;
+      this.request = request;
+      this.callBack = callBack;
+    }
+  }
+
+  /** The amount of memory and disk slots taken from one worker for one container. */
+  static class AllocatedWorkerResource {
+    Worker worker;
+    int allocatedMemoryMB;
+    float allocatedDiskSlots;
+  }
+
+  /**
+   * Takes requests from the queue, chooses workers for them and answers the callback.
+   * When no worker can serve a request, it is re-queued and the thread sleeps for 100 ms.
+   */
+  class WorkerResourceAllocationThread extends Thread {
+    @Override
+    public void run() {
+      LOG.info("WorkerResourceAllocationThread start");
+      while(!stopped.get()) {
+        try {
+          WorkerResourceRequest resourceRequest = requestQueue.take();
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("allocateWorkerResources:" +
+                (new QueryId(resourceRequest.request.getQueryId())) +
+                ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
+                "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
+                ", requiredContainers:" + resourceRequest.request.getNumContainers() +
+                ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
+                "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
+                ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
+                ", liveWorkers=" + rmContext.getWorkers().size());
+          }
+
+          List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest);
+
+          if(allocatedWorkerResources.size() > 0) {
+            List<WorkerAllocatedResource> allocatedResources =
+                new ArrayList<WorkerAllocatedResource>();
+
+            for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
+              NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(),
+                  allocatedResource.worker.getPeerRpcPort());
+
+              TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+
+              containerId.setApplicationAttemptId(
+                  ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+              containerId.setId(containerIdSeq.incrementAndGet());
+
+              ContainerIdProto containerIdProto = containerId.getProto();
+              allocatedResources.add(WorkerAllocatedResource.newBuilder()
+                  .setContainerId(containerIdProto)
+                  .setNodeId(nodeId.toString())
+                  .setWorkerHost(allocatedResource.worker.getHostName())
+                  .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort())
+                  .setClientPort(allocatedResource.worker.getClientPort())
+                  .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort())
+                  .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort())
+                  .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
+                  .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
+                  .build());
+
+
+              // Remembered so that releaseWorkerResource() can give the resources back.
+              allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource);
+            }
+
+            resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder()
+                .setQueryId(resourceRequest.request.getQueryId())
+                .addAllWorkerAllocatedResource(allocatedResources)
+                .build()
+            );
+
+          } else {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("=========================================");
+              LOG.debug("Available Workers");
+              // Iterate the values directly: a keySet()/get() pair can observe a worker
+              // that was removed in between and fail on a null lookup.
+              for(Worker liveWorker: rmContext.getWorkers().values()) {
+                LOG.debug(liveWorker.toString());
+              }
+              LOG.debug("=========================================");
+            }
+            requestQueue.put(resourceRequest);
+            Thread.sleep(100);
+          }
+        } catch(InterruptedException ie) {
+          if (stopped.get()) {
+            // serviceStop() interrupts this thread on purpose; that is not an error.
+            break;
+          }
+          LOG.error(ie.getMessage(), ie);
+        }
+      }
+    }
+  }
+
+  /**
+   * Picks workers for a request and reserves the chosen resources on them.
+   *
+   * <p>The request's priority selects the primary resource: memory for
+   * {@code ResourceRequestPriority.MEMORY}, disk slots otherwise. Workers are visited in random
+   * order. While any worker can supply the maximum amount of the primary resource, only such
+   * workers are used; once none can, workers that can supply at least the minimum are accepted.
+   * The search ends when the requested number of containers has been reserved or no worker
+   * qualifies any more. The secondary resource is capped at what the worker has available.
+   * The whole selection runs while holding the monitor of {@code rmContext}.
+   *
+   * @param resourceRequest the queued request
+   * @return the reservations made; may contain fewer entries than requested, or be empty
+   */
+  private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) {
+    List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>();
+
+    int allocatedResources = 0;
+
+    TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
+        = resourceRequest.request.getResourceRequestPriority();
+
+    if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
+      synchronized(rmContext) {
+        List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
+        Collections.shuffle(randomWorkers);
+
+        int numContainers = resourceRequest.request.getNumContainers();
+        int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
+        int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
+        float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
+            resourceRequest.request.getMinDiskSlotPerContainer());
+
+        int liveWorkerSize = randomWorkers.size();
+        Set<String> insufficientWorkers = new HashSet<String>();
+        boolean stop = false;
+        boolean checkMax = true;
+        while(!stop) {
+          if(allocatedResources >= numContainers) {
+            break;
+          }
+
+          if(insufficientWorkers.size() >= liveWorkerSize) {
+            if(!checkMax) {
+              break;
+            }
+            // No worker can supply the maximum any more: start over with the minimum.
+            insufficientWorkers.clear();
+            checkMax = false;
+          }
+          int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
+
+          for(String eachWorker: randomWorkers) {
+            if(allocatedResources >= numContainers) {
+              stop = true;
+              break;
+            }
+
+            if(insufficientWorkers.size() >= liveWorkerSize) {
+              break;
+            }
+
+            Worker worker = rmContext.getWorkers().get(eachWorker);
+            if(worker == null) {
+              // The worker was removed after the key snapshot was taken (deactivation does
+              // not hold the rmContext monitor); treat it as unable to serve the request.
+              insufficientWorkers.add(eachWorker);
+              continue;
+            }
+            WorkerResource workerResource = worker.getResource();
+            if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) {
+              int workerMemory;
+              if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) {
+                workerMemory = maxMemoryMB;
+              } else {
+                workerMemory = workerResource.getAvailableMemoryMB();
+              }
+              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+              allocatedWorkerResource.worker = worker;
+              allocatedWorkerResource.allocatedMemoryMB = workerMemory;
+              if(workerResource.getAvailableDiskSlots() >= diskSlot) {
+                allocatedWorkerResource.allocatedDiskSlots = diskSlot;
+              } else {
+                allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots();
+              }
+
+              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+                  allocatedWorkerResource.allocatedMemoryMB);
+
+              selectedWorkers.add(allocatedWorkerResource);
+
+              allocatedResources++;
+            } else {
+              insufficientWorkers.add(eachWorker);
+            }
+          }
+        }
+      }
+    } else {
+      synchronized(rmContext) {
+        List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
+        Collections.shuffle(randomWorkers);
+
+        int numContainers = resourceRequest.request.getNumContainers();
+        float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
+        float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
+        int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
+            resourceRequest.request.getMinMemoryMBPerContainer());
+
+        int liveWorkerSize = randomWorkers.size();
+        Set<String> insufficientWorkers = new HashSet<String>();
+        boolean stop = false;
+        boolean checkMax = true;
+        while(!stop) {
+          if(allocatedResources >= numContainers) {
+            break;
+          }
+
+          if(insufficientWorkers.size() >= liveWorkerSize) {
+            if(!checkMax) {
+              break;
+            }
+            // No worker can supply the maximum any more: start over with the minimum.
+            insufficientWorkers.clear();
+            checkMax = false;
+          }
+          float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
+
+          for(String eachWorker: randomWorkers) {
+            if(allocatedResources >= numContainers) {
+              stop = true;
+              break;
+            }
+
+            if(insufficientWorkers.size() >= liveWorkerSize) {
+              break;
+            }
+
+            Worker worker = rmContext.getWorkers().get(eachWorker);
+            if(worker == null) {
+              // The worker was removed after the key snapshot was taken (deactivation does
+              // not hold the rmContext monitor); treat it as unable to serve the request.
+              insufficientWorkers.add(eachWorker);
+              continue;
+            }
+            WorkerResource workerResource = worker.getResource();
+            if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) {
+              float workerDiskSlots;
+              if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) {
+                workerDiskSlots = maxDiskSlots;
+              } else {
+                workerDiskSlots = workerResource.getAvailableDiskSlots();
+              }
+              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+              allocatedWorkerResource.worker = worker;
+              allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots;
+
+              if(workerResource.getAvailableMemoryMB() >= memoryMB) {
+                allocatedWorkerResource.allocatedMemoryMB = memoryMB;
+              } else {
+                allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
+              }
+              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+                  allocatedWorkerResource.allocatedMemoryMB);
+
+              selectedWorkers.add(allocatedWorkerResource);
+
+              allocatedResources++;
+            } else {
+              insufficientWorkers.add(eachWorker);
+            }
+          }
+        }
+      }
+    }
+    return selectedWorkers;
+  }
+
+  /**
+   * Release allocated resource.
+   *
+   * <p>The bookkeeping entry is removed as part of the release, so releasing the same
+   * container a second time only logs a warning instead of freeing the resources twice.
+   *
+   * @param containerId ContainerIdProto to be released
+   */
+  @Override
+  public void releaseWorkerResource(ContainerIdProto containerId) {
+    // The concurrent map rejects null keys, so a null id is handled as "unknown container".
+    AllocatedWorkerResource allocated =
+        containerId == null ? null : allocatedResourceMap.remove(containerId);
+    if(allocated != null) {
+      LOG.info("Release Resource: " + allocated.allocatedDiskSlots + "," + allocated.allocatedMemoryMB);
+      allocated.worker.getResource().releaseResource( allocated.allocatedDiskSlots, allocated.allocatedMemoryMB);
+    } else {
+      LOG.warn("No AllocatedWorkerResource data for [" + containerId + "]");
+    }
+  }
+
+  /**
+   * @param queryId the query to check
+   * @return true if no QueryMaster container is registered for the query
+   */
+  @Override
+  public boolean isQueryMasterStopped(QueryId queryId) {
+    return !rmContext.getQueryMasterContainer().containsKey(queryId);
+  }
+
+  /**
+   * Unregisters the QueryMaster container of a query and releases its resources.
+   * Logs a warning and does nothing else when no container is registered for the query.
+   *
+   * @param queryId the query whose QueryMaster is stopped
+   */
+  @Override
+  public void stopQueryMaster(QueryId queryId) {
+    // A single remove() replaces the containsKey()/remove() pair, so two concurrent callers
+    // cannot both get past the check and release a null container.
+    ContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
+    if(containerId == null) {
+      LOG.warn("No QueryMaster resource info for " + queryId);
+      return;
+    }
+    releaseWorkerResource(containerId);
+    LOG.info(String.format("Released QueryMaster (%s) resource.", queryId.toString()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
new file mode 100644
index 0000000..0d6b5ee
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+
+import java.util.EnumSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * It contains resource and various information for a worker.
+ *
+ * <p>A worker is driven by a state machine over {@link WorkerState}; events are applied
+ * through {@link #handle(WorkerEvent)} while holding the worker's write lock.
+ */
+public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(Worker.class);
+
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+
+  /** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */
+  private final TajoRMContext rmContext;
+
+  /** Hostname */
+  private String hostName;
+  /** QueryMaster rpc port */
+  private int qmRpcPort;
+  /** Peer rpc port */
+  private int peerRpcPort;
+  /** http info port */
+  private int httpInfoPort;
+  /** the port of QueryMaster client rpc which provides an client API */
+  private int qmClientPort;
+  /** pull server port */
+  private int pullServerPort;
+  /** last heartbeat time */
+  private long lastHeartbeatTime;
+
+  /** Resource capability */
+  private WorkerResource resource;
+
+  private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition();
+  private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition();
+
+  /**
+   * Transition table shared by all workers:
+   * NEW goes to RUNNING on STARTED; RUNNING and UNHEALTHY accept STATE_UPDATE, EXPIRE
+   * (which leads to LOST) and RECONNECTED (which keeps the current state).
+   */
+  private static final StateMachineFactory<Worker,
+      WorkerState,
+      WorkerEventType,
+      WorkerEvent> stateMachineFactory
+      = new StateMachineFactory<Worker,
+      WorkerState,
+      WorkerEventType,
+      WorkerEvent>(WorkerState.NEW)
+
+      // Transition from NEW
+      .addTransition(WorkerState.NEW, WorkerState.RUNNING,
+          WorkerEventType.STARTED,
+          new AddNodeTransition())
+
+      // Transition from RUNNING
+      .addTransition(WorkerState.RUNNING, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY),
+          WorkerEventType.STATE_UPDATE,
+          STATUS_UPDATE_TRANSITION)
+      .addTransition(WorkerState.RUNNING, WorkerState.LOST,
+          WorkerEventType.EXPIRE,
+          new DeactivateNodeTransition(WorkerState.LOST))
+      .addTransition(WorkerState.RUNNING, WorkerState.RUNNING,
+          WorkerEventType.RECONNECTED,
+          RECONNECT_NODE_TRANSITION)
+
+      // Transitions from UNHEALTHY state
+      .addTransition(WorkerState.UNHEALTHY, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY),
+          WorkerEventType.STATE_UPDATE,
+          STATUS_UPDATE_TRANSITION)
+      .addTransition(WorkerState.UNHEALTHY, WorkerState.LOST,
+          WorkerEventType.EXPIRE,
+          new DeactivateNodeTransition(WorkerState.LOST))
+      .addTransition(WorkerState.UNHEALTHY, WorkerState.UNHEALTHY,
+          WorkerEventType.RECONNECTED,
+          RECONNECT_NODE_TRANSITION);
+
+  private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> stateMachine =
+      stateMachineFactory.make(this, WorkerState.NEW);
+
+  /**
+   * Creates a worker whose last heartbeat time is the moment of construction.
+   *
+   * @param rmContext context of the resource manager this worker belongs to
+   * @param resource  resource capability of the worker
+   */
+  public Worker(TajoRMContext rmContext, WorkerResource resource) {
+    this.rmContext = rmContext;
+
+    this.lastHeartbeatTime = System.currentTimeMillis();
+    this.resource = resource;
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
+  }
+
+  /**
+   * @return the worker id in the form {@code hostName:queryMasterPort:peerRpcPort}
+   */
+  public String getWorkerId() {
+    return hostName + ":" + qmRpcPort + ":" + peerRpcPort;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String allocatedHost) {
+    this.hostName = allocatedHost;
+  }
+
+  public int getPeerRpcPort() {
+    return peerRpcPort;
+  }
+
+  public void setPeerRpcPort(int peerRpcPort) {
+    this.peerRpcPort = peerRpcPort;
+  }
+
+  public int getQueryMasterPort() {
+    return qmRpcPort;
+  }
+
+  public void setQueryMasterPort(int queryMasterPort) {
+    this.qmRpcPort = queryMasterPort;
+  }
+
+  public int getClientPort() {
+    return qmClientPort;
+  }
+
+  public void setClientPort(int clientPort) {
+    this.qmClientPort = clientPort;
+  }
+
+  public int getPullServerPort() {
+    return pullServerPort;
+  }
+
+  public void setPullServerPort(int pullServerPort) {
+    this.pullServerPort = pullServerPort;
+  }
+
+  public int getHttpPort() {
+    return httpInfoPort;
+  }
+
+  public void setHttpPort(int port) {
+    this.httpInfoPort = port;
+  }
+
+  /**
+   * Records the time of the latest heartbeat under the write lock.
+   *
+   * @param lastheartbeatReportTime heartbeat time in milliseconds
+   */
+  public void setLastHeartbeatTime(long lastheartbeatReportTime) {
+    this.writeLock.lock();
+
+    try {
+      this.lastHeartbeatTime = lastheartbeatReportTime;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  /**
+   * @return the time of the latest heartbeat, read under the read lock
+   */
+  public long getLastHeartbeatTime() {
+    this.readLock.lock();
+
+    try {
+      return this.lastHeartbeatTime;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   *
+   * @return the current state of worker
+   */
+  public WorkerState getState() {
+    this.readLock.lock();
+
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   *
+   * @return the current resource capability of worker
+   */
+  public WorkerResource getResource() {
+    return this.resource;
+  }
+
+  /**
+   * Orders workers by their worker id; any worker sorts after {@code null}.
+   */
+  @Override
+  public int compareTo(Worker o) {
+    if(o == null) {
+      return 1;
+    }
+    return getWorkerId().compareTo(o.getWorkerId());
+  }
+
+  /** Runs when a worker starts: registers it as a query master if it is in that mode. */
+  public static class AddNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+    @Override
+    public void transition(Worker worker, WorkerEvent workerEvent) {
+
+      if(worker.getResource().isQueryMasterMode()) {
+        worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId());
+      }
+      LOG.info("Worker with " + worker.getResource() + " is joined to Tajo cluster");
+    }
+  }
+
+  /**
+   * Applies a status report to the worker: refreshes the heartbeat time and copies the
+   * running-task count and heap figures into the worker's resource. Always yields RUNNING.
+   */
+  public static class StatusUpdateTransition implements
+      MultipleArcTransition<Worker, WorkerEvent, WorkerState> {
+
+    @Override
+    public WorkerState transition(Worker worker, WorkerEvent event) {
+      WorkerStatusEvent statusEvent = (WorkerStatusEvent) event;
+
+      // TODO - the synchronization scope using rmContext is too coarse.
+      synchronized (worker.rmContext) {
+        worker.setLastHeartbeatTime(System.currentTimeMillis());
+        worker.getResource().setNumRunningTasks(statusEvent.getRunningTaskNum());
+        worker.getResource().setMaxHeap(statusEvent.maxHeap());
+        worker.getResource().setFreeHeap(statusEvent.getFreeHeap());
+        worker.getResource().setTotalHeap(statusEvent.getTotalHeap());
+      }
+
+      return WorkerState.RUNNING;
+    }
+  }
+
+  /** Moves a worker from the live worker map to the inactive worker map. */
+  public static class DeactivateNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+    private final WorkerState finalState;
+
+    public DeactivateNodeTransition(WorkerState finalState) {
+      this.finalState = finalState;
+    }
+
+    @Override
+    public void transition(Worker worker, WorkerEvent workerEvent) {
+
+      // NOTE(review): the worker id is not removed from getQueryMasterWorker() here -
+      // confirm whether that is intended.
+      worker.rmContext.getWorkers().remove(worker.getWorkerId());
+      LOG.info("Deactivating Node " + worker.getWorkerId() + " as it is now " + finalState);
+      worker.rmContext.getInactiveWorkers().putIfAbsent(worker.getWorkerId(), worker);
+    }
+  }
+
+  /**
+   * Replaces the registered worker with the one carried by the reconnect event and
+   * dispatches a STARTED event for this worker's id.
+   */
+  public static class ReconnectNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+
+    @Override
+    public void transition(Worker worker, WorkerEvent workerEvent) {
+      WorkerReconnectEvent castedEvent = (WorkerReconnectEvent) workerEvent;
+
+      Worker newWorker = castedEvent.getWorker();
+      worker.rmContext.getWorkers().put(castedEvent.getWorkerId(), newWorker);
+      worker.rmContext.getDispatcher().getEventHandler().handle(
+          new WorkerEvent(worker.getWorkerId(), WorkerEventType.STARTED));
+    }
+  }
+
+  /**
+   * Applies an event to the state machine under the write lock.
+   * An event that is not valid in the current state is logged and otherwise ignored.
+   *
+   * @param event the event to process
+   */
+  @Override
+  public void handle(WorkerEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getWorkerId() + " of type " + event.getType());
+    }
+    // Acquire the lock before entering the try block: if lock() itself failed inside the
+    // try, the finally clause would call unlock() on a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      WorkerState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Invalid event " + event.getType() + " on Worker  " + getWorkerId());
+      }
+      WorkerState newState = getState();
+      if (oldState != newState) {
+        LOG.info(getWorkerId() + " Node Transitioned from " + oldState + " to " + newState);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
new file mode 100644
index 0000000..389c3be
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * WorkerEvent describes all kinds of events which are sent to {@link Worker}.
+ */
+public class WorkerEvent extends AbstractEvent<WorkerEventType> {
+
+  /** Id of the worker associated with this event. */
+  private final String workerId;
+
+  /**
+   * Creates an event of the given type for the given worker id.
+   *
+   * @param workerId id of the worker associated with this event
+   * @param workerEventType the event type, handed to the {@link AbstractEvent} constructor
+   */
+  public WorkerEvent(String workerId, WorkerEventType workerEventType) {
+    super(workerEventType);
+    this.workerId = workerId;
+  }
+
+  /**
+   * @return the worker id given at construction time
+   */
+  public String getWorkerId() {
+    return this.workerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
new file mode 100644
index 0000000..0c97654
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * Event types carried by {@link WorkerEvent}.
+ */
+public enum WorkerEventType {
+
+  /** Source : {@link TajoResourceTracker}, Destination: {@link Worker} */
+  STARTED,
+  // Type used by WorkerStatusEvent.
+  STATE_UPDATE,
+  // Type used by WorkerReconnectEvent.
+  RECONNECTED,
+
+  /** Source : {@link WorkerLivelinessMonitor}, Destination: {@link Worker} */
+  EXPIRE
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
new file mode 100644
index 0000000..e3524d6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+/**
+ * It periodically checks the latest heartbeat time of {@link Worker}.
+ * If the latest heartbeat time is expired, it produces an EXPIRE event to the corresponding {@link Worker}.
+ */
+public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+  /** Event handler obtained from the dispatcher; receives the EXPIRE events. */
+  private final EventHandler dispatcher;
+
+  /**
+   * @param d dispatcher whose event handler receives the EXPIRE events
+   */
+  public WorkerLivelinessMonitor(Dispatcher d) {
+    super(WorkerLivelinessMonitor.class.getSimpleName(), new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  /**
+   * Reads the heartbeat timeout from the configuration and derives the expire and monitor
+   * intervals from it before delegating to the superclass.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   * @throws Exception propagated from the superclass initialization
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf, "conf must be an instance of TajoConf");
+    TajoConf systemConf = (TajoConf) conf;
+    // milliseconds
+    int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.RESOURCE_TRACKER_HEARTBEAT_TIMEOUT);
+    setExpireInterval(expireIntvl);
+    // Integer division truncates to 0 for timeouts below 3 ms; keep the interval at least 1 ms.
+    setMonitorInterval(Math.max(1, expireIntvl / 3));
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Emits an EXPIRE event for the given worker id.
+   *
+   * @param id id of the expired worker
+   */
+  @Override
+  protected void expire(String id) {
+    dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
new file mode 100644
index 0000000..46f286d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * {@link TajoResourceTracker} produces this event, and its destination is {@link Worker}.
+ * This event occurs only when an inactive worker sends a ping again.
+ */
+public class WorkerReconnectEvent extends WorkerEvent {
+
+  /** The worker instance carried by this event. */
+  private final Worker worker;
+
+  /**
+   * Creates a {@link WorkerEventType#RECONNECTED} event.
+   *
+   * @param workerId id of the worker associated with this event
+   * @param worker the worker instance to carry
+   */
+  public WorkerReconnectEvent(String workerId, Worker worker) {
+    super(workerId, WorkerEventType.RECONNECTED);
+    this.worker = worker;
+  }
+
+  /**
+   * @return the worker instance given at construction time
+   */
+  public Worker getWorker() {
+    return this.worker;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
new file mode 100644
index 0000000..bfe186c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Describe current resources of a worker.
+ *
+ * It includes various resource capability of a worker as follows:
+ * <ul>
+ *   <li>used/total disk slots</li>
+ *   <li>used/total core slots</li>
+ *   <li>used/total memory</li>
+ *   <li>the number of running tasks</li>
+ * </ul>
+ *
+ * The slot accounting fields (total and used disk slots, CPU core slots and memory) are read
+ * under the read lock and written under the write lock, so that values derived from two fields
+ * (for example the available amounts) are computed from one consistent snapshot. Heap figures,
+ * the running-task count and the mode flags are plain fields accessed without locking.
+ */
+public class WorkerResource {
+  private static final Log LOG = LogFactory.getLog(WorkerResource.class);
+
+  private float diskSlots;
+  private int cpuCoreSlots;
+  private int memoryMB;
+
+  private float usedDiskSlots;
+  private int usedMemoryMB;
+  private int usedCpuCoreSlots;
+
+  private long maxHeap;
+  private long freeHeap;
+  private long totalHeap;
+
+  private int numRunningTasks;
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Lock rlock = lock.readLock();
+  private final Lock wlock = lock.writeLock();
+
+  private boolean queryMasterMode;
+
+  private boolean taskRunnerMode;
+
+  private final AtomicInteger numQueryMasterTasks = new AtomicInteger(0);
+
+  /** @return the total number of disk slots */
+  public float getDiskSlots() {
+    rlock.lock();
+    try {
+      return diskSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @param diskSlots the total number of disk slots */
+  public void setDiskSlots(float diskSlots) {
+    wlock.lock();
+    try {
+      this.diskSlots = diskSlots;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /** @return the total number of CPU core slots */
+  public int getCpuCoreSlots() {
+    rlock.lock();
+    try {
+      return cpuCoreSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @param cpuCoreSlots the total number of CPU core slots */
+  public void setCpuCoreSlots(int cpuCoreSlots) {
+    wlock.lock();
+    try {
+      this.cpuCoreSlots = cpuCoreSlots;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /** @return the total memory in MB */
+  public int getMemoryMB() {
+    rlock.lock();
+    try {
+      return memoryMB;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @param memoryMB the total memory in MB */
+  public void setMemoryMB(int memoryMB) {
+    wlock.lock();
+    try {
+      this.memoryMB = memoryMB;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /** @return total disk slots minus used disk slots */
+  public float getAvailableDiskSlots() {
+    rlock.lock();
+    try {
+      return diskSlots - usedDiskSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @return total memory minus used memory, in MB */
+  public int getAvailableMemoryMB() {
+    rlock.lock();
+    try {
+      return memoryMB - usedMemoryMB;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @return total CPU core slots minus used CPU core slots */
+  public int getAvailableCpuCoreSlots() {
+    rlock.lock();
+    try {
+      return cpuCoreSlots - usedCpuCoreSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /**
+   * @return a one-line summary of total and used memory, disk and CPU core slots
+   */
+  @Override
+  public String toString() {
+    rlock.lock();
+    try {
+      return "slots=m:" + memoryMB + ",d:" + diskSlots +
+          ",c:" + cpuCoreSlots + ", used=m:" + usedMemoryMB + ",d:" + usedDiskSlots + ",c:" + usedCpuCoreSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @return the used memory in MB */
+  public int getUsedMemoryMB() {
+    rlock.lock();
+    try {
+      return usedMemoryMB;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @param usedMemoryMB the used memory in MB */
+  public void setUsedMemoryMB(int usedMemoryMB) {
+    wlock.lock();
+    try {
+      this.usedMemoryMB = usedMemoryMB;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /** @return the number of used CPU core slots */
+  public int getUsedCpuCoreSlots() {
+    rlock.lock();
+    try {
+      return usedCpuCoreSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /** @param usedCpuCoreSlots the number of used CPU core slots */
+  public void setUsedCpuCoreSlots(int usedCpuCoreSlots) {
+    wlock.lock();
+    try {
+      this.usedCpuCoreSlots = usedCpuCoreSlots;
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /** @return the number of used disk slots */
+  public float getUsedDiskSlots() {
+    rlock.lock();
+    try {
+      return usedDiskSlots;
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  public boolean isQueryMasterMode() {
+    return queryMasterMode;
+  }
+
+  public void setQueryMasterMode(boolean queryMasterMode) {
+    this.queryMasterMode = queryMasterMode;
+  }
+
+  public boolean isTaskRunnerMode() {
+    return taskRunnerMode;
+  }
+
+  public void setTaskRunnerMode(boolean taskRunnerMode) {
+    this.taskRunnerMode = taskRunnerMode;
+  }
+
+  /**
+   * Subtracts the given amounts from the used disk slots and used memory. A counter that would
+   * become negative is logged with a warning and reset to zero.
+   *
+   * @param diskSlots disk slots to give back
+   * @param memoryMB memory in MB to give back
+   */
+  public void releaseResource(float diskSlots, int memoryMB) {
+    wlock.lock();
+    try {
+      usedMemoryMB = usedMemoryMB - memoryMB;
+      usedDiskSlots -= diskSlots;
+      if(usedMemoryMB < 0) {
+        LOG.warn("Used memory can't be a minus: " + usedMemoryMB);
+        usedMemoryMB = 0;
+      }
+      if(usedDiskSlots < 0) {
+        LOG.warn("Used disk slot can't be a minus: " + usedDiskSlots);
+        usedDiskSlots = 0;
+      }
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /**
+   * Adds the given amounts to the used disk slots and used memory. A counter that would exceed
+   * its total is capped at that total.
+   *
+   * @param diskSlots disk slots to reserve
+   * @param memoryMB memory in MB to reserve
+   */
+  public void allocateResource(float diskSlots, int memoryMB) {
+    wlock.lock();
+    try {
+      usedMemoryMB += memoryMB;
+      usedDiskSlots += diskSlots;
+
+      if(usedMemoryMB > this.memoryMB) {
+        usedMemoryMB = this.memoryMB;
+      }
+
+      if(usedDiskSlots > this.diskSlots) {
+        usedDiskSlots = this.diskSlots;
+      }
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  public long getMaxHeap() {
+    return maxHeap;
+  }
+
+  public void setMaxHeap(long maxHeap) {
+    this.maxHeap = maxHeap;
+  }
+
+  public long getFreeHeap() {
+    return freeHeap;
+  }
+
+  public void setFreeHeap(long freeHeap) {
+    this.freeHeap = freeHeap;
+  }
+
+  public long getTotalHeap() {
+    return totalHeap;
+  }
+
+  public void setTotalHeap(long totalHeap) {
+    this.totalHeap = totalHeap;
+  }
+
+  public int getNumRunningTasks() {
+    return numRunningTasks;
+  }
+
+  public void setNumRunningTasks(int numRunningTasks) {
+    this.numRunningTasks = numRunningTasks;
+  }
+
+  /** @return the current value of the query-master task counter */
+  public int getNumQueryMasterTasks() {
+    return numQueryMasterTasks.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
new file mode 100644
index 0000000..54fe11c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
+
+/**
+ * An interface of WorkerResourceManager which allows TajoMaster to request allocation for containers
+ * and release the allocated containers.
+ */
+public interface WorkerResourceManager extends Service {
+
+  /**
+   * Request a resource container for a QueryMaster.
+   *
+   * @param queryInProgress QueryInProgress
+   * @return An allocated container resource
+   */
+  @Deprecated
+  WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
+
+  /**
+   * Request one or more resource containers. The request carries the number of containers and the
+   * resource capabilities, such as memory, CPU cores, and disk slots. This is an asynchronous call;
+   * the allocated resource containers are delivered through the callback. Each container is
+   * identified by {@link org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto}.
+   *
+   * @param request Request description
+   * @param rpcCallBack Callback function
+   */
+  void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationRequest request,
+      RpcCallback<WorkerResourceAllocationResponse> rpcCallBack);
+
+  /**
+   * Release a container
+   *
+   * @param containerId ContainerIdProto to be released
+   */
+  void releaseWorkerResource(ContainerIdProto containerId);
+
+  /**
+   * @return the seed query id as a string
+   * @throws IOException declared for implementations; the conditions are implementation-specific
+   */
+  String getSeedQueryId() throws IOException;
+
+  /**
+   * Check if a query master is stopped.
+   *
+   * @param queryId QueryId to be checked
+   * @return True if QueryMaster is stopped
+   */
+  boolean isQueryMasterStopped(QueryId queryId);
+
+  /**
+   * Stop a query master
+   *
+   * @param queryId QueryId to be stopped
+   */
+  void stopQueryMaster(QueryId queryId);
+
+  /**
+   * @return a Map instance containing active workers
+   */
+  Map<String, Worker> getWorkers();
+
+  /**
+   * @return a Map instance containing inactive workers
+   */
+  Map<String, Worker> getInactiveWorkers();
+
+  void stop();
+
+  /**
+   * @return The overall summary of cluster resources
+   */
+  TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary();
+
+  /**
+   * @return WorkerIds on which QueryMasters are running
+   */
+  Collection<String> getQueryMasters();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java
new file mode 100644
index 0000000..a941008
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * It presents the states of {@link Worker}.
+ */
+public enum WorkerState {
+  /** New worker */
+  NEW,
+
+  /** Running worker */
+  RUNNING,
+
+  /** Worker is unhealthy */
+  UNHEALTHY,
+
+  /** worker is out of service */
+  DECOMMISSIONED,
+
+  /** worker has not sent a heartbeat for some configured time threshold */
+  LOST;
+
+  @SuppressWarnings("unused")
+  public boolean isUnusable() {
+    return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
new file mode 100644
index 0000000..8c3d7c1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * {@link TajoResourceTracker} produces this event, and its destination is
+ * {@link org.apache.tajo.master.rm.Worker.StatusUpdateTransition} of {@link Worker}.
+ *
+ * It is an immutable {@link WorkerEventType#STATE_UPDATE} event carrying the running-task count
+ * and the heap figures given at construction time.
+ */
+public class WorkerStatusEvent extends WorkerEvent {
+  private final int runningTaskNum;
+  private final long maxHeap;
+  private final long freeHeap;
+  private final long totalHeap;
+
+  /**
+   * @param workerId id of the worker associated with this event
+   * @param runningTaskNum number of running tasks
+   * @param maxHeap max heap value
+   * @param freeHeap free heap value
+   * @param totalHeap total heap value
+   */
+  public WorkerStatusEvent(String workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
+    super(workerId, WorkerEventType.STATE_UPDATE);
+    this.runningTaskNum = runningTaskNum;
+    this.maxHeap = maxHeap;
+    this.freeHeap = freeHeap;
+    this.totalHeap = totalHeap;
+  }
+
+  /** @return the number of running tasks */
+  public int getRunningTaskNum() {
+    return runningTaskNum;
+  }
+
+  /**
+   * Accessor named consistently with {@link #getFreeHeap()} and {@link #getTotalHeap()}.
+   *
+   * @return the max heap value
+   */
+  public long getMaxHeap() {
+    return maxHeap;
+  }
+
+  /**
+   * Same value as {@link #getMaxHeap()}; retained so that existing callers keep working.
+   *
+   * @return the max heap value
+   */
+  public long maxHeap() {
+    return maxHeap;
+  }
+
+  /** @return the free heap value */
+  public long getFreeHeap() {
+    return freeHeap;
+  }
+
+  /** @return the total heap value */
+  public long getTotalHeap() {
+    return totalHeap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
new file mode 100644
index 0000000..b9e132b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.master.event.ContainerAllocationEvent;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+import org.apache.tajo.master.querymaster.Query;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.querymaster.SubQueryState;
+import org.apache.tajo.util.ApplicationIdUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class YarnRMContainerAllocator extends AMRMClientImpl
+    implements EventHandler<ContainerAllocationEvent> {
+
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(YarnRMContainerAllocator.
+      class.getName());
+
+  private QueryMasterTask.QueryMasterTaskContext context;
+  private ApplicationAttemptId appAttemptId;
+  private final EventHandler eventHandler;
+
+  public YarnRMContainerAllocator(QueryMasterTask.QueryMasterTaskContext context) {
+    super();
+    this.context = context;
+    this.appAttemptId = ApplicationIdUtils.createApplicationAttemptId(context.getQueryId());
+    this.eventHandler = context.getDispatcher().getEventHandler();
+  }
+
+  public void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  private static final int WAIT_INTERVAL_AVAILABLE_NODES = 500; // 0.5 second
+  public void start() {
+    super.start();
+
+    RegisterApplicationMasterResponse response;
+    try {
+      response = registerApplicationMaster("localhost", 10080, "http://localhost:1234");
+
+      // If the number of cluster nodes is ZERO, it waits for available nodes.
+      AllocateResponse allocateResponse = allocate(0.0f);
+      while(allocateResponse.getNumClusterNodes() < 1) {
+        try {
+          Thread.sleep(WAIT_INTERVAL_AVAILABLE_NODES);
+          LOG.info("Waiting for Available Cluster Nodes");
+          allocateResponse = allocate(0);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+      context.getQueryMasterContext().getWorkerContext().setNumClusterNodes(allocateResponse.getNumClusterNodes());
+    } catch (IOException e) {
+      LOG.error(e);
+    } catch (YarnException e) {
+      LOG.error(e);
+    }
+
+    startAllocatorThread();
+  }
+
+  protected Thread allocatorThread;
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  private int rmPollInterval = 100;//millis
+
+  protected void startAllocatorThread() {
+    allocatorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            try {
+              heartbeat();
+            } catch (YarnException e) {
+              LOG.error("Error communicating with RM: " + e.getMessage() , e);
+              return;
+            } catch (Exception e) {
+              LOG.error("ERROR IN CONTACTING RM. ", e);
+              // TODO: for other exceptions
+              if(stopped.get()) {
+                break;
+              }
+            }
+            Thread.sleep(rmPollInterval);
+          } catch (InterruptedException e) {
+            if (!stopped.get()) {
+              LOG.warn("Allocated thread interrupted. Returning.");
+            }
+            break;
+          }
+        }
+        LOG.info("Allocated thread stopped");
+      }
+    });
+    allocatorThread.setName("YarnRMContainerAllocator");
+    allocatorThread.start();
+  }
+
+  public void stop() {
+    if(stopped.get()) {
+      return;
+    }
+    LOG.info("un-registering ApplicationMaster(QueryMaster):" + appAttemptId);
+    stopped.set(true);
+
+    try {
+      FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
+      Query query = context.getQuery();
+      if (query != null) {
+        TajoProtos.QueryState state = query.getState();
+        if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+          status = FinalApplicationStatus.SUCCEEDED;
+        } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
+          status = FinalApplicationStatus.FAILED;
+        } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
+          status = FinalApplicationStatus.FAILED;
+        }
+      }
+      unregisterApplicationMaster(status, "tajo query finished", null);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    allocatorThread.interrupt();
+    LOG.info("un-registered ApplicationMAster(QueryMaster) stopped:" + appAttemptId);
+
+    super.stop();
+  }
+
+  private final Map<Priority, ExecutionBlockId> subQueryMap =
+      new HashMap<Priority, ExecutionBlockId>();
+
+  private AtomicLong prevReportTime = new AtomicLong(0);
+  private int reportInterval = 5 * 1000; // second
+
+  public void heartbeat() throws Exception {
+    AllocateResponse allocateResponse = allocate(context.getProgress());
+
+    List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
+
+    long currentTime = System.currentTimeMillis();
+    if ((currentTime - prevReportTime.longValue()) >= reportInterval) {
+      LOG.debug("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
+      LOG.debug("Num of Allocated Containers: " + allocatedContainers.size());
+      LOG.info("Available Resource: " + allocateResponse.getAvailableResources());
+      prevReportTime.set(currentTime);
+    }
+
+    if (allocatedContainers.size() > 0) {
+      LOG.info("================================================================");
+      for (Container container : allocateResponse.getAllocatedContainers()) {
+        LOG.info("> Container Id: " + container.getId());
+        LOG.info("> Node Id: " + container.getNodeId());
+        LOG.info("> Resource (Mem): " + container.getResource().getMemory());
+        LOG.info("> Priority: " + container.getPriority());
+      }
+      LOG.info("================================================================");
+
+      Map<ExecutionBlockId, List<Container>> allocated = new HashMap<ExecutionBlockId, List<Container>>();
+      for (Container container : allocatedContainers) {
+        ExecutionBlockId executionBlockId = subQueryMap.get(container.getPriority());
+        SubQueryState state = context.getSubQuery(executionBlockId).getState();
+        if (!(SubQuery.isRunningState(state))) {
+          releaseAssignedContainer(container.getId());
+        } else {
+          if (allocated.containsKey(executionBlockId)) {
+            allocated.get(executionBlockId).add(container);
+          } else {
+            allocated.put(executionBlockId, Lists.newArrayList(container));
+          }
+        }
+      }
+
+      for (Entry<ExecutionBlockId, List<Container>> entry : allocated.entrySet()) {
+        eventHandler.handle(new SubQueryContainerAllocationEvent(entry.getKey(), entry.getValue()));
+      }
+    }
+  }
+
+  @Override
+  public void handle(ContainerAllocationEvent event) {
+
+    if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
+      LOG.info(event);
+      subQueryMap.put(event.getPriority(), event.getExecutionBlockId());
+      addContainerRequest(new ContainerRequest(event.getCapability(), null, null,
+          event.getPriority()));
+
+    } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
+      LOG.info(event);
+    } else {
+      LOG.info(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
new file mode 100644
index 0000000..6d5268c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.YarnContainerProxy;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.util.ApplicationIdUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
+
+/**
+ * {@link WorkerResourceManager} implementation backed by a YARN ResourceManager.
+ *
+ * <p>The worker-level operations ({@link #releaseWorkerResource}, {@link #allocateQueryMaster}
+ * and {@link #allocateWorkerResources}) are not implemented and throw
+ * {@link UnimplementedException}. The worker, query-master and cluster-summary accessors return
+ * empty or zeroed values.</p>
+ */
+public class YarnTajoResourceManager extends AbstractService implements WorkerResourceManager {
+  private static final Log LOG = LogFactory.getLog(YarnTajoResourceManager.class);
+
+  /** Client used to create, submit and monitor YARN applications; created in {@link #init}. */
+  private YarnClient yarnClient;
+  /** Proxy to the ResourceManager scheduler endpoint; created in {@link #init}. */
+  private ApplicationMasterProtocol rmClient;
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private Configuration conf;
+  /** Master context; stays {@code null} when the no-argument constructor is used. */
+  private TajoMaster.MasterContext masterContext;
+
+  public YarnTajoResourceManager() {
+    super(YarnTajoResourceManager.class.getSimpleName());
+  }
+
+  /**
+   * @param masterContext context used to look up running queries, the master configuration and
+   *                      the master service bind address
+   */
+  public YarnTajoResourceManager(TajoMaster.MasterContext masterContext) {
+    super(YarnTajoResourceManager.class.getSimpleName());
+    this.masterContext = masterContext;
+  }
+
+  /**
+   * Stops the YARN client started by {@link #init}, if there is one, so that it is not leaked.
+   */
+  @Override
+  public void stop() {
+    if (yarnClient != null) {
+      yarnClient.stop();
+    }
+  }
+
+  /** @return always a new, empty map */
+  @Override
+  public Map<String, Worker> getWorkers() {
+    return new HashMap<String, Worker>();
+  }
+
+  /** @return always a new, empty map */
+  @Override
+  public Map<String, Worker> getInactiveWorkers() {
+    return new HashMap<String, Worker>();
+  }
+
+  /** @return always a new, empty collection */
+  public Collection<String> getQueryMasters() {
+    return new ArrayList<String>();
+  }
+
+  /** @return a summary in which every counter is zero */
+  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+        .setNumWorkers(0)
+        .setTotalCpuCoreSlots(0)
+        .setTotalDiskSlots(0)
+        .setTotalMemoryMB(0)
+        .setTotalAvailableCpuCoreSlots(0)
+        .setTotalAvailableDiskSlots(0)
+        .setTotalAvailableMemoryMB(0)
+        .build();
+  }
+
+  /** Not implemented; always throws {@link UnimplementedException}. */
+  @Override
+  public void releaseWorkerResource(YarnProtos.ContainerIdProto containerId) {
+    throw new UnimplementedException("releaseWorkerResource");
+  }
+
+  /** Not implemented; always throws {@link UnimplementedException}. */
+  @Override
+  public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress) {
+    throw new UnimplementedException("allocateQueryMaster");
+  }
+
+  /** Not implemented; always throws {@link UnimplementedException}. */
+  @Override
+  public void allocateWorkerResources(
+      TajoMasterProtocol.WorkerResourceAllocationRequest request,
+      RpcCallback<WorkerResourceAllocationResponse> rpcCallBack) {
+    throw new UnimplementedException("allocateWorkerResources");
+  }
+
+  /**
+   * Stores the configuration, starts the YARN client and creates the scheduler proxy as the
+   * current user.
+   *
+   * @param conf configuration from which the scheduler address is read
+   * @throws YarnRuntimeException if the current user cannot be determined
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+    connectYarnClient();
+
+    final YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress rmAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnRuntimeException(e);
+    }
+
+    rmClient = currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+      @Override
+      public ApplicationMasterProtocol run() {
+        return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, yarnConf);
+      }
+    });
+  }
+
+  /**
+   * Creates a new YARN application and returns its application id as a string.
+   *
+   * @return string form of the newly created application id
+   * @throws IOException wrapping any failure raised while creating the application
+   */
+  @Override
+  public String getSeedQueryId() throws IOException {
+    try {
+      YarnClientApplication app = yarnClient.createApplication();
+      return app.getApplicationSubmissionContext().getApplicationId().toString();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Reports the final status of the given query to the ResourceManager. Does nothing when the
+   * query is not in progress. Failures are logged rather than propagated.
+   *
+   * @param queryId id of the query whose query master is being stopped
+   */
+  @Override
+  public void stopQueryMaster(QueryId queryId) {
+    try {
+      QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
+      if (queryInProgress == null) {
+        return;
+      }
+
+      // Any state other than success, failure or error is reported as UNDEFINED.
+      FinalApplicationStatus appStatus = FinalApplicationStatus.UNDEFINED;
+      TajoProtos.QueryState state = queryInProgress.getQueryInfo().getQueryState();
+      if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+        appStatus = FinalApplicationStatus.SUCCEEDED;
+      } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
+        appStatus = FinalApplicationStatus.FAILED;
+      }
+
+      FinishApplicationMasterRequest request = recordFactory
+          .newRecordInstance(FinishApplicationMasterRequest.class);
+      request.setFinalApplicationStatus(appStatus);
+      request.setDiagnostics("QueryMaster shutdown by TajoMaster.");
+      rmClient.finishApplicationMaster(request);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** Creates, initializes and starts the YARN client using the stored configuration. */
+  private void connectYarnClient() {
+    this.yarnClient = new YarnClientImpl();
+    this.yarnClient.init(conf);
+    this.yarnClient.start();
+  }
+
+  /**
+   * Submits a YARN application that runs a query master for the given query and waits until the
+   * application has been accepted.
+   *
+   * @param queryInProgress the query for which a query master is launched
+   * @return the current application attempt id once the application is in the ACCEPTED state
+   * @throws IOException   if submission or monitoring fails, or the wait is interrupted
+   * @throws YarnException if YARN rejects the submission or a report request
+   */
+  private ApplicationAttemptId allocateAndLaunchQueryMaster(QueryInProgress queryInProgress) throws IOException, YarnException {
+    QueryId queryId = queryInProgress.getQueryId();
+    ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
+
+    LOG.info("Allocate and launch ApplicationMaster for QueryMaster: queryId=" +
+        queryId + ", appId=" + appId);
+
+    ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName("Tajo");
+
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(5);
+    appContext.setPriority(pri);
+
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+
+    ContainerLaunchContext commonContainerLaunchContext =
+        YarnContainerProxy.createCommonContainerLaunchContext(masterContext.getConf(), queryId.toString(), true);
+
+    // Setup environment by cloning from common env.
+    Map<String, String> myEnv = new HashMap<String, String>(commonContainerLaunchContext.getEnvironment());
+
+    // Assemble the command line that starts the query master JVM.
+    List<CharSequence> vargs = new ArrayList<CharSequence>(30);
+
+    // Set java executable command
+    vargs.add("${JAVA_HOME}" + "/bin/java");
+    // JVM options (heap size etc.) come from the master configuration.
+    String jvmOptions = masterContext.getConf().get("tajo.rm.yarn.querymaster.jvm.option", "-Xmx2000m");
+    for (String eachToken : jvmOptions.split(" ")) {
+      vargs.add(eachToken);
+    }
+
+    // Set class name and its arguments: mode, query id and the master's host:port.
+    vargs.add(TajoWorker.class.getCanonicalName());
+    vargs.add("qm");
+    vargs.add(queryId.toString()); // queryId
+    vargs.add(masterContext.getTajoMasterService().getBindAddress().getHostName() + ":" +
+        masterContext.getTajoMasterService().getBindAddress().getPort());
+
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+    // Get final command
+    StringBuilder command = new StringBuilder();
+    for (CharSequence str : vargs) {
+      command.append(str).append(" ");
+    }
+
+    LOG.info("Completed setting up QueryMasterRunner command " + command.toString());
+    List<String> commands = new ArrayList<String>();
+    commands.add(command.toString());
+
+    final Resource resource = Records.newRecord(Resource.class);
+    // TODO - get default value from conf
+    resource.setMemory(2000);
+    resource.setVirtualCores(1);
+    // Attach the requested resource to the submission; previously it was built but never used.
+    appContext.setResource(resource);
+
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+
+    ContainerLaunchContext masterContainerContext = BuilderUtils.newContainerLaunchContext(
+        commonContainerLaunchContext.getLocalResources(),
+        myEnv,
+        commands,
+        myServiceData,
+        null,
+        new HashMap<ApplicationAccessType, String>(2)
+    );
+
+    appContext.setAMContainerSpec(masterContainerContext);
+
+    LOG.info("Submitting QueryMaster to ResourceManager");
+    yarnClient.submitApplication(appContext);
+
+    ApplicationReport appReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED));
+    ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
+
+    LOG.info("Launching QueryMaster with appAttemptId: " + attemptId);
+
+    return attemptId;
+  }
+
+  /**
+   * Polls the application report until the application reaches one of the given states.
+   * The delay between polls starts at 100 ms and grows in 100 ms steps up to one second.
+   *
+   * @param appId      application to watch
+   * @param finalState states that end the wait
+   * @return the first report whose state is contained in {@code finalState}
+   * @throws IOException   if fetching a report fails or the waiting thread is interrupted
+   * @throws YarnException if YARN rejects a report request
+   */
+  private ApplicationReport monitorApplication(ApplicationId appId,
+                                               Set<YarnApplicationState> finalState) throws IOException, YarnException {
+
+    long sleepTime = 100;
+    int count = 1;
+    while (true) {
+      // Get application report for the appId we are interested in
+      ApplicationReport report = yarnClient.getApplicationReport(appId);
+
+      LOG.info("Got application report from ASM for" + ", appId="
+          + appId.getId() + ", appAttemptId="
+          + report.getCurrentApplicationAttemptId() + ", clientToken="
+          + report.getClientToAMToken() + ", appDiagnostics="
+          + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
+          + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+          + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
+          + ", yarnAppState=" + report.getYarnApplicationState().toString()
+          + ", distributedFinalState="
+          + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
+          + report.getTrackingUrl() + ", appUser=" + report.getUser());
+
+      YarnApplicationState state = report.getYarnApplicationState();
+      if (finalState.contains(state)) {
+        return report;
+      }
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag and stop waiting instead of silently looping on.
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for application " + appId
+            + " to reach one of " + finalState, e);
+      }
+      sleepTime = count * 100;
+      if (count < 10) {
+        count++;
+      }
+    }
+  }
+
+  /**
+   * Checks whether the YARN application behind the given query has terminated.
+   *
+   * @param queryId id of the query to check
+   * @return {@code true} if the application state is FINISHED, KILLED or FAILED; {@code false}
+   *         otherwise, including when the report cannot be fetched (the failure is logged)
+   */
+  public boolean isQueryMasterStopped(QueryId queryId) {
+    ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
+    try {
+      ApplicationReport report = yarnClient.getApplicationReport(appId);
+      YarnApplicationState state = report.getYarnApplicationState();
+      return EnumSet.of(
+          YarnApplicationState.FINISHED,
+          YarnApplicationState.KILLED,
+          YarnApplicationState.FAILED).contains(state);
+    } catch (YarnException e) {
+      LOG.error(e.getMessage(), e);
+      return false;
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
new file mode 100644
index 0000000..3f48ca5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+/**
+ * Checked exception whose message names the offending session id.
+ */
+public class InvalidSessionException extends Exception {
+  /**
+   * @param sessionId the session id quoted in the exception message
+   */
+  public InvalidSessionException(String sessionId) {
+    super(String.format("Invalid session id \"%s\"", sessionId));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java b/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
new file mode 100644
index 0000000..686d860
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+/**
+ * Checked exception whose message names the session variable that was not found.
+ */
+public class NoSuchSessionVariableException extends Exception {
+  /**
+   * @param varname the variable name quoted in the exception message
+   */
+  public NoSuchSessionVariableException(String varname) {
+    super(String.format("No such session variable \"%s\"", varname));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
new file mode 100644
index 0000000..4d244bf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
+
+/**
+ * State of one client session: session id, user name, currently selected database, last access
+ * time and a map of session variables.
+ *
+ * <p>Every access to the variable map is made while holding the map's own monitor. The last
+ * access time and the current database are {@code volatile} fields.</p>
+ */
+public class Session implements SessionConstants, ProtoObject<SessionProto> {
+  private final String sessionId;
+  private final String userName;
+  /** Session variables; guarded by synchronizing on the map itself. */
+  private final Map<String, String> sessionVariables;
+
+  // transient status
+  private volatile long lastAccessTime;
+  private volatile String currentDatabase;
+
+  /**
+   * Creates a session with no variables whose last access time is the current time.
+   *
+   * @param sessionId    id of the session
+   * @param userName     name of the user owning the session
+   * @param databaseName database selected initially
+   */
+  public Session(String sessionId, String userName, String databaseName) {
+    this.sessionId = sessionId;
+    this.userName = userName;
+    this.lastAccessTime = System.currentTimeMillis();
+    this.sessionVariables = new HashMap<String, String>();
+    selectDatabase(databaseName);
+  }
+
+  /**
+   * Restores a session from its protobuf form.
+   *
+   * @param proto serialized session to read all fields from
+   */
+  public Session(SessionProto proto) {
+    sessionId = proto.getSessionId();
+    userName = proto.getUsername();
+    currentDatabase = proto.getCurrentDatabase();
+    lastAccessTime = proto.getLastAccessTime();
+    Options options = new Options(proto.getVariables());
+    sessionVariables = options.getAllKeyValus();
+  }
+
+  /** @return the session id */
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  /** @return the user name */
+  public String getUserName() {
+    return userName;
+  }
+
+  /** Sets the last access time to the current time. */
+  public void updateLastAccessTime() {
+    lastAccessTime = System.currentTimeMillis();
+  }
+
+  /** @return the last access time in milliseconds, as given by {@link System#currentTimeMillis()} */
+  public long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  /**
+   * Sets a session variable, replacing any previous value.
+   *
+   * @param name  variable name
+   * @param value variable value
+   */
+  public void setVariable(String name, String value) {
+    synchronized (sessionVariables) {
+      sessionVariables.put(name, value);
+    }
+  }
+
+  /**
+   * Looks up a session variable.
+   *
+   * @param name variable name
+   * @return the value stored under {@code name}
+   * @throws NoSuchSessionVariableException if no variable with that name is set
+   */
+  public String getVariable(String name) throws NoSuchSessionVariableException {
+    synchronized (sessionVariables) {
+      if (sessionVariables.containsKey(name)) {
+        return sessionVariables.get(name);
+      } else {
+        throw new NoSuchSessionVariableException(name);
+      }
+    }
+  }
+
+  /**
+   * Removes a session variable; does nothing if it is not set.
+   *
+   * @param name variable name
+   */
+  public void removeVariable(String name) {
+    synchronized (sessionVariables) {
+      sessionVariables.remove(name);
+    }
+  }
+
+  /**
+   * @return an immutable snapshot of all session variables
+   */
+  public Map<String, String> getAllVariables() {
+    // The map's monitor is the lock used by every accessor, so no method-level lock is needed.
+    synchronized (sessionVariables) {
+      return ImmutableMap.copyOf(sessionVariables);
+    }
+  }
+
+  /**
+   * Changes the current database of this session.
+   *
+   * @param databaseName name of the database to select
+   */
+  public void selectDatabase(String databaseName) {
+    this.currentDatabase = databaseName;
+  }
+
+  /** @return the currently selected database */
+  public String getCurrentDatabase() {
+    return this.currentDatabase;
+  }
+
+  /**
+   * Serializes this session, including a copy of its variables, into a protobuf message.
+   *
+   * @return the protobuf form of this session
+   */
+  @Override
+  public SessionProto getProto() {
+    SessionProto.Builder builder = SessionProto.newBuilder();
+    builder.setSessionId(sessionId);
+    builder.setUsername(userName);
+    builder.setCurrentDatabase(currentDatabase);
+    builder.setLastAccessTime(lastAccessTime);
+    Options variables = new Options();
+    // Copy under the same lock as the other accessors so a concurrent update cannot
+    // disturb the iteration done by putAll.
+    synchronized (sessionVariables) {
+      variables.putAll(sessionVariables);
+    }
+    builder.setVariables(variables.getProto());
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    return "user=" + userName + ",id=" + sessionId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
new file mode 100644
index 0000000..46f49a2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionConstants.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+/**
+ * Interface that currently declares no members. Judging by its name it is presumably reserved
+ * for session-related constants (to be confirmed).
+ */
+public interface SessionConstants {
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
new file mode 100644
index 0000000..dce3ba6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event of type {@link SessionEventType} that carries the id of the session it concerns.
+ */
+public class SessionEvent extends AbstractEvent<SessionEventType> {
+  /** Id of the session this event refers to; fixed at construction. */
+  private final String sessionId;
+
+  /**
+   * @param sessionId        id of the session the event refers to
+   * @param sessionEventType event type passed on to the {@link AbstractEvent} constructor
+   */
+  public SessionEvent(String sessionId, SessionEventType sessionEventType) {
+    super(sessionEventType);
+    this.sessionId = sessionId;
+  }
+
+  /**
+   * @return the session id supplied at construction
+   */
+  public String getSessionId() {
+    return this.sessionId;
+  }
+}


Mime
View raw message