tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [09/11] tajo git commit: TAJO-1397: Resource allocation should be fine grained. (jinho)
Date Mon, 20 Jul 2015 08:34:27 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index bc0d212..0ace53a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -41,8 +41,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.NodeStatus;
 import org.apache.tajo.plan.InvalidQueryException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
@@ -50,6 +49,8 @@ import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.IndexScanNode;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
 import org.apache.tajo.session.InvalidSessionException;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
@@ -420,40 +421,35 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
     return tuples;
   }
   
-  private Tuple getQueryMasterTuple(Schema outSchema, Worker aWorker) {
+  private Tuple getQueryMasterTuple(Schema outSchema, NodeStatus aNodeStatus) {
     List<Column> columns = outSchema.getRootColumns();
     Tuple aTuple = new VTuple(outSchema.size());
-    WorkerResource aResource = aWorker.getResource();
-    
+
     for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
       Column column = columns.get(fieldId);
       
       if ("host".equalsIgnoreCase(column.getSimpleName())) {
-        if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) {
-          aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost()));
+        if (aNodeStatus.getConnectionInfo() != null && aNodeStatus.getConnectionInfo().getHost() != null) {
+          aTuple.put(fieldId, DatumFactory.createText(aNodeStatus.getConnectionInfo().getHost()));
         } else {
           aTuple.put(fieldId, DatumFactory.createNullDatum());
         }
       } else if ("port".equalsIgnoreCase(column.getSimpleName())) {
-        if (aWorker.getConnectionInfo() != null) {
-          aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getQueryMasterPort()));
+        if (aNodeStatus.getConnectionInfo() != null) {
+          aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getConnectionInfo().getQueryMasterPort()));
         } else {
           aTuple.put(fieldId, DatumFactory.createNullDatum());
         }
       } else if ("type".equalsIgnoreCase(column.getSimpleName())) {
         aTuple.put(fieldId, DatumFactory.createText("QueryMaster"));
       } else if ("status".equalsIgnoreCase(column.getSimpleName())) {
-        aTuple.put(fieldId, DatumFactory.createText(aWorker.getState().toString()));
-      } else if ("RUNNING".equalsIgnoreCase(aWorker.getState().toString())) {
+        aTuple.put(fieldId, DatumFactory.createText(aNodeStatus.getState().toString()));
+      } else if ("RUNNING".equalsIgnoreCase(aNodeStatus.getState().toString())) {
         if ("running_tasks".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumQueryMasterTasks()));
-        } else if ("free_heap".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap()));
-        } else if ("max_heap".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap()));
+          aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getNumRunningQueryMaster()));
         } else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) {
-          if (aWorker.getLastHeartbeatTime() > 0) {
-            aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime()));
+          if (aNodeStatus.getLastHeartbeatTime() > 0) {
+            aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
           } else {
             aTuple.put(fieldId, DatumFactory.createNullDatum());
           }
@@ -466,50 +462,48 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
     return aTuple;
   }
   
-  private Tuple getWorkerTuple(Schema outSchema, Worker aWorker) {
+  private Tuple getWorkerTuple(Schema outSchema, NodeStatus aNodeStatus) {
     List<Column> columns = outSchema.getRootColumns();
     Tuple aTuple = new VTuple(outSchema.size());
-    WorkerResource aResource = aWorker.getResource();
+
+    NodeResource total = aNodeStatus.getTotalResourceCapability();
+    NodeResource used = NodeResources.subtract(total, aNodeStatus.getAvailableResource());
     
     for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
       Column column = columns.get(fieldId);
       
       if ("host".equalsIgnoreCase(column.getSimpleName())) {
-        if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) {
-          aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost()));
+        if (aNodeStatus.getConnectionInfo() != null && aNodeStatus.getConnectionInfo().getHost() != null) {
+          aTuple.put(fieldId, DatumFactory.createText(aNodeStatus.getConnectionInfo().getHost()));
         } else {
           aTuple.put(fieldId, DatumFactory.createNullDatum());
         }
       } else if ("port".equalsIgnoreCase(column.getSimpleName())) {
-        if (aWorker.getConnectionInfo() != null) {
-          aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getPeerRpcPort()));
+        if (aNodeStatus.getConnectionInfo() != null) {
+          aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getConnectionInfo().getPeerRpcPort()));
         } else {
           aTuple.put(fieldId, DatumFactory.createNullDatum());
         }
       } else if ("type".equalsIgnoreCase(column.getSimpleName())) {
         aTuple.put(fieldId, DatumFactory.createText("Worker"));
       } else if ("status".equalsIgnoreCase(column.getSimpleName())) {
-        aTuple.put(fieldId, DatumFactory.createText(aWorker.getState().toString()));
-      } else if ("RUNNING".equalsIgnoreCase(aWorker.getState().toString())) {
+        aTuple.put(fieldId, DatumFactory.createText(aNodeStatus.getState().toString()));
+      } else if ("RUNNING".equalsIgnoreCase(aNodeStatus.getState().toString())) {
         if ("total_cpu".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(aResource.getCpuCoreSlots()));
+          aTuple.put(fieldId, DatumFactory.createInt4(total.getVirtualCores()));
         } else if ("used_mem".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt8(aResource.getUsedMemoryMB()*1048576l));
+          aTuple.put(fieldId, DatumFactory.createInt8(used.getMemory() * 1048576l));
         } else if ("total_mem".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMemoryMB()*1048576l));
-        } else if ("free_heap".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap()));
-        } else if ("max_heap".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap()));
-        } else if ("used_diskslots".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getUsedDiskSlots()));
-        } else if ("total_diskslots".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getDiskSlots()));
+          aTuple.put(fieldId, DatumFactory.createInt8(total.getMemory() * 1048576l));
+        } else if ("used_disk".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(used.getDisks()));
+        } else if ("total_disk".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(total.getDisks()));
         } else if ("running_tasks".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumRunningTasks()));
+          aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getNumRunningTasks()));
         } else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) {
-          if (aWorker.getLastHeartbeatTime() > 0) {
-            aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime()));
+          if (aNodeStatus.getLastHeartbeatTime() > 0) {
+            aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime()));
           } else {
             aTuple.put(fieldId, DatumFactory.createNullDatum());
           }
@@ -524,23 +518,23 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
   }
   
   private List<Tuple> getClusterInfo(Schema outSchema) {
-    Map<Integer, Worker> workerMap = masterContext.getResourceManager().getWorkers();
+    Map<Integer, NodeStatus> workerMap = masterContext.getResourceManager().getNodes();
     List<Tuple> tuples;
-    List<Worker> queryMasterList = new ArrayList<Worker>();
-    List<Worker> workerList = new ArrayList<Worker>();
+    List<NodeStatus> queryMasterList = new ArrayList<NodeStatus>();
+    List<NodeStatus> nodeStatusList = new ArrayList<NodeStatus>();
     
-    for (Worker aWorker: workerMap.values()) {
-      queryMasterList.add(aWorker);
-      workerList.add(aWorker);
+    for (NodeStatus aNodeStatus : workerMap.values()) {
+      queryMasterList.add(aNodeStatus);
+      nodeStatusList.add(aNodeStatus);
     }
     
-    tuples = new ArrayList<Tuple>(queryMasterList.size() + workerList.size());
-    for (Worker queryMaster: queryMasterList) {
+    tuples = new ArrayList<Tuple>(queryMasterList.size() + nodeStatusList.size());
+    for (NodeStatus queryMaster: queryMasterList) {
       tuples.add(getQueryMasterTuple(outSchema, queryMaster));
     }
     
-    for (Worker worker: workerList) {
-      tuples.add(getWorkerTuple(outSchema, worker));
+    for (NodeStatus nodeStatus : nodeStatusList) {
+      tuples.add(getWorkerTuple(outSchema, nodeStatus));
     }
     
     return tuples;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEvent.java
new file mode 100644
index 0000000..430dc86
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * NodeEvent describes all kinds of events which are sent to {@link NodeStatus}.
+ */
+public class NodeEvent extends AbstractEvent<NodeEventType> {
+  private final int workerId;
+
+  public NodeEvent(int workerId, NodeEventType nodeEventType) {
+    super(nodeEventType);
+    this.workerId = workerId;
+  }
+
+  public int getWorkerId() {
+    return workerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEventType.java
new file mode 100644
index 0000000..64de5d4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEventType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+public enum NodeEventType {
+
+  /** Source : {@link TajoResourceTracker}, Destination: {@link NodeStatus} */
+  STARTED,
+  STATE_UPDATE,
+  RECONNECTED,
+
+  /** Source : {@link NodeLivelinessMonitor}, Destination: {@link NodeStatus} */
+  EXPIRE
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeLivelinessMonitor.java
new file mode 100644
index 0000000..55a24fc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeLivelinessMonitor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+/**
+ * It periodically checks the latest heartbeat time of {@link NodeStatus}.
+ * If the latest heartbeat has expired, it sends an EXPIRE event to the corresponding {@link NodeStatus}.
+ */
+public class NodeLivelinessMonitor extends AbstractLivelinessMonitor<Integer> {
+
+  private EventHandler dispatcher;
+
+  public NodeLivelinessMonitor(Dispatcher d) {
+    super(NodeLivelinessMonitor.class.getSimpleName(), new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf systemConf = (TajoConf) conf;
+    // milliseconds
+    int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.RESOURCE_TRACKER_HEARTBEAT_TIMEOUT) * 1000;
+    setExpireInterval(expireIntvl);
+    setMonitorInterval(expireIntvl/3);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void expire(Integer id) {
+    dispatcher.handle(new NodeEvent(id, NodeEventType.EXPIRE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeReconnectEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeReconnectEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeReconnectEvent.java
new file mode 100644
index 0000000..e9ba077
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeReconnectEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * {@link TajoResourceTracker} produces this event, and its destination is {@link NodeStatus}.
+ * This event occurs only when an inactive node sends a ping again.
+ */
+public class NodeReconnectEvent extends NodeEvent {
+  private final NodeStatus nodeStatus;
+  public NodeReconnectEvent(int workerId, NodeStatus nodeStatus) {
+    super(workerId, NodeEventType.RECONNECTED);
+    this.nodeStatus = nodeStatus;
+  }
+
+  public NodeStatus getNodeStatus() {
+    return nodeStatus;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeState.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeState.java
new file mode 100644
index 0000000..89f601d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeState.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * It represents the states of {@link NodeStatus}.
+ */
+public enum NodeState {
+  /** New node */
+  NEW,
+
+  /** Running node */
+  RUNNING,
+
+  /** Node is unhealthy */
+  UNHEALTHY,
+
+  /** Node is out of service */
+  DECOMMISSIONED,
+
+  /** Node has not sent a heartbeat for some configured time threshold */
+  LOST;
+
+  @SuppressWarnings("unused")
+  public boolean isUnusable() {
+    return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java
new file mode 100644
index 0000000..63e4d52
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.util.TUtil;
+
+import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+/**
+ * It contains resource and various information for a node.
+ */
+public class NodeStatus implements EventHandler<NodeEvent>, Comparable<NodeStatus> {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(NodeStatus.class);
+
+  /** context of {@link TajoResourceManager} */
+  private final TajoRMContext rmContext;
+
+  /** last heartbeat time */
+  private volatile long lastHeartbeatTime;
+
+  @SuppressWarnings("unused")
+  private volatile int numRunningTasks;
+
+  @SuppressWarnings("unused")
+  private volatile int numRunningQueryMaster;
+
+  private static AtomicLongFieldUpdater HEARTBEAT_TIME_UPDATER;
+  private static AtomicIntegerFieldUpdater RUNNING_TASK_UPDATER;
+  private static AtomicIntegerFieldUpdater RUNNING_QM_UPDATER;
+
+  static {
+    HEARTBEAT_TIME_UPDATER = PlatformDependent.newAtomicLongFieldUpdater(NodeStatus.class, "lastHeartbeatTime");
+    if (HEARTBEAT_TIME_UPDATER == null) {
+      HEARTBEAT_TIME_UPDATER = AtomicLongFieldUpdater.newUpdater(NodeStatus.class, "lastHeartbeatTime");
+      RUNNING_TASK_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeStatus.class, "numRunningTasks");
+      RUNNING_QM_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeStatus.class, "numRunningQueryMaster");
+    } else {
+      RUNNING_TASK_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeStatus.class, "numRunningTasks");
+      RUNNING_QM_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeStatus.class, "numRunningQueryMaster");
+    }
+  }
+
+  /** Available resources on the node. */
+  private final NodeResource availableResource;
+
+  /** Total resources on the node. */
+  private final NodeResource totalResourceCapability;
+
+  /** Node connection information */
+  private WorkerConnectionInfo connectionInfo;
+
+  private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition();
+  private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition();
+
+  private static final StateMachineFactory<NodeStatus,
+      NodeState,
+      NodeEventType,
+      NodeEvent> stateMachineFactory
+      = new StateMachineFactory<NodeStatus,
+      NodeState,
+      NodeEventType,
+      NodeEvent>(NodeState.NEW)
+
+      // Transition from NEW
+      .addTransition(NodeState.NEW, NodeState.RUNNING,
+          NodeEventType.STARTED,
+          new AddNodeTransition())
+
+      // Transition from RUNNING
+      .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
+          NodeEventType.STATE_UPDATE,
+          STATUS_UPDATE_TRANSITION)
+      .addTransition(NodeState.RUNNING, NodeState.LOST,
+          NodeEventType.EXPIRE,
+          new DeactivateNodeTransition(NodeState.LOST))
+      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+          NodeEventType.RECONNECTED,
+          RECONNECT_NODE_TRANSITION)
+
+      // Transitions from UNHEALTHY state
+      .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
+          NodeEventType.STATE_UPDATE,
+          STATUS_UPDATE_TRANSITION)
+      .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
+          NodeEventType.EXPIRE,
+          new DeactivateNodeTransition(NodeState.LOST))
+      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+          NodeEventType.RECONNECTED,
+          RECONNECT_NODE_TRANSITION);
+
+  private final StateMachine<NodeState, NodeEventType, NodeEvent> stateMachine =
+      stateMachineFactory.make(this, NodeState.NEW);
+
+  public NodeStatus(TajoRMContext rmContext, NodeResource totalResourceCapability, WorkerConnectionInfo connectionInfo) {
+    this.rmContext = rmContext;
+
+    this.connectionInfo = connectionInfo;
+    this.lastHeartbeatTime = System.currentTimeMillis();
+    this.totalResourceCapability = totalResourceCapability;
+    this.availableResource = NodeResources.clone(totalResourceCapability);
+  }
+
+  public int getWorkerId() {
+    return connectionInfo.getId();
+  }
+
+  public WorkerConnectionInfo getConnectionInfo() {
+    return connectionInfo;
+  }
+
+  public void setLastHeartbeatTime(long lastHeartbeatTime) {
+    HEARTBEAT_TIME_UPDATER.lazySet(this, lastHeartbeatTime);
+  }
+
+  public void setNumRunningQueryMaster(int numRunningQueryMaster) {
+    RUNNING_QM_UPDATER.lazySet(this, numRunningQueryMaster);
+  }
+
+  public int getNumRunningQueryMaster() {
+    return numRunningQueryMaster;
+  }
+
+  public void setNumRunningTasks(int numRunningTasks) {
+    RUNNING_TASK_UPDATER.lazySet(this, numRunningTasks);
+  }
+
+  public int getNumRunningTasks() {
+    return numRunningTasks;
+  }
+
+  public long getLastHeartbeatTime() {
+    return this.lastHeartbeatTime;
+  }
+
+  /**
+   *
+   * @return the current state of node
+   */
+  public NodeState getState() {
+    return this.stateMachine.getCurrentState();
+  }
+
+  /**
+   * Get the currently available resources on the node.
+   *
+   * @return the currently available resources on the node.
+   */
+  public NodeResource getAvailableResource() {
+    return this.availableResource;
+  }
+
+  /**
+   * Get total resources on the node.
+   *
+   * @return total resources on the node.
+   */
+  public NodeResource getTotalResourceCapability() {
+    return totalResourceCapability;
+  }
+
+  @Override
+  public int compareTo(NodeStatus o) {
+    if(o == null) {
+      return 1;
+    }
+    return connectionInfo.compareTo(o.connectionInfo);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    NodeStatus nodeStatus = (NodeStatus) o;
+
+    if (connectionInfo != null ? !connectionInfo.equals(nodeStatus.connectionInfo) : nodeStatus.connectionInfo != null)
+      return false;
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 0;
+    result = 31 * result + (connectionInfo != null ? connectionInfo.hashCode() : 0);
+    return result;
+  }
+
+  public static class AddNodeTransition implements SingleArcTransition<NodeStatus, NodeEvent> {
+    @Override
+    public void transition(NodeStatus nodeStatus, NodeEvent nodeEvent) {
+
+      nodeStatus.rmContext.getQueryMasterWorker().add(nodeStatus.getWorkerId());
+      LOG.info("Node with " + nodeStatus.getTotalResourceCapability() + " is joined to Tajo cluster");
+    }
+  }
+
+  public static class StatusUpdateTransition implements
+      MultipleArcTransition<NodeStatus, NodeEvent, NodeState> {
+
+    @Override
+    public NodeState transition(NodeStatus nodeStatus, NodeEvent event) {
+
+      NodeStatusEvent statusEvent = TUtil.checkTypeAndGet(event, NodeStatusEvent.class);
+      nodeStatus.updateStatus(statusEvent);
+
+      return NodeState.RUNNING;
+    }
+  }
+
+  private void updateStatus(NodeStatusEvent statusEvent) {
+    setLastHeartbeatTime(System.currentTimeMillis());
+    setNumRunningTasks(statusEvent.getRunningTaskNum());
+    setNumRunningQueryMaster(statusEvent.getRunningQMNum());
+    NodeResources.update(availableResource, statusEvent.getAvailableResource());
+
+    if(statusEvent.getTotalResource() != null) {
+      NodeResources.update(totalResourceCapability, statusEvent.getTotalResource());
+    }
+  }
+
+  public static class DeactivateNodeTransition implements SingleArcTransition<NodeStatus, NodeEvent> {
+    private final NodeState finalState;
+
+    public DeactivateNodeTransition(NodeState finalState) {
+      this.finalState = finalState;
+    }
+
+    @Override
+    public void transition(NodeStatus nodeStatus, NodeEvent nodeEvent) {
+
+      nodeStatus.rmContext.getNodes().remove(nodeStatus.getWorkerId());
+      LOG.info("Deactivating Node " + nodeStatus.getWorkerId() + " as it is now " + finalState);
+      nodeStatus.rmContext.getInactiveNodes().putIfAbsent(nodeStatus.getWorkerId(), nodeStatus);
+    }
+  }
+
+  public static class ReconnectNodeTransition implements SingleArcTransition<NodeStatus, NodeEvent> {
+
+    @Override
+    public void transition(NodeStatus nodeStatus, NodeEvent nodeEvent) {
+
+      NodeReconnectEvent castedEvent = TUtil.checkTypeAndGet(nodeEvent, NodeReconnectEvent.class);
+      NodeStatus newNodeStatus = castedEvent.getNodeStatus();
+      nodeStatus.rmContext.getNodes().put(castedEvent.getWorkerId(), newNodeStatus);
+      nodeStatus.rmContext.getDispatcher().getEventHandler().handle(
+          new NodeEvent(nodeStatus.getWorkerId(), NodeEventType.STARTED));
+    }
+  }
+
+  @Override
+  public void handle(NodeEvent event) {
+    LOG.debug("Processing " + event.getWorkerId() + " of type " + event.getType());
+    NodeState oldState = getState();
+    try {
+      stateMachine.doTransition(event.getType(), event);
+    } catch (InvalidStateTransitonException e) {
+      LOG.error("Can't handle this event at current state"
+          + ", eventType:" + event.getType().name()
+          + ", oldState:" + oldState.name()
+          + ", nextState:" + getState().name()
+          , e);
+      LOG.error("Invalid event " + event.getType() + " on NodeStatus  " + getWorkerId());
+    }
+    if (oldState != getState()) {
+      LOG.info(getWorkerId() + " Node Transitioned from " + oldState + " to " + getState());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatusEvent.java
new file mode 100644
index 0000000..db2f167
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatusEvent.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.resource.NodeResource;
+
+/**
+ * {@link TajoResourceTracker} produces this event, and its destination is
+ * {@link NodeStatus.StatusUpdateTransition} of {@link NodeStatus}.
+ */
+public class NodeStatusEvent extends NodeEvent {
+  private final int runningTaskNum;
+  private final int runningQMNum;
+  private final NodeResource available;
+  private final NodeResource total;
+
+  public NodeStatusEvent(int workerId, int runningTaskNum, int runningQMNum,
+                         NodeResource available, @Nullable NodeResource total) {
+    super(workerId, NodeEventType.STATE_UPDATE);
+    this.runningTaskNum = runningTaskNum;
+    this.runningQMNum = runningQMNum;
+    this.available = available;
+    this.total = total;
+  }
+
+  public int getRunningTaskNum() {
+    return runningTaskNum;
+  }
+
+  public int getRunningQMNum() {
+    return runningQMNum;
+  }
+
+  public NodeResource getAvailableResource() {
+    return available;
+  }
+
+  public NodeResource getTotalResource() {
+    return total;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
index bb8cc12..9dfae7e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -20,8 +20,6 @@ package org.apache.tajo.master.rm;
 
 import com.google.common.collect.Maps;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.ipc.ContainerProtocol;
 
 import java.util.Collections;
 import java.util.Set;
@@ -29,27 +27,21 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * It's a worker resource manager context. It contains all context data about TajoWorkerResourceManager.
+ * It's a worker resource manager context. It contains all context data about TajoResourceManager.
  */
 public class TajoRMContext {
 
   final Dispatcher rmDispatcher;
 
-  /** map between workerIds and running workers */
-  private final ConcurrentMap<Integer, Worker> workers = Maps.newConcurrentMap();
+  /** map between workerIds and running nodes */
+  private final ConcurrentMap<Integer, NodeStatus> nodes = Maps.newConcurrentMap();
 
-  /** map between workerIds and inactive workers */
-  private final ConcurrentMap<Integer, Worker> inactiveWorkers = Maps.newConcurrentMap();
-
-  /** map between queryIds and query master ContainerId */
-  private final ConcurrentMap<QueryId, ContainerProtocol.TajoContainerIdProto> qmContainerMap = Maps
-    .newConcurrentMap();
+  /** map between workerIds and inactive nodes */
+  private final ConcurrentMap<Integer, NodeStatus> inactiveNodes = Maps.newConcurrentMap();
 
   private final Set<Integer> liveQueryMasterWorkerResources =
       Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
 
-  private final Set<QueryId> stoppedQueryIds =
-      Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>());
 
   public TajoRMContext(Dispatcher dispatcher) {
     this.rmDispatcher = dispatcher;
@@ -60,32 +52,20 @@ public class TajoRMContext {
   }
 
   /**
-   * @return The Map for active workers
+   * @return The Map for active nodes
    */
-  public ConcurrentMap<Integer, Worker> getWorkers() {
-    return workers;
+  public ConcurrentMap<Integer, NodeStatus> getNodes() {
+    return nodes;
   }
 
   /**
-   * @return The Map for inactive workers
+   * @return The Map for inactive nodes
    */
-  public ConcurrentMap<Integer, Worker> getInactiveWorkers() {
-    return inactiveWorkers;
-  }
-
-  /**
-   *
-   * @return The Map for query master containers
-   */
-  public ConcurrentMap<QueryId, ContainerProtocol.TajoContainerIdProto> getQueryMasterContainer() {
-    return qmContainerMap;
+  public ConcurrentMap<Integer, NodeStatus> getInactiveNodes() {
+    return inactiveNodes;
   }
 
   public Set<Integer> getQueryMasterWorker() {
     return liveQueryMasterWorkerResources;
   }
-
-  public Set<QueryId> getStoppedQueryIds() {
-    return stoppedQueryIds;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceManager.java
new file mode 100644
index 0000000..c91938a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.scheduler.AbstractQueryScheduler;
+import org.apache.tajo.master.scheduler.QuerySchedulingInfo;
+import org.apache.tajo.master.scheduler.event.SchedulerEventType;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * It manages all resources of tajo workers.
+ */
+public class TajoResourceManager extends CompositeService {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(TajoResourceManager.class);
+
+  protected static final Map<String, Class<? extends AbstractQueryScheduler>> SCHEDULER_CLASS_CACHE = Maps.newHashMap();
+
+  private TajoMaster.MasterContext masterContext;
+
+  private TajoRMContext rmContext;
+
+  private String queryIdSeed;
+
+  /**
+   * Node Liveliness monitor
+   */
+  private NodeLivelinessMonitor nodeLivelinessMonitor;
+
+  private TajoConf systemConf;
+  private AbstractQueryScheduler scheduler;
+
+  /** It receives status messages from workers and their resources. */
+  private TajoResourceTracker resourceTracker;
+
+  public TajoResourceManager(TajoMaster.MasterContext masterContext) {
+    super(TajoResourceManager.class.getSimpleName());
+    this.masterContext = masterContext;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    addIfService(dispatcher);
+
+    rmContext = new TajoRMContext(dispatcher);
+
+    this.queryIdSeed = String.valueOf(System.currentTimeMillis());
+
+    this.nodeLivelinessMonitor = new NodeLivelinessMonitor(this.rmContext.getDispatcher());
+    addIfService(this.nodeLivelinessMonitor);
+
+    // Register event handler for Workers
+    rmContext.getDispatcher().register(NodeEventType.class, new WorkerEventDispatcher(rmContext));
+
+    resourceTracker = new TajoResourceTracker(this, nodeLivelinessMonitor);
+    addIfService(resourceTracker);
+
+    String schedulerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_SCHEDULER_CLASS);
+    scheduler = loadScheduler(schedulerClassName);
+    LOG.info("Loaded resource scheduler : " + scheduler.getClass());
+    addIfService(scheduler);
+    rmContext.getDispatcher().register(SchedulerEventType.class, scheduler);
+
+    super.serviceInit(systemConf);
+  }
+
+  protected synchronized AbstractQueryScheduler loadScheduler(String schedulerClassName) throws Exception {
+    Class<? extends AbstractQueryScheduler> schedulerClass;
+    if (SCHEDULER_CLASS_CACHE.containsKey(schedulerClassName)) {
+      schedulerClass = SCHEDULER_CLASS_CACHE.get(schedulerClassName);
+    } else {
+      schedulerClass = (Class<? extends AbstractQueryScheduler>) Class.forName(schedulerClassName);
+      SCHEDULER_CLASS_CACHE.put(schedulerClassName, schedulerClass);
+    }
+
+    Constructor<? extends AbstractQueryScheduler>
+        constructor = schedulerClass.getDeclaredConstructor(new Class[]{TajoMaster.MasterContext.class});
+    constructor.setAccessible(true);
+
+    return constructor.newInstance(new Object[]{masterContext});
+  }
+
+  @InterfaceAudience.Private
+  public static final class WorkerEventDispatcher implements EventHandler<NodeEvent> {
+
+    private final TajoRMContext rmContext;
+
+    public WorkerEventDispatcher(TajoRMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    @Override
+    public void handle(NodeEvent event) {
+      int workerId = event.getWorkerId();
+      NodeStatus node = this.rmContext.getNodes().get(workerId);
+      if (node != null) {
+        try {
+          node.handle(event);
+        } catch (Throwable t) {
+          LOG.error("Error in handling event type " + event.getType() + " for node " + workerId, t);
+        }
+      }
+    }
+  }
+
+
+  public Map<Integer, NodeStatus> getNodes() {
+    return ImmutableMap.copyOf(rmContext.getNodes());
+  }
+
+  public Map<Integer, NodeStatus> getInactiveNodes() {
+    return ImmutableMap.copyOf(rmContext.getInactiveNodes());
+  }
+
+  public Collection<Integer> getQueryMasters() {
+    return Collections.unmodifiableSet(rmContext.getQueryMasterWorker());
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  /**
+   *
+   * @return The prefix of queryId. It is generated when a TajoMaster starts up.
+   */
+  public String getSeedQueryId() throws IOException {
+    return queryIdSeed;
+  }
+
+  @VisibleForTesting
+  TajoResourceTracker getResourceTracker() {
+    return resourceTracker;
+  }
+
+  public AbstractQueryScheduler getScheduler() {
+    return scheduler;
+  }
+
+  public void submitQuery(QuerySchedulingInfo schedulingInfo) {
+    scheduler.submitQuery(schedulingInfo);
+  }
+
+  public TajoRMContext getRMContext() {
+    return rmContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 2a18de7..3e3e3b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -20,79 +20,76 @@ package org.apache.tajo.master.rm;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.common.exception.NotImplementedException;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.event.SchedulerEvent;
+import org.apache.tajo.master.scheduler.event.SchedulerEventType;
+import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.TUtil;
 
-import java.io.IOError;
 import java.net.InetSocketAddress;
 
-import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
-import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
+import static org.apache.tajo.ResourceProtos.*;
 
 /**
- * It receives pings that workers periodically send. The ping messages contains the worker resources and their statuses.
- * From ping messages, {@link TajoResourceTracker} tracks the recent status of all workers.
+ * It receives pings that nodes periodically send. The ping messages contain the node resources and their statuses.
+ * From ping messages, {@link TajoResourceTracker} tracks the recent status of all nodes.
  *
  * In detail, it has two main roles as follows:
  *
  * <ul>
  *   <li>Membership management for nodes which join to a Tajo cluster</li>
  *   <ul>
- *    <li>Register - It receives the ping from a new worker. It registers the worker.</li>
- *    <li>Unregister - It unregisters a worker who does not send ping for some expiry time.</li>
+ *    <li>Register - It receives the ping from a new node. It registers the node.</li>
+ *    <li>Unregister - It unregisters a node that has not sent a ping within the expiry time.</li>
  *   <ul>
- *   <li>Status Update - It updates the status of all participating workers</li>
+ *   <li>Status Update - It updates the status of all participating nodes</li>
  * </ul>
  */
 public class TajoResourceTracker extends AbstractService implements TajoResourceTrackerProtocolService.Interface {
   /** Class logger */
   private Log LOG = LogFactory.getLog(TajoResourceTracker.class);
 
-  private final WorkerResourceManager manager;
-  /** the context of TajoWorkerResourceManager */
+  private final TajoResourceManager manager;
+  /** the context of TajoResourceManager */
   private final TajoRMContext rmContext;
-  /** Liveliness monitor which checks ping expiry times of workers */
-  private final WorkerLivelinessMonitor workerLivelinessMonitor;
+  /** Liveliness monitor which checks ping expiry times of nodes */
+  private final NodeLivelinessMonitor nodeLivelinessMonitor;
 
-  /** RPC server for worker resource tracker */
+  /** RPC server for node resource tracker */
   private AsyncRpcServer server;
-  /** The bind address of RPC server of worker resource tracker */
+  /** The bind address of RPC server of node resource tracker */
   private InetSocketAddress bindAddress;
 
-  public TajoResourceTracker(WorkerResourceManager manager, WorkerLivelinessMonitor workerLivelinessMonitor) {
+  /** Node heartbeat interval sent back to nodes while queries are running */
+  private int activeInterval;
+
+  public TajoResourceTracker(TajoResourceManager manager, NodeLivelinessMonitor nodeLivelinessMonitor) {
     super(TajoResourceTracker.class.getSimpleName());
     this.manager = manager;
     this.rmContext = manager.getRMContext();
-    this.workerLivelinessMonitor = workerLivelinessMonitor;
+    this.nodeLivelinessMonitor = nodeLivelinessMonitor;
   }
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
-    }
-    TajoConf systemConf = (TajoConf) conf;
+
+    TajoConf systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+    activeInterval = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_ACTIVE_INTERVAL);
 
     String confMasterServiceAddr = systemConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
 
-    try {
-      server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, initIsa, 3);
-    } catch (Exception e) {
-      LOG.error(e);
-      throw new IOError(e);
-    }
+    int workerNum = systemConf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+    server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, initIsa, workerNum);
 
     server.start();
     bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
@@ -113,103 +110,97 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
     super.serviceStop();
   }
 
-  /** The response builder */
-  private static final TajoHeartbeatResponse.Builder builder =
-      TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
-
-  private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) {
-    return new WorkerStatusEvent(
-        workerId,
-        heartbeat.getServerStatus().getRunningTaskNum(),
-        heartbeat.getServerStatus().getJvmHeap().getMaxHeap(),
-        heartbeat.getServerStatus().getJvmHeap().getFreeHeap(),
-        heartbeat.getServerStatus().getJvmHeap().getTotalHeap());
+  private static NodeStatusEvent createStatusEvent(NodeHeartbeatRequest heartbeat) {
+    return new NodeStatusEvent(
+        heartbeat.getWorkerId(),
+        heartbeat.getRunningTasks(),
+        heartbeat.getRunningQueryMasters(),
+        new NodeResource(heartbeat.getAvailableResource()),
+        heartbeat.hasTotalResource() ? new NodeResource(heartbeat.getTotalResource()) : null);
   }
 
   @Override
-  public void heartbeat(
+  public void nodeHeartbeat(
       RpcController controller,
-      NodeHeartbeat heartbeat,
-      RpcCallback<TajoHeartbeatResponse> done) {
+      NodeHeartbeatRequest heartbeat,
+      RpcCallback<NodeHeartbeatResponse> done) {
 
+    NodeHeartbeatResponse.Builder response = NodeHeartbeatResponse.newBuilder();
+    ResponseCommand responseCommand = ResponseCommand.NORMAL;
     try {
       // get a workerId from the heartbeat
-      int workerId = heartbeat.getConnectionInfo().getId();
+      int workerId = heartbeat.getWorkerId();
 
-      if(rmContext.getWorkers().containsKey(workerId)) { // if worker is running
+      if(rmContext.getNodes().containsKey(workerId)) { // if node is running
 
         // status update
-        rmContext.getDispatcher().getEventHandler().handle(createStatusEvent(workerId, heartbeat));
+        rmContext.getDispatcher().getEventHandler().handle(createStatusEvent(heartbeat));
+
+        //refresh scheduler resource
+        rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
+
         // refresh ping
-        workerLivelinessMonitor.receivedPing(workerId);
+        nodeLivelinessMonitor.receivedPing(workerId);
 
-      } else if (rmContext.getInactiveWorkers().containsKey(workerId)) { // worker was inactive
+      } else if (rmContext.getInactiveNodes().containsKey(workerId)) { // node was inactive
+        if (!heartbeat.hasConnectionInfo()) {
+          // ask the worker node to send its membership information
+          responseCommand = ResponseCommand.MEMBERSHIP;
+        } else {
 
-        // remove the inactive worker from the list of inactive workers.
-        Worker worker = rmContext.getInactiveWorkers().remove(workerId);
-        workerLivelinessMonitor.unregister(worker.getWorkerId());
+          // remove the inactive nodeStatus from the list of inactive nodes.
+          NodeStatus nodeStatus = rmContext.getInactiveNodes().remove(workerId);
+          nodeLivelinessMonitor.unregister(nodeStatus.getWorkerId());
 
-        // create new worker instance
-        Worker newWorker = createWorkerResource(heartbeat);
-        int newWorkerId = newWorker.getWorkerId();
-        // add the new worker to the list of active workers
-        rmContext.getWorkers().putIfAbsent(newWorkerId, newWorker);
+          // create new nodeStatus instance
+          NodeStatus newNodeStatus = createNodeStatus(heartbeat);
+          int newWorkerId = newNodeStatus.getWorkerId();
+          // add the new nodeStatus to the list of active nodes
+          rmContext.getNodes().putIfAbsent(newWorkerId, newNodeStatus);
 
-        // Transit the worker to RUNNING
-        rmContext.getDispatcher().getEventHandler().handle(new WorkerEvent(newWorkerId, WorkerEventType.STARTED));
-        // register the worker to the liveliness monitor
-        workerLivelinessMonitor.register(newWorkerId);
+          // Transition the nodeStatus to RUNNING
+          rmContext.getDispatcher().getEventHandler().handle(new NodeEvent(newWorkerId, NodeEventType.STARTED));
+          // register the nodeStatus to the liveliness monitor
+          nodeLivelinessMonitor.register(newWorkerId);
 
-      } else { // if new worker pings firstly
+          rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
+        }
 
-        // create new worker instance
-        Worker newWorker = createWorkerResource(heartbeat);
-        Worker oldWorker = rmContext.getWorkers().putIfAbsent(workerId, newWorker);
+      } else { // if new node pings firstly
 
-        if (oldWorker == null) {
-          // Transit the worker to RUNNING
-          rmContext.rmDispatcher.getEventHandler().handle(new WorkerEvent(workerId, WorkerEventType.STARTED));
+        // The ping does not carry membership information
+        if (!heartbeat.hasConnectionInfo()) {
+          // ask the node to send its membership information
+          responseCommand = ResponseCommand.MEMBERSHIP;
         } else {
-          LOG.info("Reconnect from the node at: " + workerId);
-          workerLivelinessMonitor.unregister(workerId);
-          rmContext.getDispatcher().getEventHandler().handle(new WorkerReconnectEvent(workerId, newWorker));
-        }
 
-        workerLivelinessMonitor.register(workerId);
+          // create new node instance
+          NodeStatus newNodeStatus = createNodeStatus(heartbeat);
+          NodeStatus oldNodeStatus = rmContext.getNodes().putIfAbsent(workerId, newNodeStatus);
+
+          if (oldNodeStatus == null) {
+            // Transition the worker to RUNNING
+            rmContext.rmDispatcher.getEventHandler().handle(new NodeEvent(workerId, NodeEventType.STARTED));
+          } else {
+            LOG.info("Reconnect from the node at: " + workerId);
+            nodeLivelinessMonitor.unregister(workerId);
+            rmContext.getDispatcher().getEventHandler().handle(new NodeReconnectEvent(workerId, newNodeStatus));
+          }
+
+          nodeLivelinessMonitor.register(workerId);
+          rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
+        }
       }
-
     } finally {
-      builder.setClusterResourceSummary(manager.getClusterResourceSummary());
-      done.run(builder.build());
+      if(manager.getScheduler().getRunningQuery() > 0) {
+        response.setHeartBeatInterval(activeInterval);
+      }
+      done.run(response.setCommand(responseCommand).build());
     }
   }
 
-  @Override
-  public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request,
-                            RpcCallback<TajoResourceTrackerProtocol.NodeHeartbeatResponseProto> done) {
-    //TODO implement with ResourceManager for scheduler
-    TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.Builder
-        response = TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.newBuilder();
-    done.run(response.setCommand(TajoResourceTrackerProtocol.ResponseCommand.NORMAL).build());
-  }
-
-  private Worker createWorkerResource(NodeHeartbeat request) {
-    WorkerResource workerResource = new WorkerResource();
-
-    if(request.getServerStatus() != null) {
-      workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB());
-      workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors());
-      workerResource.setDiskSlots(request.getServerStatus().getDiskSlots());
-      workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
-      workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
-      workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
-      workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
-    } else {
-      workerResource.setMemoryMB(4096);
-      workerResource.setDiskSlots(4);
-      workerResource.setCpuCoreSlots(4);
-    }
-
-    return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
+  private NodeStatus createNodeStatus(NodeHeartbeatRequest request) {
+    return new NodeStatus(rmContext, new NodeResource(request.getTotalResource()),
+        new WorkerConnectionInfo(request.getConnectionInfo()));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
deleted file mode 100644
index 8c5b96c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
-
-
-public class TajoWorkerContainer extends TajoContainer {
-  TajoContainerId id;
-  NodeId nodeId;
-  Worker worker;
-
-  public Worker getWorkerResource() {
-    return worker;
-  }
-
-  public void setWorkerResource(Worker workerResource) {
-    this.worker = workerResource;
-  }
-
-  @Override
-  public TajoContainerId getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(TajoContainerId id) {
-    this.id = id;
-  }
-
-  @Override
-  public NodeId getNodeId() {
-    return nodeId;
-  }
-
-  @Override
-  public void setNodeId(NodeId nodeId) {
-    this.nodeId = nodeId;
-  }
-
-  @Override
-  public String getNodeHttpAddress() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public void setNodeHttpAddress(String nodeHttpAddress) {
-    //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public Resource getResource() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public void setResource(Resource resource) {
-    //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public Priority getPriority() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public void setPriority(Priority priority) {
-    //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public Token getContainerToken() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public void setContainerToken(Token containerToken) {
-    //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public int compareTo(TajoContainer container) {
-    return getId().compareTo(container.getId());
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    TajoWorkerContainer that = (TajoWorkerContainer) o;
-
-    if (id != null ? !id.equals(that.id) : that.id != null) return false;
-    if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) return false;
-    if (worker != null ? !worker.equals(that.worker) : that.worker != null) return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = id != null ? id.hashCode() : 0;
-    result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
-    result = 31 * result + (worker != null ? worker.hashCode() : 0);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
deleted file mode 100644
index 184de71..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.master.container.TajoContainerId;
-
-public class TajoWorkerContainerId extends TajoContainerId {
-  ApplicationAttemptId applicationAttemptId;
-  int id;
-
-  @Override
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return applicationAttemptId;
-  }
-
-  @Override
-  public void setApplicationAttemptId(ApplicationAttemptId atId) {
-    this.applicationAttemptId = atId;
-  }
-
-  @Override
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public void setId(int id) {
-    this.id = id;
-  }
-
-  public ContainerProtocol.TajoContainerIdProto getProto() {
-    YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
-      .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp())
-      .setId(applicationAttemptId.getApplicationId().getId())
-      .build();
-
-    YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
-      .setAttemptId(applicationAttemptId.getAttemptId())
-      .setApplicationId(appIdProto)
-      .build();
-
-    return ContainerProtocol.TajoContainerIdProto.newBuilder()
-      .setAppAttemptId(attemptIdProto)
-      .setAppId(appIdProto)
-      .setId(id)
-      .build();
-  }
-
-  public static ContainerProtocol.TajoContainerIdProto getContainerIdProto(TajoContainerId containerId) {
-    if(containerId instanceof TajoWorkerContainerId) {
-      return ((TajoWorkerContainerId)containerId).getProto();
-    } else {
-      YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
-        .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp())
-        .setId(containerId.getApplicationAttemptId().getApplicationId().getId())
-        .build();
-
-      YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
-        .setAttemptId(containerId.getApplicationAttemptId().getAttemptId())
-        .setApplicationId(appIdProto)
-        .build();
-
-      return ContainerProtocol.TajoContainerIdProto.newBuilder()
-        .setAppAttemptId(attemptIdProto)
-        .setAppId(appIdProto)
-        .setId(containerId.getId())
-        .build();
-    }
-  }
-
-  @Override
-  protected void build() {
-
-  }
-}


Mime
View raw message