tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [02/24] tajo git commit: TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)
Date Sat, 30 May 2015 04:05:04 GMT
TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)

Closes #577


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/25bd5cb4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/25bd5cb4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/25bd5cb4

Branch: refs/heads/index_support
Commit: 25bd5cb44a03ee425b02e2bc2553f7d0f8affff5
Parents: 4b2ab61
Author: Jinho Kim <jhkim@apache.org>
Authored: Tue May 26 16:46:17 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Tue May 26 16:46:17 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   9 +-
 tajo-common/src/main/proto/tajo_protos.proto    |   6 +
 .../tajo/master/rm/TajoResourceTracker.java     |   9 +
 .../resource/DefaultResourceCalculator.java     | 109 ++++++++
 .../org/apache/tajo/resource/NodeResource.java  | 188 +++++++++++++
 .../org/apache/tajo/resource/NodeResources.java | 195 +++++++++++++
 .../tajo/resource/ResourceCalculator.java       | 169 ++++++++++++
 .../apache/tajo/worker/NodeResourceManager.java | 148 ++++++++++
 .../apache/tajo/worker/NodeStatusUpdater.java   | 274 +++++++++++++++++++
 .../tajo/worker/WorkerHeartbeatService.java     |   1 +
 .../worker/event/NodeResourceAllocateEvent.java |  46 ++++
 .../event/NodeResourceDeallocateEvent.java      |  40 +++
 .../worker/event/NodeResourceManagerEvent.java  |  34 +++
 .../tajo/worker/event/NodeStatusEvent.java      |  40 +++
 .../main/proto/ResourceTrackerProtocol.proto    |  27 ++
 .../src/main/proto/TajoWorkerProtocol.proto     |  16 ++
 .../org/apache/tajo/resource/TestResources.java |  48 ++++
 .../tajo/worker/MockNodeStatusUpdater.java      | 105 +++++++
 .../tajo/worker/TestNodeResourceManager.java    | 235 ++++++++++++++++
 .../tajo/worker/TestNodeStatusUpdater.java      | 115 ++++++++
 .../java/org/apache/tajo/storage/DiskUtil.java  |   4 +-
 22 files changed, 1816 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0d71d2f..c79a185 100644
--- a/CHANGES
+++ b/CHANGES
@@ -310,6 +310,8 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)
+
     TAJO-1613: Rename StorageManager to Tablespace. (hyunsik)
 
     TAJO-1359: Add nested field projector and language extension to project 

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 59b1f43..e20658b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -171,9 +171,14 @@ public class TajoConf extends Configuration {
     WORKER_TEMPORAL_DIR_CLEANUP("tajo.worker.tmpdir.cleanup-at-startup", false, Validators.bool()),
 
     // Tajo Worker Resources
-    WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1, Validators.min("1")),
+    WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores",
+        Runtime.getRuntime().availableProcessors(), Validators.min("1")),
     WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024, Validators.min("64")),
+    @Deprecated
     WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f),
+    WORKER_RESOURCE_AVAILABLE_DISKS_NUM("tajo.worker.resource.disks.num", 1, Validators.min("1")),
+    WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2,
+        Validators.min("1")),
     WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
     WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()),
 
@@ -186,7 +191,7 @@ public class TajoConf extends Configuration {
     WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 60), // 1 hours
     QUERYMASTER_HISTORY_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 6 * 60), // 6 hours
 
-    WORKER_HEARTBEAT_TIMEOUT("tajo.worker.heartbeat.timeout", 120 * 1000),  // 120 sec
+    WORKER_HEARTBEAT_INTERVAL("tajo.worker.heartbeat.interval", 10 * 1000),  // 10 sec
 
     // Resource Manager
     RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager",

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index b6cd9ef..8474f54 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -62,4 +62,10 @@ message WorkerConnectionInfoProto {
     optional int32 queryMasterPort = 5;
     required int32 clientPort = 6;
     required int32 httpInfoPort = 7;
+}
+
+message NodeResourceProto {
+  optional int32 memory = 1;
+  optional int32 virtual_cores = 2;
+  optional int32 disks = 3;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 4f3b66a..af28886 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -20,10 +20,12 @@ package org.apache.tajo.master.rm;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.common.exception.NotImplementedException;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
@@ -182,6 +184,13 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
     }
   }
 
+  @Override
+  public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request,
+                            RpcCallback<TajoResourceTrackerProtocol.NodeHeartbeatResponseProto> done) {
+    //TODO implement with ResourceManager for scheduler
+    throw new RuntimeException(new ServiceException(new NotImplementedException().getMessage()));
+  }
+
   private Worker createWorkerResource(NodeHeartbeat request) {
     WorkerResource workerResource = new WorkerResource();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
new file mode 100644
index 0000000..58b8a26
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
@@ -0,0 +1,109 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tajo.resource;
+
+
+public class DefaultResourceCalculator extends ResourceCalculator {
+  
+  @Override
+  public int compare(NodeResource unused, NodeResource lhs, NodeResource rhs) {
+    return lhs.compareTo(rhs);
+  }
+
+  @Override
+  public int computeAvailableContainers(NodeResource available, NodeResource required) {
+    return Math.min(Math.min(
+        available.getMemory() / required.getMemory(),
+        available.getDisks() / required.getDisks()),
+        available.getVirtualCores() / required.getVirtualCores());
+  }
+
+  @Override
+  public float divide(NodeResource unused,
+                      NodeResource numerator, NodeResource denominator) {
+    return ratio(numerator, denominator);
+  }
+  
+  public boolean isInvalidDivisor(NodeResource r) {
+    if (r.getMemory() == 0.0f) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public float ratio(NodeResource a, NodeResource b) {
+    return (float)a.getMemory() / b.getMemory();
+  }
+
+  @Override
+  public NodeResource divideAndCeil(NodeResource numerator, int denominator) {
+    return NodeResources.createResource(
+        divideAndCeil(numerator.getMemory(), denominator));
+  }
+
+  @Override
+  public NodeResource normalize(NodeResource r, NodeResource minimumResource,
+                                NodeResource maximumResource, NodeResource stepFactor) {
+    int normalizedMemory = Math.min(
+        roundUp(
+            Math.max(r.getMemory(), minimumResource.getMemory()),
+            stepFactor.getMemory()),
+            maximumResource.getMemory());
+    return NodeResources.createResource(normalizedMemory);
+  }
+
+  @Override
+  public NodeResource normalize(NodeResource r, NodeResource minimumResource,
+                                NodeResource maximumResource) {
+    return normalize(r, minimumResource, maximumResource, minimumResource);
+  }
+
+  @Override
+  public NodeResource roundUp(NodeResource r, NodeResource stepFactor) {
+    return NodeResources.createResource(
+        roundUp(r.getMemory(), stepFactor.getMemory())
+    );
+  }
+
+  @Override
+  public NodeResource roundDown(NodeResource r, NodeResource stepFactor) {
+    return NodeResources.createResource(
+        roundDown(r.getMemory(), stepFactor.getMemory()));
+  }
+
+  @Override
+  public NodeResource multiplyAndNormalizeUp(NodeResource r, double by,
+      NodeResource stepFactor) {
+    return NodeResources.createResource(
+        roundUp((int) (r.getMemory() * by + 0.5), stepFactor.getMemory())
+    );
+  }
+
+  @Override
+  public NodeResource multiplyAndNormalizeDown(NodeResource r, double by,
+      NodeResource stepFactor) {
+    return NodeResources.createResource(
+        roundDown(
+            (int) (r.getMemory() * by),
+            stepFactor.getMemory()
+        )
+    );
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
new file mode 100644
index 0000000..f51fc07
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.resource;
+
+import com.google.common.base.Objects;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * <p><code>NodeResource</code> models a set of computer resources in the
+ * cluster.</p>
+ * <p/>
+ * <p>Currently it models <em>memory</em>, <em>disk</em> and <em>CPU</em>.</p>
+ * <p/>
+ * <p>The unit for memory is megabytes. The unit for disks is the number of disks.
+ * CPU is modeled with virtual cores (vcores), a unit for expressing parallelism.
+ * A node's capacity should be configured with virtual cores equal to its number of physical cores.
+ * A task should be requested with the number of cores it can saturate.</p>
+ * <p/>
+ */
+
+public class NodeResource implements ProtoObject<TajoProtos.NodeResourceProto>, Comparable<NodeResource> {
+
+  private volatile int memory;
+  private volatile int disks;
+  private volatile int vCores;
+
+  private static AtomicIntegerFieldUpdater MEMORY_UPDATER;
+  private static AtomicIntegerFieldUpdater DISKS_UPDATER;
+  private static AtomicIntegerFieldUpdater VCORES_UPDATER;
+
+  static {
+    MEMORY_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "memory");
+    if (MEMORY_UPDATER == null) {
+      MEMORY_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "memory");
+      DISKS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "disks");
+      VCORES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "vCores");
+    } else {
+      DISKS_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "disks");
+      VCORES_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "vCores");
+    }
+  }
+
+  public NodeResource(TajoProtos.NodeResourceProto proto) {
+    setMemory(proto.getMemory());
+    setDisks(proto.getDisks());
+    setVirtualCores(proto.getVirtualCores());
+  }
+
+  private NodeResource() {
+
+  }
+
+  public static NodeResource createResource(int memory,  int disks, int vCores) {
+    return new NodeResource().setMemory(memory).setDisks(disks).setVirtualCores(vCores);
+  }
+
+  /**
+   * Get <em>memory</em> of the resource.
+   *
+   * @return <em>memory</em> of the resource
+   */
+  public int getMemory() {
+    return memory;
+  }
+
+  /**
+   * Set <em>memory</em> of the resource.
+   *
+   * @param memory <em>memory</em> of the resource
+   */
+  @SuppressWarnings("unchecked")
+  public NodeResource setMemory(int memory) {
+    MEMORY_UPDATER.lazySet(this, memory);
+    return this;
+  }
+
+
+  /**
+   * Get <em>number of disks</em> of the resource.
+   *
+   * @return <em>number of disks</em> of the resource
+   */
+  public int getDisks() {
+    return disks;
+  }
+
+  /**
+   * Set <em>number of disks</em> of the resource.
+   *
+   * @param disks <em>number of disks</em> of the resource
+   */
+  @SuppressWarnings("unchecked")
+  public NodeResource setDisks(int disks) {
+    DISKS_UPDATER.lazySet(this, disks);
+    return this;
+  }
+
+  /**
+   * Get <em>number of virtual cpu cores</em> of the resource.
+   * Virtual cores are a unit for expressing CPU parallelism. A node's capacity
+   * should be configured with virtual cores equal to its number of physical cores.
+   *
+   * @return <em>num of virtual cpu cores</em> of the resource
+   */
+  public int getVirtualCores() {
+    return vCores;
+  }
+
+
+  /**
+   * Set <em>number of virtual cpu cores</em> of the resource.
+   *
+   * @param vCores <em>number of virtual cpu cores</em> of the resource
+   */
+  @SuppressWarnings("unchecked")
+  public NodeResource setVirtualCores(int vCores) {
+    VCORES_UPDATER.lazySet(this, vCores);
+    return this;
+  }
+
+  @Override
+  public TajoProtos.NodeResourceProto getProto() {
+    TajoProtos.NodeResourceProto.Builder builder = TajoProtos.NodeResourceProto.newBuilder();
+    builder.setMemory(memory)
+        .setDisks(disks)
+        .setVirtualCores(vCores);
+    return builder.build();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(getMemory(), getDisks(), getVirtualCores());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof NodeResource))
+      return false;
+    NodeResource other = (NodeResource) obj;
+    if (getMemory() != other.getMemory() ||
+        getDisks() != other.getDisks() ||
+        getVirtualCores() != other.getVirtualCores()) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int compareTo(NodeResource other) {
+    int diff = this.getMemory() - other.getMemory();
+    if (diff == 0) {
+      diff = this.getDisks() - other.getDisks();
+    }
+    if (diff == 0) {
+      diff = this.getVirtualCores() - other.getVirtualCores();
+    }
+    return diff;
+  }
+
+  @Override
+  public String toString() {
+    return "<memory:" + getMemory() + ", disks:" + getDisks() + ", vCores:" + getVirtualCores() + ">";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
new file mode 100644
index 0000000..01e9dcf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java
@@ -0,0 +1,195 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tajo.resource;
+
+
+public class NodeResources {
+
+  public static NodeResource createResource(int memory) {
+    return createResource(memory, 0);
+  }
+
+  public static NodeResource createResource(int memory, int disks) {
+    return NodeResource.createResource(memory, disks, (memory > 0) ? 1 : 0);
+  }
+
+  public static NodeResource createResource(int memory, int disks, int vCores) {
+    return NodeResource.createResource(memory, disks, vCores);
+  }
+
+  public static NodeResource clone(NodeResource res) {
+    return NodeResource.createResource(res.getMemory(), res.getDisks(), res.getVirtualCores());
+  }
+
+  public static NodeResource update(NodeResource lhs, NodeResource rhs) {
+    return lhs.setMemory(rhs.getMemory()).setDisks(rhs.getDisks()).setVirtualCores(rhs.getVirtualCores());
+  }
+
+  public static NodeResource addTo(NodeResource lhs, NodeResource rhs) {
+    lhs.setMemory(lhs.getMemory() + rhs.getMemory())
+        .setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores())
+        .setDisks(lhs.getDisks() + rhs.getDisks());
+    return lhs;
+  }
+
+  public static NodeResource add(NodeResource lhs, NodeResource rhs) {
+    return addTo(clone(lhs), rhs);
+  }
+
+  public static NodeResource subtractFrom(NodeResource lhs, NodeResource rhs) {
+    lhs.setMemory(lhs.getMemory() - rhs.getMemory())
+        .setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores())
+        .setDisks(lhs.getDisks() - rhs.getDisks());
+    return lhs;
+  }
+
+  public static NodeResource subtract(NodeResource lhs, NodeResource rhs) {
+    return subtractFrom(clone(lhs), rhs);
+  }
+
+  public static NodeResource multiplyTo(NodeResource lhs, double by) {
+    lhs.setMemory((int) (lhs.getMemory() * by))
+        .setVirtualCores((int) (lhs.getVirtualCores() * by))
+        .setDisks((int) (lhs.getDisks() * by));
+    return lhs;
+  }
+
+  public static NodeResource multiply(NodeResource lhs, double by) {
+    return multiplyTo(clone(lhs), by);
+  }
+  
+  public static NodeResource multiplyAndNormalizeUp(
+      ResourceCalculator calculator,NodeResource lhs, double by, NodeResource factor) {
+    return calculator.multiplyAndNormalizeUp(lhs, by, factor);
+  }
+
+  public static NodeResource multiplyAndNormalizeDown(
+      ResourceCalculator calculator,NodeResource lhs, double by, NodeResource factor) {
+    return calculator.multiplyAndNormalizeDown(lhs, by, factor);
+  }
+
+  public static NodeResource multiplyAndRoundDown(NodeResource lhs, double by) {
+    NodeResource out = clone(lhs);
+    out.setMemory((int)(lhs.getMemory() * by));
+    out.setDisks((int)(lhs.getDisks() * by));
+    out.setVirtualCores((int)(lhs.getVirtualCores() * by));
+    return out;
+  }
+
+  public static NodeResource normalize(
+      ResourceCalculator calculator, NodeResource lhs, NodeResource min,
+      NodeResource max, NodeResource increment) {
+    return calculator.normalize(lhs, min, max, increment);
+  }
+
+  public static NodeResource roundUp(
+      ResourceCalculator calculator, NodeResource lhs, NodeResource factor) {
+    return calculator.roundUp(lhs, factor);
+  }
+
+  public static NodeResource roundDown(
+      ResourceCalculator calculator, NodeResource lhs, NodeResource factor) {
+    return calculator.roundDown(lhs, factor);
+  }
+
+  public static boolean isInvalidDivisor(
+      ResourceCalculator resourceCalculator, NodeResource divisor) {
+    return resourceCalculator.isInvalidDivisor(divisor);
+  }
+
+  public static float ratio(
+      ResourceCalculator resourceCalculator, NodeResource lhs, NodeResource rhs) {
+    return resourceCalculator.ratio(lhs, rhs);
+  }
+
+  public static float divide(
+      ResourceCalculator resourceCalculator,
+      NodeResource clusterResource, NodeResource lhs, NodeResource rhs) {
+    return resourceCalculator.divide(clusterResource, lhs, rhs);
+  }
+
+  public static NodeResource divideAndCeil(
+      ResourceCalculator resourceCalculator, NodeResource lhs, int rhs) {
+    return resourceCalculator.divideAndCeil(lhs, rhs);
+  }
+
+  public static boolean equals(NodeResource lhs, NodeResource rhs) {
+    return lhs.equals(rhs);
+  }
+
+  public static boolean lessThan(
+      ResourceCalculator resourceCalculator,
+      NodeResource clusterResource,
+      NodeResource lhs, NodeResource rhs) {
+    return (resourceCalculator.compare(clusterResource, lhs, rhs) < 0);
+  }
+
+  public static boolean lessThanOrEqual(
+      ResourceCalculator resourceCalculator,
+      NodeResource clusterResource,
+      NodeResource lhs, NodeResource rhs) {
+    return (resourceCalculator.compare(clusterResource, lhs, rhs) <= 0);
+  }
+
+  public static boolean greaterThan(
+      ResourceCalculator resourceCalculator,
+      NodeResource clusterResource,
+      NodeResource lhs, NodeResource rhs) {
+    return resourceCalculator.compare(clusterResource, lhs, rhs) > 0;
+  }
+
+  public static boolean greaterThanOrEqual(
+      ResourceCalculator resourceCalculator,
+      NodeResource clusterResource,
+      NodeResource lhs, NodeResource rhs) {
+    return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0;
+  }
+
+  public static NodeResource min(
+      ResourceCalculator resourceCalculator,
+      NodeResource clusterResource,
+      NodeResource lhs, NodeResource rhs) {
+    return resourceCalculator.compare(clusterResource, lhs, rhs) <= 0 ? lhs : rhs;
+  }
+
+  public static NodeResource max(
+      ResourceCalculator resourceCalculator,
+      NodeResource clusterResource,
+      NodeResource lhs, NodeResource rhs) {
+    return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0 ? lhs : rhs;
+  }
+
+  public static boolean fitsIn(NodeResource smaller, NodeResource bigger) {
+    return smaller.getMemory() <= bigger.getMemory() &&
+        smaller.getDisks() <= bigger.getDisks() &&
+        smaller.getVirtualCores() <= bigger.getVirtualCores();
+  }
+
+  public static NodeResource componentwiseMin(NodeResource lhs, NodeResource rhs) {
+    return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
+        Math.min(lhs.getDisks(), rhs.getDisks()),
+        Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
+  }
+
+  public static NodeResource componentwiseMax(NodeResource lhs, NodeResource rhs) {
+    return createResource(Math.max(lhs.getMemory(), rhs.getMemory()),
+        Math.max(lhs.getDisks(), rhs.getDisks()),
+        Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java
new file mode 100644
index 0000000..b08228f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java
@@ -0,0 +1,169 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.tajo.resource;
+
+
+/**
+ * A set of {@link NodeResource} comparison and manipulation interfaces.
+ */
+
+public abstract class ResourceCalculator {
+
+  public abstract int 
+  compare(NodeResource clusterResource, NodeResource lhs, NodeResource rhs);
+  
+  public static int divideAndCeil(int a, int b) {
+    if (b == 0) {
+      return 0;
+    }
+    return (a + (b - 1)) / b;
+  }
+
+  public static int roundUp(int a, int b) {
+    return divideAndCeil(a, b) * b;
+  }
+
+  public static int roundDown(int a, int b) {
+    return (a / b) * b;
+  }
+
+  /**
+   * Compute the number of containers which can be allocated given
+   * <code>available</code> and <code>required</code> resources.
+   * 
+   * @param available available resources
+   * @param required required resources
+   * @return number of containers which can be allocated
+   */
+  public abstract int computeAvailableContainers(
+      NodeResource available, NodeResource required);
+  /**
+   * Multiply resource <code>r</code> by factor <code>by</code> 
+   * and normalize up using step-factor <code>stepFactor</code>.
+   * 
+   * @param r resource to be multiplied
+   * @param by multiplier
+   * @param stepFactor factor by which to normalize up 
+   * @return resulting normalized resource
+   */
+  public abstract NodeResource multiplyAndNormalizeUp(
+      NodeResource r, double by, NodeResource stepFactor);
+  
+  /**
+   * Multiply resource <code>r</code> by factor <code>by</code> 
+   * and normalize down using step-factor <code>stepFactor</code>.
+   * 
+   * @param r resource to be multiplied
+   * @param by multiplier
+   * @param stepFactor factor by which to normalize down 
+   * @return resulting normalized resource
+   */
+  public abstract NodeResource multiplyAndNormalizeDown(
+      NodeResource r, double by, NodeResource stepFactor);
+  
+  /**
+   * Normalize resource <code>r</code> given the base 
+   * <code>minimumResource</code> and verify against max allowed
+   * <code>maximumResource</code>
+   * 
+   * @param r resource
+   * @param minimumResource step-factor
+   * @param maximumResource the upper bound of the resource to be allocated
+   * @return normalized resource
+   */
+  public NodeResource normalize(NodeResource r, NodeResource minimumResource,
+      NodeResource maximumResource) {
+    return normalize(r, minimumResource, maximumResource, minimumResource);
+  }
+
+  /**
+   * Normalize resource <code>r</code> given the base 
+   * <code>minimumResource</code> and verify against max allowed
+   * <code>maximumResource</code> using a step factor for the normalization.
+   *
+   * @param r resource
+   * @param minimumResource minimum value
+   * @param maximumResource the upper bound of the resource to be allocated
+   * @param stepFactor the increment for resources to be allocated
+   * @return normalized resource
+   */
+  public abstract NodeResource normalize(NodeResource r, NodeResource minimumResource,
+                                     NodeResource maximumResource,
+                                     NodeResource stepFactor);
+
+
+  /**
+   * Round-up resource <code>r</code> given factor <code>stepFactor</code>.
+   * 
+   * @param r resource
+   * @param stepFactor step-factor
+   * @return rounded resource
+   */
+  public abstract NodeResource roundUp(NodeResource r, NodeResource stepFactor);
+  
+  /**
+   * Round-down resource <code>r</code> given factor <code>stepFactor</code>.
+   * 
+   * @param r resource
+   * @param stepFactor step-factor
+   * @return rounded resource
+   */
+  public abstract NodeResource roundDown(NodeResource r, NodeResource stepFactor);
+  
+  /**
+   * Divide resource <code>numerator</code> by resource <code>denominator</code>
+   * using specified policy (domination, average, fairness etc.); hence overall
+   * <code>clusterResource</code> is provided for context.
+   *  
+   * @param clusterResource cluster resources
+   * @param numerator numerator
+   * @param denominator denominator
+   * @return <code>numerator</code>/<code>denominator</code> 
+   *         using specific policy
+   */
+  public abstract float divide(
+      NodeResource clusterResource, NodeResource numerator, NodeResource denominator);
+  
+  /**
+   * Determine if a resource is not suitable for use as a divisor
+   * (will result in divide by 0, etc)
+   *
+   * @param r resource
+   * @return true if divisor is invalid (should not be used), false otherwise
+   */
+  public abstract boolean isInvalidDivisor(NodeResource r);
+
+  /**
+   * Ratio of resource <code>a</code> to resource <code>b</code>.
+   * 
+   * @param a resource 
+   * @param b resource
+   * @return ratio of resource <code>a</code> to resource <code>b</code>
+   */
+  public abstract float ratio(NodeResource a, NodeResource b);
+
+  /**
+   * Divide-and-ceil <code>numerator</code> by <code>denominator</code>.
+   * 
+   * @param numerator numerator resource
+   * @param denominator denominator
+   * @return resultant resource
+   */
+  public abstract NodeResource divideAndCeil(NodeResource numerator, int denominator);
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
new file mode 100644
index 0000000..20eec6b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.storage.DiskUtil;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
+import org.apache.tajo.worker.event.NodeResourceManagerEvent;
+import org.apache.tajo.worker.event.NodeStatusEvent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceManagerEvent> {
+  private static final Log LOG = LogFactory.getLog(NodeResourceManager.class);
+
+  private final Dispatcher dispatcher;
+  private NodeResource totalResource;
+  private NodeResource availableResource;
+  private AtomicInteger allocatedSize;
+  private TajoConf tajoConf;
+
+  public NodeResourceManager(Dispatcher dispatcher){
+    super(NodeResourceManager.class.getName());
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+    this.tajoConf = (TajoConf)conf;
+    this.totalResource = createWorkerResource(tajoConf);
+    this.availableResource = NodeResources.clone(totalResource);
+    this.dispatcher.register(NodeResourceManagerEvent.EventType.class, this);
+    this.allocatedSize = new AtomicInteger();
+    super.serviceInit(conf);
+    LOG.info("Initialized NodeResourceManager for " + totalResource);
+  }
+
+  @Override
+  public void handle(NodeResourceManagerEvent event) {
+    // Allocation requests are answered through the event's callback; deallocations free resources and trigger a status report.
+    if (event instanceof NodeResourceAllocateEvent) {
+      NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event;
+      BatchAllocationResponseProto.Builder response = BatchAllocationResponseProto.newBuilder();
+      for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) {
+        NodeResource resource = new NodeResource(request.getResource());
+        if (allocate(resource)) {
+          allocatedSize.incrementAndGet();
+          //TODO send task event to taskExecutor
+        } else {
+          response.addCancellationTask(request); // does not fit in the available resource: hand the request back
+        }
+      }
+      allocateEvent.getCallback().run(response.build());
+
+    } else if (event instanceof NodeResourceDeallocateEvent) {
+      allocatedSize.decrementAndGet();
+      NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event;
+      release(deallocateEvent.getResource());
+      // NOTE(review): the event carries the live availableResource object, not a copy -- confirm consumers expect that
+      // publish the available resource as a REPORT_RESOURCE status event
+      getDispatcher().getEventHandler().handle(
+          new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, getAvailableResource()));
+    }
+  }
+
+  protected Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  protected NodeResource getTotalResource() {
+    return totalResource;
+  }
+
+  protected NodeResource getAvailableResource() {
+    return availableResource;
+  }
+
+  public int getAllocatedSize() {
+    return allocatedSize.get();
+  }
+
+  private boolean allocate(NodeResource resource) {
+    //TODO consider the jvm free memory
+    if (NodeResources.fitsIn(resource, availableResource)) { // NOTE(review): check-then-subtract is unsynchronized; confirm single-threaded use
+      NodeResources.subtractFrom(availableResource, resource);
+      return true;
+    }
+    return false; // the request does not fit in what is currently available
+  }
+
+  private void release(NodeResource resource) {
+    NodeResources.addTo(availableResource, resource);
+  }
+
+  private NodeResource createWorkerResource(TajoConf conf) {
+    int memoryMb;
+    // Builds the worker's total resource from configuration; in test mode the configured memory is used as-is.
+    if (conf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+      memoryMb = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
+    } else {
+      // Take the minimum as long before narrowing: maxMemory() may be Long.MAX_VALUE, which wraps if cast to int first.
+      memoryMb = (int) Math.min(Runtime.getRuntime().maxMemory() / StorageUnit.MB,
+          conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB));
+    }
+    int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+    int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM);
+    // With DFS-dir awareness enabled, a positive DiskUtil.getDataNodeStorageSize() overrides the configured disk count.
+    int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize();
+    if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) {
+      disks = dataNodeStorageSize;
+    }
+
+    int diskParallels = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM);
+    return NodeResource.createResource(memoryMb, disks * diskParallels, vCores);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
new file mode 100644
index 0000000..84ac419
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.rpc.*;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.worker.event.NodeStatusEvent;
+
+import java.net.ConnectException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*;
+
+/**
+ * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
+ */
+public class NodeStatusUpdater extends AbstractService implements EventHandler<NodeStatusEvent> {
+
+  private final static Log LOG = LogFactory.getLog(NodeStatusUpdater.class);
+
+  private TajoConf tajoConf;
+  private StatusUpdaterThread updaterThread;
+  private volatile boolean isStopped;
+  private volatile long heartBeatInterval;
+  private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue;
+  private final WorkerConnectionInfo connectionInfo;
+  private final NodeResourceManager nodeResourceManager;
+  private AsyncRpcClient rmClient;
+  private ServiceTracker serviceTracker;
+  private TajoResourceTrackerProtocolService.Interface resourceTracker;
+  private int queueingLimit;
+
+  public NodeStatusUpdater(WorkerConnectionInfo connectionInfo, NodeResourceManager resourceManager) {
+    super(NodeStatusUpdater.class.getSimpleName());
+    this.connectionInfo = connectionInfo;
+    this.nodeResourceManager = resourceManager;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+    this.tajoConf = (TajoConf) conf;
+    this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue();
+    this.serviceTracker = ServiceTrackerFactory.get(tajoConf);
+    this.nodeResourceManager.getDispatcher().register(NodeStatusEvent.EventType.class, this);
+    this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL);
+    this.updaterThread = new StatusUpdaterThread();
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    // batch size for queued resource reports: half of the total virtual cores (run() passes at least 1 to drain())
+    this.queueingLimit = nodeResourceManager.getTotalResource().getVirtualCores() / 2;
+
+    updaterThread.start();
+    super.serviceStart();
+    LOG.info("NodeStatusUpdater started.");
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    this.isStopped = true;
+    // May be called before serviceInit(); interrupt() wakes the wait() in run() and the poll() in drain() alike.
+    if (updaterThread != null) {
+      updaterThread.interrupt();
+    }
+    super.serviceStop();
+    LOG.info("NodeStatusUpdater stopped.");
+  }
+
+  @Override
+  public void handle(NodeStatusEvent event) {
+    switch (event.getType()) {
+      case REPORT_RESOURCE:
+        heartBeatRequestQueue.add(event); // queued; the updater thread sends queued reports in batches
+        break;
+      case FLUSH_REPORTS:
+        heartBeatRequestQueue.add(event); // queued as well; drain() ends the current batch when it polls this event
+        break;
+    }
+  }
+
+  public int getQueueSize() {
+    return heartBeatRequestQueue.size();
+  }
+
+  public int getQueueingLimit() {
+    return queueingLimit;
+  }
+
+  private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) {
+    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
+    requestProto.setAvailableResource(resource.getProto());
+    requestProto.setWorkerId(connectionInfo.getId());
+    return requestProto.build();
+  }
+
+  private NodeHeartbeatRequestProto createHeartBeatReport() {
+    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
+    requestProto.setWorkerId(connectionInfo.getId());
+    return requestProto.build();
+  }
+
+  private NodeHeartbeatRequestProto createNodeStatusReport() {
+    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
+    requestProto.setTotalResource(nodeResourceManager.getTotalResource().getProto());
+    requestProto.setAvailableResource(nodeResourceManager.getAvailableResource().getProto());
+    requestProto.setWorkerId(connectionInfo.getId());
+    requestProto.setConnectionInfo(connectionInfo.getProto());
+
+    //TODO set node status to requestProto.setStatus()
+    return requestProto.build();
+  }
+
+  protected TajoResourceTrackerProtocolService.Interface newStub()
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
+    RpcClientManager.cleanup(rmClient);
+
+    RpcClientManager rpcManager = RpcClientManager.getInstance();
+    rmClient = rpcManager.newClient(serviceTracker.getResourceTrackerAddress(),
+        TajoResourceTrackerProtocol.class, true, rpcManager.getRetries(),
+        rpcManager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+    return rmClient.getStub();
+  }
+
+  protected NodeHeartbeatResponseProto sendHeartbeat(NodeHeartbeatRequestProto requestProto)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException, ExecutionException {
+    if (resourceTracker == null) {
+      resourceTracker = newStub();
+    }
+    // Sends one heartbeat and waits for the reply; the result stays null when the wait is interrupted or times out.
+    NodeHeartbeatResponseProto response = null;
+    try {
+      CallFuture<NodeHeartbeatResponseProto> callBack = new CallFuture<NodeHeartbeatResponseProto>();
+      resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack);
+      response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn(e.getMessage());
+      Thread.currentThread().interrupt(); // restore the flag so the updater loop's interrupted() check can exit
+    } catch (TimeoutException te) {
+      LOG.warn("Heartbeat response is being delayed.", te);
+    } catch (ExecutionException ee) {
+      LOG.warn("TajoMaster failure: " + ee.getMessage());
+      resourceTracker = null;
+      throw ee;
+    }
+    return response;
+  }
+
+  class StatusUpdaterThread extends Thread {
+
+    public StatusUpdaterThread() {
+      super("NodeStatusUpdater");
+    }
+
+    private int drain(Collection<NodeStatusEvent> buffer, int numElements,
+                          long timeout, TimeUnit unit) throws InterruptedException {
+      // Moves up to numElements queued events into buffer, waiting no longer than timeout; returns how many were added.
+      long deadline = System.nanoTime() + unit.toNanos(timeout);
+      int added = 0;
+      while (added < numElements) {
+        added += heartBeatRequestQueue.drainTo(buffer, numElements - added);
+        if (added < numElements) { // not enough elements immediately available; will have to wait
+          NodeStatusEvent e = heartBeatRequestQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
+          if (e == null) {
+            break; // the deadline passed without another event arriving
+          }
+          buffer.add(e);
+          added++;
+
+          if (e.getType() == NodeStatusEvent.EventType.FLUSH_REPORTS) {
+            break; // NOTE(review): only polled events are checked; a FLUSH_REPORTS moved by drainTo() does not end the batch
+          }
+        }
+      }
+      return added;
+    }
+
+    /* Sends heartbeats carrying this node's resource and status to the master until stopped or interrupted. */
+    @Override
+    public void run() {
+      NodeHeartbeatResponseProto lastResponse = null;
+      while (!isStopped && !Thread.interrupted()) {
+
+        try {
+          if (lastResponse != null) {
+            if (lastResponse.getCommand() == ResponseCommand.NORMAL) {
+              List<NodeStatusEvent> events = Lists.newArrayList();
+              try {
+                /* collect queued reports for up to one heartbeat interval */
+                drain(events, Math.max(queueingLimit, 1), heartBeatInterval, TimeUnit.MILLISECONDS);
+              } catch (InterruptedException e) {
+                break;
+              }
+
+              if (!events.isEmpty()) {
+                // report the resource carried by the newest event in the batch
+                lastResponse = sendHeartbeat(createResourceReport(events.get(events.size() - 1).getResource()));
+              } else {
+                // nothing was queued within the interval: send a bare ping
+                lastResponse = sendHeartbeat(createHeartBeatReport());
+              }
+
+            } else if (lastResponse.getCommand() == ResponseCommand.MEMBERSHIP) {
+              // the master asked for membership: resend the full node status
+              lastResponse = sendHeartbeat(createNodeStatusReport());
+            } else if (lastResponse.getCommand() == ResponseCommand.ABORT_QUERY) {
+              //TODO abort failure queries. NOTE(review): lastResponse stays unchanged, so the loop spins here without waiting; SHUTDOWN is unhandled too
+            }
+          } else {
+            // no response yet (startup, or the previous call returned null): send the full node status
+            lastResponse = sendHeartbeat(createNodeStatusReport());
+          }
+        } catch (NoSuchMethodException nsme) {
+          LOG.fatal(nsme.getMessage(), nsme);
+          Runtime.getRuntime().halt(1);
+        } catch (ClassNotFoundException cnfe) {
+          LOG.fatal(cnfe.getMessage(), cnfe);
+          Runtime.getRuntime().halt(1);
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          if (!isStopped) {
+            synchronized (updaterThread) {
+              try {
+                updaterThread.wait(heartBeatInterval);
+              } catch (InterruptedException ie) {
+                // interrupted while backing off; the loop condition re-checks isStopped
+              }
+            }
+          }
+        }
+      }
+
+      LOG.info("Heartbeat Thread stopped.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index bd70d59..050e2b5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -47,6 +47,7 @@ import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
 /**
  * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
  */
+@Deprecated
 public class WorkerHeartbeatService extends AbstractService {
   /** class logger */
   private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class);

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
new file mode 100644
index 0000000..2f411e8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+
+import com.google.protobuf.RpcCallback;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto;
+
+public class NodeResourceAllocateEvent extends NodeResourceManagerEvent {
+  // ALLOCATE event: a batch allocation request together with the callback that receives the batch response.
+  private BatchAllocationRequestProto request;
+  private RpcCallback<BatchAllocationResponseProto> callback;
+
+  public NodeResourceAllocateEvent(BatchAllocationRequestProto request,
+                                   RpcCallback<BatchAllocationResponseProto> callback) {
+    super(EventType.ALLOCATE);
+    this.callback = callback;
+    this.request = request;
+  }
+
+  public BatchAllocationRequestProto getRequest() {
+    return request;
+  }
+
+  public RpcCallback<BatchAllocationResponseProto> getCallback() {
+    return callback;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
new file mode 100644
index 0000000..a298d77
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.resource.NodeResource;
+
+public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent {
+  // DEALLOCATE event: carries the resource being handed back.
+  private NodeResource resource;
+
+  public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto) {
+    this(new NodeResource(proto)); // wrap the protobuf form and delegate
+  }
+
+  public NodeResourceDeallocateEvent(NodeResource resource) {
+    super(EventType.DEALLOCATE);
+    this.resource = resource;
+  }
+
+  public NodeResource getResource() {
+    return resource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
new file mode 100644
index 0000000..bcb3448
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.resource.NodeResource;
+
+public class NodeResourceManagerEvent extends AbstractEvent<NodeResourceManagerEvent.EventType> {
+  public enum EventType { // the two event kinds defined for the node resource manager
+    ALLOCATE,
+    DEALLOCATE
+  }
+
+  public NodeResourceManagerEvent(EventType eventType) {
+    super(eventType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
new file mode 100644
index 0000000..58ab74a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.resource.NodeResource;
+
+public class NodeStatusEvent extends AbstractEvent<NodeStatusEvent.EventType> {
+  private final NodeResource resource; // the resource value attached to this event
+
+  public enum EventType {
+    REPORT_RESOURCE,
+    FLUSH_REPORTS
+  }
+
+  public NodeStatusEvent(EventType eventType, NodeResource resource) {
+    super(eventType);
+    this.resource = resource;
+  }
+
+  public NodeResource getResource() {
+    return resource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index 40aeab7..dffd8c9 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -25,15 +25,42 @@ option java_generate_equals_and_hash = true;
 import "QueryCoordinatorProtocol.proto";
 import "ContainerProtocol.proto";
 import "tajo_protos.proto";
+import "TajoIdProtos.proto";
 
 package hadoop.yarn;
 
+// deprecated
 message NodeHeartbeat {
   required WorkerConnectionInfoProto connectionInfo = 1;
   optional ServerStatusProto serverStatus = 2;
   optional string statusMessage = 3;
 }
 
+message NodeHeartbeatRequestProto {
+  required int32 workerId = 1;
+  optional NodeResourceProto totalResource = 2;
+  optional NodeResourceProto availableResource = 3;
+  optional WorkerConnectionInfoProto connectionInfo = 4;
+  optional NodeStatusProto status = 5;
+}
+
+message NodeHeartbeatResponseProto {
+  required ResponseCommand command = 1 [default = NORMAL];
+  repeated QueryIdProto queryId = 2;
+}
+
+enum ResponseCommand {
+  NORMAL = 1; //ping
+  MEMBERSHIP = 2; // request membership to worker node
+  ABORT_QUERY = 3; //query master failure
+  SHUTDOWN = 4; // black list
+}
+
+//TODO add node health information
+message NodeStatusProto {
+}
+
 service TajoResourceTrackerProtocolService {
   rpc heartbeat(NodeHeartbeat) returns (TajoHeartbeatResponse);
+  rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index bf9bbde..2324596 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -116,6 +116,7 @@ message ExecutionBlockReport {
     repeated IntermediateEntryProto intermediateEntries = 5;
 }
 
+// deprecated
 message TaskResponseProto {
     required string id = 1;
     required QueryState status = 2;
@@ -161,6 +162,7 @@ message QueryExecutionRequestProto {
     optional StringProto logicalPlanJson = 6;
 }
 
+// deprecated
 message GetTaskRequestProto {
     required int32 workerId = 1;
     required TajoContainerIdProto containerId = 2;
@@ -198,6 +200,20 @@ message ExecutionBlockListProto {
     repeated ExecutionBlockIdProto executionBlockId = 1;
 }
 
+message TaskAllocationRequestProto {
+    required TaskRequestProto taskRequest = 1;
+    required NodeResourceProto resource = 2;
+}
+
+message BatchAllocationRequestProto {
+    required ExecutionBlockIdProto executionBlockId = 1;
+    repeated TaskAllocationRequestProto taskRequest = 2;
+}
+
+message BatchAllocationResponseProto {
+    repeated TaskAllocationRequestProto cancellationTask = 2;
+}
+
 service TajoWorkerProtocolService {
   rpc ping (TaskAttemptIdProto) returns (BoolProto);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java b/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java
new file mode 100644
index 0000000..eb0d732
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.resource;
+
+import org.junit.Test;
+
+import static org.apache.tajo.resource.NodeResources.componentwiseMin;
+import static org.apache.tajo.resource.NodeResources.createResource;
+import static org.apache.tajo.resource.NodeResources.fitsIn;
+import static org.junit.Assert.*;
+
+public class TestResources {
+  @Test
+  public void testFitsIn() {
+    assertTrue(fitsIn(createResource(512, 1, 1), createResource(1024, 2, 1)));
+    assertTrue(fitsIn(createResource(1024, 2, 1), createResource(1024, 2, 1)));
+    assertFalse(fitsIn(createResource(1024, 2, 1), createResource(512, 1, 1)));
+    assertFalse(fitsIn(createResource(512, 2, 1), createResource(1024, 1, 1)));
+    assertFalse(fitsIn(createResource(1024, 1, 1), createResource(512, 2, 1)));
+    assertFalse(fitsIn(createResource(512, 1, 2), createResource(512, 1, 1)));
+  }
+
+  @Test
+  public void testComponentwiseMin() {
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(1, 1), createResource(2, 2)));
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(2, 2), createResource(1, 1)));
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(1, 2), createResource(2, 1)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
new file mode 100644
index 0000000..2d7d0be
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*;
+
+public class MockNodeStatusUpdater extends NodeStatusUpdater {
+
+  private CountDownLatch barrier;
+  private Map<Integer, NodeResource> membership = Maps.newHashMap();
+  private Map<Integer, NodeResource> resources = Maps.newHashMap();
+  private MockResourceTracker resourceTracker;
+
+  public MockNodeStatusUpdater(CountDownLatch barrier, WorkerConnectionInfo connectionInfo,
+                               NodeResourceManager resourceManager) {
+    super(connectionInfo, resourceManager);
+    this.barrier = barrier;
+    this.resourceTracker = new MockResourceTracker();
+  }
+
+  @Override
+  protected TajoResourceTrackerProtocolService.Interface newStub()
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
+
+    return resourceTracker;
+  }
+
+  protected MockResourceTracker getResourceTracker() {
+    return resourceTracker;
+  }
+
+  class MockResourceTracker implements TajoResourceTrackerProtocolService.Interface {
+    private NodeHeartbeatRequestProto lastRequest;
+
+    protected Map<Integer, NodeResource> getTotalResource() {
+      return membership;
+    }
+
+    protected Map<Integer, NodeResource> getAvailableResource() {
+      return membership;
+    }
+
+    protected NodeHeartbeatRequestProto getLastRequest() {
+      return lastRequest;
+    }
+
+    @Override
+    public void heartbeat(RpcController controller, NodeHeartbeat request,
+                          RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) {
+
+    }
+
+    @Override
+    public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequestProto request,
+                              RpcCallback<NodeHeartbeatResponseProto> done) {
+
+      NodeHeartbeatResponseProto.Builder response = NodeHeartbeatResponseProto.newBuilder();
+      if (membership.containsKey(request.getWorkerId())) {
+        if (request.hasAvailableResource()) {
+          NodeResource resource = resources.get(request.getWorkerId());
+          NodeResources.update(resource, new NodeResource(request.getAvailableResource()));
+        }
+        done.run(response.setCommand(ResponseCommand.NORMAL).build());
+      } else {
+        if (request.hasConnectionInfo()) {
+          membership.put(request.getWorkerId(), new NodeResource(request.getTotalResource()));
+          resources.put(request.getWorkerId(), new NodeResource(request.getAvailableResource()));
+          done.run(response.setCommand(ResponseCommand.NORMAL).build());
+        } else {
+          done.run(response.setCommand(ResponseCommand.MEMBERSHIP).build());
+        }
+      }
+      lastRequest = request;
+      barrier.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
new file mode 100644
index 0000000..7407acc
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
+import org.junit.*;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+/**
+ * Tests for {@link NodeResourceManager}: allocation, cancellation of requests that do
+ * not fit, deallocation, and concurrent allocate/deallocate traffic.
+ */
+public class TestNodeResourceManager {
+
+  private NodeResourceManager resourceManager;
+  private MockNodeStatusUpdater statusUpdater;
+  private AsyncDispatcher dispatcher;
+  // Memory requested by every generated task; the worker memory is configured as a
+  // multiple of it.
+  private int taskMemory;
+  private TajoConf conf;
+
+  /**
+   * Configures a worker with 4 CPU cores, {@code taskMemory * cores} MB of memory and
+   * 4 disks, then starts the dispatcher, the resource manager and a mock status updater.
+   */
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+
+    taskMemory = 512;
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
+        taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1);
+
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    resourceManager = new NodeResourceManager(dispatcher);
+    resourceManager.init(conf);
+    resourceManager.start();
+
+    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+    // No test here waits on heartbeats, so the latch starts at zero.
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), worker, resourceManager);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+  }
+
+  @After
+  public void tearDown() {
+    resourceManager.stop();
+    statusUpdater.stop();
+    dispatcher.stop();
+  }
+
+  /** A batch of 4 tasks is allocated completely, with no cancellations. */
+  @Test
+  public void testNodeResourceAllocateEvent() throws Exception {
+    int requestSize = 4;
+
+    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
+    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    requestProto.setExecutionBlockId(ebId.getProto());
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize));
+
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    BatchAllocationResponseProto responseProto = callFuture.get();
+    assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    assertEquals(0, responseProto.getCancellationTaskCount());
+    assertEquals(requestSize, resourceManager.getAllocatedSize());
+  }
+
+
+  /**
+   * Requests 10 tasks more than the configured core count; expects exactly those 10
+   * to be cancelled while the remaining ones are allocated.
+   */
+  @Test
+  public void testNodeResourceCancellation() throws Exception {
+    int requestSize = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+    int overSize = 10;
+
+    CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
+    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    requestProto.setExecutionBlockId(ebId.getProto());
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize + overSize));
+
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+    BatchAllocationResponseProto responseProto = callFuture.get();
+
+    assertEquals(overSize, responseProto.getCancellationTaskCount());
+    assertEquals(requestSize, resourceManager.getAllocatedSize());
+  }
+
+  /** Allocates 4 tasks, deallocates each of them, and expects the full resource back. */
+  @Test
+  public void testNodeResourceDeallocateEvent() throws Exception {
+    int requestSize = 4;
+
+    CallFuture<BatchAllocationResponseProto> callFuture  = new CallFuture<BatchAllocationResponseProto>();
+    BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    requestProto.setExecutionBlockId(ebId.getProto());
+
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize));
+
+    dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    BatchAllocationResponseProto responseProto = callFuture.get();
+    assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+    assertEquals(0, responseProto.getCancellationTaskCount());
+    assertEquals(requestSize, resourceManager.getAllocatedSize());
+
+    //deallocate
+    for(TaskAllocationRequestProto allocationRequestProto : requestProto.getTaskRequestList()) {
+      // direct invoke handler for testing
+      resourceManager.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource()));
+    }
+    assertEquals(0, resourceManager.getAllocatedSize());
+    assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
+  }
+
+  /**
+   * Drains a shared queue of single-task allocation requests from {@code cores * 2}
+   * threads. A cancelled task is put back on the queue; an allocated task is counted
+   * as complete and deallocated right away through the dispatcher. Every task is
+   * expected to complete eventually.
+   */
+  @Test(timeout = 30000)
+  public void testParallelRequest() throws Exception {
+    final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2;
+    final int taskSize = 100000;
+    final AtomicInteger totalComplete = new AtomicInteger();
+    final AtomicInteger totalCanceled = new AtomicInteger();
+
+    final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    final Queue<TaskAllocationRequestProto> totalTasks = createTaskRequests(taskMemory, taskSize);
+
+    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+    try {
+      List<Future<?>> futureList = Lists.newArrayList();
+
+      long startTime = System.currentTimeMillis();
+      for (int i = 0; i < parallelCount; i++) {
+        futureList.add(executor.submit(new Runnable() {
+              @Override
+              public void run() {
+                int complete = 0;
+                while (true) {
+                  TaskAllocationRequestProto task = totalTasks.poll();
+                  if (task == null) break;
+
+                  BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder();
+                  requestProto.addTaskRequest(task);
+                  requestProto.setExecutionBlockId(ebId.getProto());
+
+                  CallFuture<BatchAllocationResponseProto> callFuture = new CallFuture<BatchAllocationResponseProto>();
+                  dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
+                  try {
+                    BatchAllocationResponseProto proto = callFuture.get();
+                    if (proto.getCancellationTaskCount() > 0) {
+                      // Not enough resource right now: queue the task again for a retry.
+                      totalTasks.addAll(proto.getCancellationTaskList());
+                      totalCanceled.addAndGet(proto.getCancellationTaskCount());
+                    } else {
+                      complete++;
+                      dispatcher.getEventHandler().handle(new NodeResourceDeallocateEvent(task.getResource()));
+                    }
+                  } catch (Exception e) {
+                    // Keep the cause; it reaches the test thread through Future.get().
+                    throw new RuntimeException("Allocation request failed", e);
+                  }
+                }
+                System.out.println(Thread.currentThread().getName() + " complete requests: " + complete);
+                totalComplete.addAndGet(complete);
+              }
+            })
+        );
+      }
+
+      // Rethrows (wrapped in ExecutionException) whatever made a worker fail.
+      for (Future<?> future : futureList) {
+        future.get();
+      }
+
+      System.out.println(parallelCount + " Thread, completed requests: " + totalComplete.get() + ", canceled requests:"
+          + totalCanceled.get() + ", " + (System.currentTimeMillis() - startTime) + " ms elapsed");
+      assertEquals(taskSize, totalComplete.get());
+    } finally {
+      // Always release the pool, also when a worker failed; otherwise its threads
+      // would outlive the test.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Creates {@code size} task allocation requests, each asking for a resource built
+   * from {@code memory} and carrying a minimal task request.
+   *
+   * @param memory value passed to {@code NodeResources.createResource} for every task
+   * @param size   number of requests to create
+   * @return a queue holding the requests in creation order
+   */
+  protected static Queue<TaskAllocationRequestProto> createTaskRequests(int memory, int size) {
+    Queue<TaskAllocationRequestProto> requestProtoList = new LinkedBlockingQueue<TaskAllocationRequestProto>();
+    for (int i = 0; i < size; i++) {
+
+      ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+      TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, i), 0);
+
+      TajoWorkerProtocol.TaskRequestProto.Builder builder =
+          TajoWorkerProtocol.TaskRequestProto.newBuilder();
+      builder.setId(taskAttemptId.getProto());
+      builder.setShouldDie(true);
+      builder.setOutputTable("");
+      builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
+      builder.setClusteredOutput(false);
+
+
+      requestProtoList.add(TaskAllocationRequestProto.newBuilder()
+          .setResource(NodeResources.createResource(memory).getProto())
+          .setTaskRequest(builder.build()).build());
+    }
+    return requestProtoList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
new file mode 100644
index 0000000..fb3c77e
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.NodeStatusEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import static org.junit.Assert.*;
+
+public class TestNodeStatusUpdater {
+
+  private NodeResourceManager resourceManager;
+  private MockNodeStatusUpdater statusUpdater;
+  private AsyncDispatcher dispatcher;
+  private TajoConf conf;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+
+    conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL, 1000);
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    resourceManager = new NodeResourceManager(dispatcher);
+    resourceManager.init(conf);
+    resourceManager.start();
+  }
+
+  @After
+  public void tearDown() {
+    resourceManager.stop();
+    if (statusUpdater != null) statusUpdater.stop();
+    dispatcher.stop();
+  }
+
+  @Test(timeout = 20000)
+  public void testNodeMembership() throws Exception {
+    CountDownLatch barrier = new CountDownLatch(1);
+    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+    statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker();
+    barrier.await();
+
+    assertTrue(resourceTracker.getTotalResource().containsKey(worker.getId()));
+    assertEquals(resourceManager.getTotalResource(),
+        resourceTracker.getTotalResource().get(worker.getId()));
+
+    assertEquals(resourceManager.getAvailableResource(),
+        resourceTracker.getAvailableResource().get(worker.getId()));
+  }
+
+  @Test(timeout = 20000)
+  public void testPing() throws Exception {
+    CountDownLatch barrier = new CountDownLatch(2);
+    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+    statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker();
+    barrier.await();
+
+    TajoResourceTrackerProtocol.NodeHeartbeatRequestProto lastRequest = resourceTracker.getLastRequest();
+    assertTrue(lastRequest.hasWorkerId());
+    assertFalse(lastRequest.hasAvailableResource());
+    assertFalse(lastRequest.hasTotalResource());
+    assertFalse(lastRequest.hasConnectionInfo());
+  }
+
+  @Test(timeout = 20000)
+  public void testResourceReport() throws Exception {
+    CountDownLatch barrier = new CountDownLatch(2);
+    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+    statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    for (int i = 0; i < statusUpdater.getQueueingLimit(); i++) {
+      dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE,
+          resourceManager.getAvailableResource()));
+    }
+    barrier.await();
+    assertEquals(0, statusUpdater.getQueueSize());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/25bd5cb4/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
index 0bcd5ec..19e08e8 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
@@ -189,10 +189,10 @@ public class DiskUtil {
 	}
 
   public static int getDataNodeStorageSize(){
-    return getStorageDirs().size();
+    return getDataNodeStorageDirs().size();
   }
 
-  public static List<URI> getStorageDirs(){
+  public static List<URI> getDataNodeStorageDirs(){
     Configuration conf = new HdfsConfiguration();
     Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
     return Util.stringCollectionAsURIs(dirNames);


Mime
View raw message