hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1129868 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ yarn/yarn-...
Date: Tue, 31 May 2011 19:29:23 GMT
Author: acmurthy
Date: Tue May 31 19:29:22 2011
New Revision: 1129868

URL: http://svn.apache.org/viewvc?rev=1129868&view=rev
Log:
Ensure 'lost' NodeManagers are dealt with appropriately and their containers are released correctly.
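
At a glance, the lost-node path wired up here chains the NM liveliness monitor's expiry into node unregistration and a scheduler callback that releases the node's containers and reservations. A condensed sketch of that chain, pieced together from the diffs below (bodies abbreviated, not verbatim):

    // RMResourceTrackerImpl: expiry now funnels through the new NodeTracker method.
    protected void expireNMs(List<NodeId> nodes) {
      for (NodeId id : nodes) {
        unregisterNodeManager(id);
      }
    }

    public void unregisterNodeManager(NodeId nodeId) {
      synchronized (nodeManagers) {
        NodeManager node = nodeManagers.get(nodeId).getNodeManager();
        removeNode(node);                  // informs the scheduler and the node store
        nodeManagers.remove(nodeId);
        nodes.remove(node.getNodeAddress());
      }
    }

    // CapacityScheduler.removeNode(): kill whatever was running on the lost node and
    // release any reservation held on it.
    public synchronized void removeNode(NodeInfo nodeInfo) {
      Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
      --numNodeManagers;
      killRunningContainers(nodeInfo.getRunningContainers());
      Application reserved = nodeInfo.getReservedApplication();
      if (reserved != null) {
        ((LeafQueue) reserved.getQueue()).completedContainer(
            clusterResource, null, nodeInfo.getReservedResource(), reserved);
      }
    }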

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeTracker.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 31 19:29:22 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    Ensure 'lost' NodeManagers are dealt appropriately, the containers are
+    released correctly. (acmurthy) 
+
     Making pipes work with YARN. Changed pipes to get log-locations from an
     environmental variable. (vinodkv)
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java Tue May 31 19:29:22 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.api;
 
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/HeartbeatResponse.java Tue May 31 19:29:22 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.api.records;
 
 import java.util.List;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java Tue May 31 19:29:22 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.api.records;
 
 public interface NodeHealthStatus {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java Tue May 31 19:29:22 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.api.records;
 
 import java.util.List;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/RegistrationResponse.java Tue May 31 19:29:22 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.api.records;
 
 import java.nio.ByteBuffer;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue May 31 19:29:22 2011
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.avro.AvroRuntimeException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.SecurityUtil;
@@ -69,7 +70,7 @@ public class ResourceManager extends Com
 
   
   private ResourceScheduler scheduler;
-  private RMResourceTrackerImpl rmResourceTracker;
+  private ResourceTrackerService resourceTracker;
   private ClientRMService clientRM;
   private ApplicationMasterService masterService;
   private AdminService adminService;
@@ -141,11 +142,13 @@ public class ResourceManager extends Com
     applicationsManager = createApplicationsManagerImpl();
     addService(applicationsManager);
     
-    rmResourceTracker = createRMResourceTracker();
-    addService(rmResourceTracker);
+    resourceTracker = createResourceTrackerService();
+    addService(resourceTracker);
   
     try {
-      this.scheduler.reinitialize(this.conf, this.containerTokenSecretManager, rmResourceTracker);
+      this.scheduler.reinitialize(this.conf, 
+          this.containerTokenSecretManager, 
+          resourceTracker.getResourceTracker());
     } catch (IOException ioe) {
       throw new RuntimeException("Failed to initialize scheduler", ioe);
     }
@@ -211,8 +214,10 @@ public class ResourceManager extends Com
     super.stop();
   }
   
-  protected RMResourceTrackerImpl createRMResourceTracker() {
-    return new RMResourceTrackerImpl(this.containerTokenSecretManager, this.rmContext);
+  protected ResourceTrackerService createResourceTrackerService() {
+    return new ResourceTrackerService(
+        new RMResourceTrackerImpl(this.containerTokenSecretManager, 
+            this.rmContext));
   }
   
   protected ApplicationsManagerImpl createApplicationsManagerImpl() {
@@ -221,7 +226,8 @@ public class ResourceManager extends Com
   }
 
   protected ClientRMService createClientRMService() {
-    return new ClientRMService(applicationsManager, rmResourceTracker, scheduler);
+    return new ClientRMService(applicationsManager, 
+        resourceTracker.getResourceTracker(), scheduler);
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
@@ -239,6 +245,7 @@ public class ResourceManager extends Com
    * return applications manager.
    * @return
    */
+  @Private
   public ApplicationsManager getApplicationsManager() {
     return applicationsManager;
   }
@@ -247,6 +254,7 @@ public class ResourceManager extends Com
    * return the scheduler.
    * @return
    */
+  @Private
   public ResourceScheduler getResourceScheduler() {
     return this.scheduler;
   }
@@ -255,15 +263,16 @@ public class ResourceManager extends Com
    * return the resource tracking component.
    * @return
    */
+  @Private
   public RMResourceTrackerImpl getResourceTracker() {
-    return this.rmResourceTracker;
+    return this.resourceTracker.getResourceTracker();
   }
   
 
   @Override
   public void recover(RMState state) throws Exception {
     applicationsManager.recover(state);
-    rmResourceTracker.recover(state);
+    resourceTracker.recover(state);
     scheduler.recover(state);
   }
   

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1129868&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Tue May 31 19:29:22 2011
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+public class ResourceTrackerService extends AbstractService 
+implements ResourceTracker{
+
+  private static final Log LOG = LogFactory.getLog(ResourceTrackerService.class);
+
+  private static final RecordFactory recordFactory = 
+    RecordFactoryProvider.getRecordFactory(null);
+
+  private final RMResourceTrackerImpl resourceTracker;
+
+  private Server server;
+  private InetSocketAddress resourceTrackerAddress;
+
+  public ResourceTrackerService(RMResourceTrackerImpl resourceTracker) {
+    super(ResourceTrackerService.class.getName());
+    this.resourceTracker = resourceTracker;
+  }
+
+  public RMResourceTrackerImpl getResourceTracker() {
+    return resourceTracker;
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    super.init(conf);
+    String resourceTrackerBindAddress =
+      conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS,
+          YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS);
+    resourceTrackerAddress = NetUtils.createSocketAddr(resourceTrackerBindAddress);
+    resourceTracker.init(conf);
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    // ResourceTrackerServer authenticates NodeManager via Kerberos if
+    // security is enabled, so no secretManager.
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    Configuration rtServerConf = new Configuration(getConfig());
+    rtServerConf.setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        RMNMSecurityInfoClass.class, SecurityInfo.class);
+    this.server =
+      rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
+          rtServerConf, null,
+          rtServerConf.getInt(RMConfig.RM_RESOURCE_TRACKER_THREADS, 
+              RMConfig.DEFAULT_RM_RESOURCE_TRACKER_THREADS));
+    this.server.start();
+
+    resourceTracker.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    resourceTracker.stop();
+    if (this.server != null) {
+      this.server.close();
+    }
+    super.stop();
+  }
+
+  @Override
+  public RegisterNodeManagerResponse registerNodeManager(
+      RegisterNodeManagerRequest request) throws YarnRemoteException {
+    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
+        RegisterNodeManagerResponse.class);
+    try {
+      response.setRegistrationResponse(
+          resourceTracker.registerNodeManager(
+              request.getHost(), request.getContainerManagerPort(), 
+              request.getHttpPort(), request.getResource()));
+    } catch (IOException ioe) {
+      LOG.info("Exception in node registration from " + request.getHost(), ioe);
+      throw RPCUtil.getRemoteException(ioe);
+    }
+    return response;
+  }
+
+  @Override
+  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+      throws YarnRemoteException {
+    NodeHeartbeatResponse response = recordFactory.newRecordInstance(
+        NodeHeartbeatResponse.class);
+    try {
+    response.setHeartbeatResponse(
+        resourceTracker.nodeHeartbeat(request.getNodeStatus()));
+    } catch (IOException ioe) {
+      LOG.info("Exception in heartbeat from node " + 
+          request.getNodeStatus().getNodeId(), ioe);
+      throw RPCUtil.getRemoteException(ioe);
+    }
+    return response;
+  }
+
+  public void recover(RMState state) {
+    resourceTracker.recover(state);
+  }
+
+}
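
On the wire, ResourceTrackerService keeps the existing ResourceTracker request/response records and delegates the real work to RMResourceTrackerImpl. A rough sketch of what a NodeManager-side caller does against this service, assuming 'resourceTracker' is a ResourceTracker proxy created with YarnRPC (the proxy setup is not part of this patch):

    // Assumed: 'resourceTracker' is a ResourceTracker RPC proxy; its creation is not shown here.
    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

    // Registration is unwrapped by the service and forwarded to
    // RMResourceTrackerImpl.registerNodeManager(host, cmPort, httpPort, capability).
    RegisterNodeManagerRequest register =
        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
    register.setHost(hostName);
    register.setContainerManagerPort(containerManagerPort);
    register.setHttpPort(httpPort);
    register.setResource(capability);
    RegistrationResponse registration =
        resourceTracker.registerNodeManager(register).getRegistrationResponse();

    // Heartbeats are likewise unwrapped and forwarded to
    // RMResourceTrackerImpl.nodeHeartbeat(nodeStatus).
    NodeHeartbeatRequest heartbeat =
        recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
    heartbeat.setNodeStatus(nodeStatus);
    HeartbeatResponse heartbeatResponse =
        resourceTracker.nodeHeartbeat(heartbeat).getHeartbeatResponse();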

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java Tue May 31 19:29:22 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.net.Node;
@@ -95,15 +96,37 @@ public interface NodeInfo {
       List<Container> containers);
   
   /**
-   * 
-   * @return
+   * Get running containers on this node.
+   * @return running containers
+   */
+  public List<Container> getRunningContainers();
+  
+  /**
+   * Get application which has a reserved container on this node.
+   * @return application which has a reserved container on this node
    */
   public Application getReservedApplication();
 
+  /**
+   * Get reserved resource on this node.
+   * @return reserved resource on this node
+   */
   Resource getReservedResource();
 
+  /**
+   * Reserve resources on this node for a given application
+   * @param application application for which to reserve
+   * @param priority application priority
+   * @param resource reserved resource
+   */
   public void reserveResource(Application application, Priority priority, 
       Resource resource);
 
+  /**
+   * Unreserve resource on this node for a given application
+   * @param application application for which to unreserve
+   * @param priority application priority
+   */
   public void unreserveResource(Application application, Priority priority);
+  
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java Tue May 31 19:29:22 2011
@@ -65,11 +65,11 @@ public class NodeInfoTracker {
     return this.lastHeartBeatResponse;
   }
 
-  public synchronized void refreshHeartBeatResponse(HeartbeatResponse heartBeatResponse) {
+  public synchronized void setLastHeartBeatResponse(HeartbeatResponse heartBeatResponse) {
     this.lastHeartBeatResponse = heartBeatResponse;
   }
   
-  public synchronized void refreshLastHeartBeat() {
+  public synchronized void setLastHeartBeatTime() {
    this.heartBeatStatus = new NodeHeartbeatStatus(System.currentTimeMillis()); 
   }
 }
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeTracker.java?rev=1129868&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeTracker.java Tue May 31 19:29:22 2011
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+
+public interface NodeTracker {
+  
+  public RegistrationResponse registerNodeManager(
+      String hostName, int cmPort, int httpPort, Resource capability) 
+  throws IOException;
+  
+  public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus) 
+  throws IOException;
+  
+  public void unregisterNodeManager(NodeId nodeId) throws IOException;
+}
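
RMResourceTrackerImpl (next diff) is the real implementation of this interface; for unit tests a much smaller stand-in would do. A hypothetical minimal stub, not part of this commit, using the same record factory the rest of the patch uses (imports as in NodeTracker.java plus RecordFactory, RecordFactoryProvider and IOException):

    // Hypothetical test stub, not part of this commit.
    public class StubNodeTracker implements NodeTracker {
      private static final RecordFactory recordFactory =
          RecordFactoryProvider.getRecordFactory(null);

      @Override
      public RegistrationResponse registerNodeManager(
          String hostName, int cmPort, int httpPort, Resource capability)
          throws IOException {
        // A real tracker would assign a NodeId and a container-token secret here.
        return recordFactory.newRecordInstance(RegistrationResponse.class);
      }

      @Override
      public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus) throws IOException {
        HeartbeatResponse response =
            recordFactory.newRecordInstance(HeartbeatResponse.class);
        response.setResponseId(nodeStatus.getResponseId() + 1);
        return response;
      }

      @Override
      public void unregisterNodeManager(NodeId nodeId) throws IOException {
        // Nothing to clean up in this stub.
      }
    }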

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Tue May 31 19:29:22 2011
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -31,34 +30,20 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.crypto.SecretKey;
 
-import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
-import org.apache.hadoop.yarn.server.YarnServerConfig;
-import org.apache.hadoop.yarn.server.api.ResourceTracker;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
@@ -81,7 +66,7 @@ import org.apache.hadoop.yarn.service.Ab
  *`
  */
 public class RMResourceTrackerImpl extends AbstractService implements 
-ResourceTracker, ClusterTracker {
+NodeTracker, ClusterTracker {
   private static final Log LOG = LogFactory.getLog(RMResourceTrackerImpl.class);
   /* we dont garbage collect on nodes. A node can come back up again and re register,
    * so no use garbage collecting. Though admin can break the RM by bouncing 
@@ -113,13 +98,10 @@ ResourceTracker, ClusterTracker {
     );
 
   private ResourceListener resourceListener;
-  private InetSocketAddress resourceTrackerAddress;
-  private Server server;
   private final ContainerTokenSecretManager containerTokenSecretManager;
   private final AtomicInteger nodeCounter = new AtomicInteger(0);
   private static final HeartbeatResponse reboot = recordFactory.newRecordInstance(HeartbeatResponse.class);
   private long nmExpiryInterval;
-  private final RMContext rmContext;
   private final NodeStore nodeStore;
   
   public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager, RMContext context) {
@@ -127,16 +109,11 @@ ResourceTracker, ClusterTracker {
     reboot.setReboot(true);
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.nmLivelinessMonitor = new NMLivelinessMonitor();
-    this.rmContext = context;
     this.nodeStore = context.getNodeStore();
   }
 
   @Override
   public void init(Configuration conf) {
-    String resourceTrackerBindAddress =
-      conf.get(YarnServerConfig.RESOURCETRACKER_ADDRESS,
-          YarnServerConfig.DEFAULT_RESOURCETRACKER_BIND_ADDRESS);
-    resourceTrackerAddress = NetUtils.createSocketAddr(resourceTrackerBindAddress);
     this.nmExpiryInterval =  conf.getLong(RMConfig.NM_EXPIRY_INTERVAL, 
         RMConfig.DEFAULT_NM_EXPIRY_INTERVAL);
     this.nmLivelinessMonitor.setMonitoringInterval(conf.getLong(
@@ -152,19 +129,6 @@ ResourceTracker, ClusterTracker {
 
   @Override
   public void start() {
-    // ResourceTrackerServer authenticates NodeManager via Kerberos if
-    // security is enabled, so no secretManager.
-    YarnRPC rpc = YarnRPC.create(getConfig());
-    Configuration rtServerConf = new Configuration(getConfig());
-    rtServerConf.setClass(
-        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
-        RMNMSecurityInfoClass.class, SecurityInfo.class);
-    this.server =
-      rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
-          rtServerConf, null,
-          rtServerConf.getInt(RMConfig.RM_RESOURCE_TRACKER_THREADS, 
-              RMConfig.DEFAULT_RM_RESOURCE_TRACKER_THREADS));
-    this.server.start();
     this.nmLivelinessMonitor.start();
     LOG.info("Expiry interval of NodeManagers set to " + nmExpiryInterval);
     super.start();
@@ -180,7 +144,7 @@ ResourceTracker, ClusterTracker {
   }
   
   protected NodeInfoTracker getAndAddNodeInfoTracker(NodeId nodeId,
-      String hostString, String httpAddress, Node node, Resource capability) throws IOException {
+      String hostString, String httpAddress, Node node, Resource capability) {
     NodeInfoTracker nTracker = null;
     
     synchronized(nodeManagers) {
@@ -191,9 +155,7 @@ ResourceTracker, ClusterTracker {
               node,
               capability);
         nodes.put(nodeManager.getNodeAddress(), nodeId);
-        nodeStore.storeNode(nodeManager);
-        /* Inform the listeners */
-        resourceListener.addNode(nodeManager);
+        addNode(nodeManager);
         HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
         response.setResponseId(0);
         nTracker = new NodeInfoTracker(nodeManager, response);
@@ -205,27 +167,30 @@ ResourceTracker, ClusterTracker {
     return nTracker;
   }
 
+  private void addNode(NodeManager node) {
+    /* Inform the listeners */
+    resourceListener.addNode(node);
+
+    try {
+      nodeStore.storeNode(node);
+    } catch (IOException ioe) {
+      LOG.warn("Failed to store node " + node.getNodeAddress() + " in nodestore");
+    }
+
+  }
   @Override
-  public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest
-      request) throws YarnRemoteException {
-    String host = request.getHost();
-    int cmPort = request.getContainerManagerPort();
+  public RegistrationResponse registerNodeManager(
+      String host, int cmPort, int httpPort, Resource capability) 
+  throws IOException {
     String node = host + ":" + cmPort;
-    String httpAddress = host + ":" + request.getHttpPort();
-    Resource capability = request.getResource();
+    String httpAddress = host + ":" + httpPort;
 
     NodeId nodeId = getNodeId(node);
     
     NodeInfoTracker nTracker = null;
-    try {
-    nTracker = getAndAddNodeInfoTracker(
-      nodeId, node.toString(), httpAddress,
-                resolve(node.toString()),
-                capability);
-          // Inform the scheduler
-    } catch(IOException io) {
-      throw  RPCUtil.getRemoteException(io);
-    }
+    nTracker = 
+      getAndAddNodeInfoTracker(nodeId, node, httpAddress, 
+          resolve(node), capability);
     addForTracking(nTracker.getlastHeartBeat());
     LOG.info("NodeManager from node " + node + "(web-url: " + httpAddress
         + ") registered with capability: " + capability.getMemory()
@@ -237,10 +202,7 @@ ResourceTracker, ClusterTracker {
     SecretKey secretKey =
       this.containerTokenSecretManager.createAndGetSecretKey(node);
     regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
-    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
-        RegisterNodeManagerResponse.class);
-    response.setRegistrationResponse(regResponse);
-    return response;
+    return regResponse;
   }
 
   /**
@@ -268,88 +230,141 @@ ResourceTracker, ClusterTracker {
     return nodeManager.statusUpdate(containers);
   }
   
+  private boolean isValidNode(NodeId nodeId) {
+    return true;
+  }
+  
   @Override
-  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException {
-    org.apache.hadoop.yarn.server.api.records.NodeStatus remoteNodeStatus = request.getNodeStatus();
+  public HeartbeatResponse nodeHeartbeat(org.apache.hadoop.yarn.server.api.records.NodeStatus remoteNodeStatus) 
+  throws IOException {
+    /**
+     * Here is the node heartbeat sequence...
+     * 1. Check if it's a registered node
+     * 2. Check if it's a valid (i.e. not excluded) node
+     * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
+     * 4. Check if it's healthy
+     *   a. If it just turned unhealthy, inform scheduler and ZK
+     *   b. It it just turned healthy, inform scheduler and ZK
+     * 5. If it's healthy, update node status and allow scheduler to schedule
+     */
+    
+    NodeId nodeId = remoteNodeStatus.getNodeId();
+    
+    // 1. Check if it's a registered node
     NodeInfoTracker nTracker = null;
-    NodeHeartbeatResponse nodeHbResponse = recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
     synchronized(nodeManagers) {
-      nTracker = nodeManagers.get(remoteNodeStatus.getNodeId());
+      nTracker = nodeManagers.get(nodeId);
     }
     if (nTracker == null) {
       /* node does not exist */
       LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
-      nodeHbResponse.setHeartbeatResponse(reboot);
-      return nodeHbResponse;
+      return reboot;
     }
     /* update the heart beat status */
-    nTracker.refreshLastHeartBeat();
+    nTracker.setLastHeartBeatTime();
+    
+    // 2. Check if it's a valid (i.e. not excluded) node
+    // TODO - Check for valid/invalid node from hosts list
+    if (!isValidNode(nodeId)) {
+      unregisterNodeManager(remoteNodeStatus.getNodeId());
+      throw new IOException("Disallowed NodeManager nodeId: " + 
+          remoteNodeStatus.getNodeId());
+    }
+    
+    // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
     NodeManager nodeManager = nTracker.getNodeManager();
-    /* check to see if its an old heartbeat */    
     if (remoteNodeStatus.getResponseId() + 1 == nTracker
         .getLastHeartBeatResponse().getResponseId()) {
-      nodeHbResponse.setHeartbeatResponse(nTracker.getLastHeartBeatResponse());
-      return nodeHbResponse;
+      LOG.info("Received duplicate heartbeat from node " + 
+          nodeManager.getNodeAddress());
+      return nTracker.getLastHeartBeatResponse();
     } else if (remoteNodeStatus.getResponseId() + 1 < nTracker
         .getLastHeartBeatResponse().getResponseId()) {
       LOG.info("Too far behind rm response id:" +
           nTracker.lastHeartBeatResponse.getResponseId() + " nm response id:"
           + remoteNodeStatus.getResponseId());
-      nodeHbResponse.setHeartbeatResponse(reboot);
-      return nodeHbResponse;
+      unregisterNodeManager(remoteNodeStatus.getNodeId());
+      return reboot;
     }
     
+    // 4. Check if it's healthy
+    //   a. If it just turned unhealthy, inform scheduler and ZK
+    //   b. It it just turned healthy, inform scheduler and ZK
+    boolean prevHealthStatus =
+      nTracker.getNodeManager().getNodeHealthStatus().getIsNodeHealthy();
+    NodeHealthStatus currentNodeHealthStatus =
+      remoteNodeStatus.getNodeHealthStatus();
+    if (prevHealthStatus != currentNodeHealthStatus
+        .getIsNodeHealthy()) {
+      if (!currentNodeHealthStatus.getIsNodeHealthy()) {
+        // Node turned unhealthy
+        LOG.info("Node " + nodeManager.getNodeID()
+            + " has become unhealthy. Health-check report: "
+            + currentNodeHealthStatus.getHealthReport() + "."
+            + " Removing it from the scheduler.");
+        removeNode(nodeManager);
+      } else {
+        // Node turned healthy
+        LOG.info("Node " + nodeManager.getNodeID() +
+            " has become healthy back again. Health-check report: " +
+            remoteNodeStatus.getNodeHealthStatus().getHealthReport()  + 
+            ". Adding it to the scheduler.");
+        addNode(nodeManager);
+      }
+    }
+
     /** TODO This should be 3 step process.
      * nodemanager.statusupdate
      * listener.update()
      * nodemanager.getNodeResponse()
      * This will allow flexibility in updates/scheduling/premption
      */
-    NodeResponse nodeResponse = nodeManager.statusUpdate(remoteNodeStatus.getAllContainers());
-    /* inform any listeners of node heartbeats */
-    updateListener(
-        nodeManager, remoteNodeStatus.getAllContainers());
-  
-    
-    HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
-    response.addAllContainersToCleanup(nodeResponse.getContainersToCleanUp());
 
-    response.addAllApplicationsToCleanup(nodeResponse.getFinishedApplications());
+    // Heartbeat response
+    HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
     response.setResponseId(nTracker.getLastHeartBeatResponse().getResponseId() + 1);
-    
-    nTracker.refreshHeartBeatResponse(response);
-    boolean prevHealthStatus =
-      nTracker.getNodeManager().getNodeHealthStatus().getIsNodeHealthy();
-    NodeHealthStatus remoteNodeHealthStatus =
-      remoteNodeStatus.getNodeHealthStatus();
-    nTracker.getNodeManager().updateHealthStatus(
-        remoteNodeHealthStatus);
-    //    boolean prevHealthStatus = nodeHbResponse.
-    nodeHbResponse.setHeartbeatResponse(response);
 
-    // Take care of node-health
-    if (prevHealthStatus != remoteNodeHealthStatus
-        .getIsNodeHealthy()) {
-      // Node's health-status changed.
-      if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
-        // TODO: Node has become unhealthy, remove?
-        //        LOG.info("Node " + nodeManager.getNodeID()
-        //            + " has become unhealthy. Health-check report: "
-        //            + remoteNodeStatus.nodeHealthStatus.healthReport
-        //            + "Removing it from the scheduler.");
-        //        resourceListener.removeNode(nodeManager);
-      } else {
-        // TODO: Node has become healthy back again, add?
-        //        LOG.info("Node " + nodeManager.getNodeID()
-        //            + " has become healthy back again. Health-check report: "
-        //            + remoteNodeStatus.nodeHealthStatus.healthReport
-        //            + " Adding it to the scheduler.");
-        //        this.resourceListener.addNode(nodeManager);
-      }
+    // 5. If it's healthy, update node status and allow scheduler to schedule
+    NodeResponse nodeResponse = null;
+    if (currentNodeHealthStatus.getIsNodeHealthy()) {
+      nodeResponse = 
+        nodeManager.statusUpdate(remoteNodeStatus.getAllContainers());
+      response.addAllContainersToCleanup(nodeResponse.getContainersToCleanUp());
+      response.addAllApplicationsToCleanup(nodeResponse.getFinishedApplications());
+      
+      /* inform any listeners of node heartbeats */
+      updateListener(nodeManager, remoteNodeStatus.getAllContainers());
+    }
+  
+    // Save the response    
+    nTracker.setLastHeartBeatResponse(response);
+    nTracker.getNodeManager().updateHealthStatus(currentNodeHealthStatus);
+
+    return response;
+  }
+
+  @Override
+  public void unregisterNodeManager(NodeId nodeId) {
+    synchronized (nodeManagers) {
+      NodeManager node = nodeManagers.get(nodeId).getNodeManager();
+      removeNode(node);
+      nodeManagers.remove(nodeId);
+      nodes.remove(node.getNodeAddress());
     }
-    return nodeHbResponse;
   }
 
+  private void removeNode(NodeManager node) {
+    synchronized (nodeManagers) {
+      resourceListener.removeNode(node);
+      try {
+        nodeStore.removeNode(node);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to remove node " + node.getNodeAddress() + 
+            " from nodeStore", ioe);
+      }
+    }
+  }
+  
   @Private
   public synchronized NodeInfo getNodeManager(NodeId nodeId) {
     NodeInfoTracker ntracker = nodeManagers.get(nodeId);
@@ -383,9 +398,6 @@ ResourceTracker, ClusterTracker {
       LOG.info(this.nmLivelinessMonitor.getName() + " interrupted during join ",
           ie);
     }
-    if (this.server != null) {
-      this.server.close();
-    }
     super.stop();
   }
 
@@ -408,11 +420,7 @@ ResourceTracker, ClusterTracker {
 
   protected void expireNMs(List<NodeId> nodes) {
     for (NodeId id: nodes) {
-      synchronized (nodeManagers) {
-        NodeInfo nInfo = nodeManagers.get(id).getNodeManager();
-        nodeManagers.remove(id);
-        resourceListener.removeNode(nInfo);
-      }
+      unregisterNodeManager(id);
     }
   }
 
@@ -522,12 +530,8 @@ ResourceTracker, ClusterTracker {
   public void recover(RMState state) {
     List<NodeManager> nodeManagers = state.getStoredNodeManagers();
     for (NodeManager nm: nodeManagers) {
-      try {
-        getAndAddNodeInfoTracker(nm.getNodeID(), nm.getNodeAddress(), nm.getHttpAddress(), 
-          nm.getNode(), nm.getTotalCapability());
-      } catch(IOException ie) {
-        //ignore
-      }
+      getAndAddNodeInfoTracker(nm.getNodeID(), nm.getNodeAddress(), 
+          nm.getHttpAddress(), nm.getNode(), nm.getTotalCapability());
     }
     for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
       List<Container> containers = entry.getValue().getContainers();
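
Pulling the heartbeat hunks above back together, the new nodeHeartbeat() follows the numbered checks in its comment block. A condensed, not verbatim, restatement for reference (logging and exact messages omitted):

    // Condensed from RMResourceTrackerImpl.nodeHeartbeat() above.
    public HeartbeatResponse nodeHeartbeat(NodeStatus remote) throws IOException {
      NodeInfoTracker nTracker;
      synchronized (nodeManagers) {                        // 1. registered node?
        nTracker = nodeManagers.get(remote.getNodeId());
      }
      if (nTracker == null) {
        return reboot;                                     //    unknown node -> reboot
      }
      nTracker.setLastHeartBeatTime();

      if (!isValidNode(remote.getNodeId())) {              // 2. excluded node?
        unregisterNodeManager(remote.getNodeId());
        throw new IOException("Disallowed NodeManager nodeId: " + remote.getNodeId());
      }

      HeartbeatResponse last = nTracker.getLastHeartBeatResponse();
      if (remote.getResponseId() + 1 == last.getResponseId()) {
        return last;                                       // 3. duplicate heartbeat
      } else if (remote.getResponseId() + 1 < last.getResponseId()) {
        unregisterNodeManager(remote.getNodeId());         //    too far behind -> reboot
        return reboot;
      }

      // 4. health transition: removeNode() when the node turns unhealthy,
      //    addNode() when it turns healthy again (see the hunk above).

      HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
      response.setResponseId(last.getResponseId() + 1);

      // 5. only a healthy node gets a status update and a scheduling pass
      if (remote.getNodeHealthStatus().getIsNodeHealthy()) {
        NodeResponse nodeResponse =
            nTracker.getNodeManager().statusUpdate(remote.getAllContainers());
        response.addAllContainersToCleanup(nodeResponse.getContainersToCleanUp());
        response.addAllApplicationsToCleanup(nodeResponse.getFinishedApplications());
        updateListener(nTracker.getNodeManager(), remote.getAllContainers());
      }

      nTracker.setLastHeartBeatResponse(response);
      nTracker.getNodeManager().updateHealthStatus(remote.getNodeHealthStatus());
      return response;
    }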

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Tue May 31 19:29:22 2011
@@ -278,6 +278,14 @@ public class NodeManagerImpl implements 
     return this.usedResource;
   }
 
+  
+  @Override
+  public List<Container> getRunningContainers() {
+    List<Container> containers = new ArrayList<Container>();
+    containers.addAll(runningContainers.values());
+    return containers;
+  }
+
   public synchronized void addAvailableResource(Resource resource) {
     if (resource == null) {
       LOG.error("Invalid resource addition of null resource for "

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue May 31 19:29:22 2011
@@ -357,37 +357,33 @@ implements ResourceScheduler, CapacitySc
     // Sanity check
     normalizeRequests(ask);
 
-    List<Container> allocatedContainers = null;
-    Resource limit = null;
-    synchronized (application) {
-
-      LOG.info("DEBUG --- allocate: pre-update" +
-          " applicationId=" + applicationId + 
-          " application=" + application);
-      application.showRequests();
-
-      // Update application requests
-      application.updateResourceRequests(ask);
+    LOG.info("DEBUG --- allocate: pre-update" +
+        " applicationId=" + applicationId + 
+        " application=" + application);
+    application.showRequests();
 
-      // Release ununsed containers and update queue capacities
-      processReleasedContainers(application, release);
+    // Update application requests
+    application.updateResourceRequests(ask);
 
-      LOG.info("DEBUG --- allocate: post-update");
-      application.showRequests();
+    // Release ununsed containers and update queue capacities
+    processReleasedContainers(application, release);
+
+    LOG.info("DEBUG --- allocate: post-update");
+    application.showRequests();
+
+    // Acquire containers
+    List<Container> allocatedContainers = application.acquire();
+
+    // Resource limit
+    Resource limit = application.getResourceLimit();
+    
+    LOG.info("DEBUG --- allocate:" +
+        " applicationId=" + applicationId + 
+        " #ask=" + ask.size() + 
+        " #release=" + release.size() +
+        " #allocatedContainers=" + allocatedContainers.size() +
+        " limit=" + limit);
 
-      // Acquire containers
-      allocatedContainers = application.acquire();
-      
-      // Resource limit
-      limit = application.getResourceLimit();
-      LOG.info("DEBUG --- allocate:" +
-          " applicationId=" + applicationId + 
-          " #ask=" + ask.size() + 
-          " #release=" + release.size() +
-          " #allocatedContainers=" + allocatedContainers.size() +
-          " limit=" + limit);
-      
-    }
       
       return new Allocation(allocatedContainers, limit);
   }
@@ -490,6 +486,15 @@ implements ResourceScheduler, CapacitySc
 
   }
 
+  private synchronized void killRunningContainers(List<Container> containers) {
+    for (Container container : containers) {
+      container.setState(ContainerState.COMPLETE);
+      LOG.info("Killing running container " + container.getId());
+      Application application = applications.get(container.getId().getAppId());
+      processReleasedContainers(application, Collections.singletonList(container));
+    }
+  }
+  
   private synchronized void processCompletedContainers(
       List<Container> completedContainers) {
     for (Container container: completedContainers) {
@@ -596,8 +601,20 @@ implements ResourceScheduler, CapacitySc
   public synchronized void removeNode(NodeInfo nodeInfo) {
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     --numNodeManagers;
-  }
 
+    // Remove running containers
+    List<Container> runningContainers = nodeInfo.getRunningContainers();
+    killRunningContainers(runningContainers);
+    
+    // Remove reservations, if any
+    Application reservedApplication = nodeInfo.getReservedApplication();
+    if (reservedApplication != null) {
+      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
+      Resource released = nodeInfo.getReservedResource();
+      queue.completedContainer(clusterResource, null, released, reservedApplication);
+    }
+  }
+  
   public synchronized boolean releaseContainer(ApplicationId applicationId, 
       Container container) {
     // Reap containers

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue May 31 19:29:22 2011
@@ -482,6 +482,13 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
+  private synchronized void killContainers(List<Container> containers) {
+    for (Container container : containers) {
+      container.setState(ContainerState.COMPLETE);
+    }
+    applicationCompletedContainers(containers);
+  }
+  
   private synchronized void applicationCompletedContainers(
       List<Container> completedContainers) {
     for (Container c: completedContainers) {
@@ -566,7 +573,7 @@ public class FifoScheduler implements Re
   @Override
   public synchronized void removeNode(NodeInfo nodeInfo) {
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
-    //TODO inform the the applications that the containers are completed/failed
+    killContainers(nodeInfo.getRunningContainers());
   }
 
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Tue May 31 19:29:22 2011
@@ -161,6 +161,12 @@ public class MockNodes {
       @Override
       public void unreserveResource(Application application, Priority priority) {
       }
+
+      @Override
+      public List<Container> getRunningContainers() {
+        // TODO Auto-generated method stub
+        return null;
+      }
     };
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Tue May 31 19:29:22 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
@@ -86,13 +87,9 @@ public class NodeManager implements Cont
     this.capability = Resources.createResource(memory);
     Resources.addTo(available, capability);
 
-    RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
-    request.setHost(hostName);
-    request.setContainerManagerPort(containerManagerPort);
-    request.setHttpPort(httpPort);
-    request.setResource(capability);
     RegistrationResponse response =
-        resourceTracker.registerNodeManager(request).getRegistrationResponse();
+        resourceTracker.registerNodeManager(hostName, containerManagerPort, 
+            httpPort, capability);
     this.nodeId = response.getNodeId();
     this.nodeInfo = resourceTracker.getNodeManager(nodeId);
    
@@ -127,14 +124,12 @@ public class NodeManager implements Cont
   
   int responseID = 0;
   
-  public void heartbeat() throws YarnRemoteException {
+  public void heartbeat() throws IOException {
     NodeStatus nodeStatus = 
       org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeStatus.createNodeStatus(
           nodeId, containers);
     nodeStatus.setResponseId(responseID);
-    NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
-    request.setNodeStatus(nodeStatus);
-    HeartbeatResponse response = resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+    HeartbeatResponse response = resourceTracker.nodeHeartbeat(nodeStatus);
     responseID = response.getResponseId();
   }
 
@@ -196,7 +191,8 @@ public class NodeManager implements Cont
   }
   
   @Override
-  synchronized public StopContainerResponse stopContainer(StopContainerRequest request) throws YarnRemoteException {
+  synchronized public StopContainerResponse stopContainer(StopContainerRequest request) 
+  throws YarnRemoteException {
     ContainerId containerID = request.getContainerId();
     String applicationId = String.valueOf(containerID.getAppId().getId());
     
@@ -209,7 +205,11 @@ public class NodeManager implements Cont
     }
     
     // Send a heartbeat
-    heartbeat();
+    try {
+      heartbeat();
+    } catch (IOException ioe) {
+      throw RPCUtil.getRemoteException(ioe);
+    }
     
     // Remove container and update status
     int ctr = 0;

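With the record-based plumbing removed from the test NodeManager, registration and heartbeating against the RM-side tracker reduce to direct method calls. A minimal sketch of that flow, assuming a resourceTracker like the RMResourceTrackerImpl used by these tests and locally maintained containers and lastResponseId; the host name, ports and memory below are illustrative values only.

    // Register: host, container-manager port, HTTP port and capability are
    // passed directly instead of via a RegisterNodeManagerRequest record.
    Resource capability = Resources.createResource(4 * 1024);   // illustrative size
    RegistrationResponse registration =
        resourceTracker.registerNodeManager("localhost", 0, 0, capability);
    NodeId nodeId = registration.getNodeId();

    // Heartbeat: build a NodeStatus and send it directly. The checked
    // IOException is only re-wrapped via RPCUtil.getRemoteException() where an
    // RPC-facing signature still requires YarnRemoteException, as the
    // stopContainer() change above shows.
    NodeStatus nodeStatus =
        org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeStatus
            .createNodeStatus(nodeId, containers);
    nodeStatus.setResponseId(lastResponseId);
    HeartbeatResponse response = resourceTracker.nodeHeartbeat(nodeStatus);
    lastResponseId = response.getResponseId();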
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java Tue May 31 19:29:22 2011
@@ -94,7 +94,7 @@ public class TestApplicationCleanup exte
     
     @Override
     protected NodeInfoTracker getAndAddNodeInfoTracker(NodeId nodeId,
-        String hostString, String httpAddress, Node node, Resource capability) throws IOException {
+        String hostString, String httpAddress, Node node, Resource capability) {
       return super.getAndAddNodeInfoTracker(nodeId, hostString, httpAddress, node, capability);
     }
     

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java Tue May 31 19:29:22 2011
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -28,26 +27,18 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.junit.Before;
@@ -128,11 +119,8 @@ public class TestNMExpiry extends TestCa
                   .newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
           nodeStatus.setNodeId(thirdNodeRegResponse.getNodeId());
           nodeStatus.setResponseId(lastResponseID);
-          NodeHeartbeatRequest request =
-              recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
-          request.setNodeStatus(nodeStatus);
           lastResponseID =
-              resourceTracker.nodeHeartbeat(request).getHeartbeatResponse()
+              resourceTracker.nodeHeartbeat(nodeStatus)
                   .getResponseId();
         } catch(Exception e) {
           LOG.info("failed to heartbeat ", e);
@@ -150,26 +138,10 @@ public class TestNMExpiry extends TestCa
     String hostname2 = "localhost2";
     String hostname3 = "localhost3";
     Resource capability = recordFactory.newRecordInstance(Resource.class);
-    RegisterNodeManagerRequest request1 = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
-    request1.setHost(hostname1);
-    request1.setContainerManagerPort(0);
-    request1.setHttpPort(0);
-    request1.setResource(capability);
-    RegisterNodeManagerRequest request2 = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
-    request2.setHost(hostname2);
-    request2.setContainerManagerPort(0);
-    request2.setHttpPort(0);
-    request2.setResource(capability);
-    RegisterNodeManagerRequest request3 = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
-    request3.setHost(hostname3);
-    request3.setContainerManagerPort(0);
-    request3.setHttpPort(0);
-    request3.setResource(capability);
-    resourceTracker.registerNodeManager(request1);
-    resourceTracker.registerNodeManager(request2);
+    resourceTracker.registerNodeManager(hostname1, 0, 0, capability);
+    resourceTracker.registerNodeManager(hostname2, 0, 0, capability);
     thirdNodeRegResponse =
-        resourceTracker.registerNodeManager(request3)
-            .getRegistrationResponse();
+        resourceTracker.registerNodeManager(hostname3, 0, 0, capability);
     /* test to see if hostname 3 does not expire */

     stopT = false;
     new ThirdNodeHeartBeatThread().start();

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Tue May 31 19:29:22 2011
@@ -19,18 +19,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -38,9 +34,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.junit.After;
@@ -94,24 +87,20 @@ public class TestRMNMRPCResponseId exten
     request.setContainerManagerPort(0);
     request.setHttpPort(0);
     request.setResource(capability);
-    rmResourceTrackerImpl.registerNodeManager(request);
+    rmResourceTrackerImpl.registerNodeManager(node, 0, 0, capability);
     org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = recordFactory.newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
     nodeStatus.setNodeId(nodeid);
     nodeStatus.setResponseId(0);
-    NodeHeartbeatRequest hbRequest = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
-    hbRequest.setNodeStatus(nodeStatus);
-    HeartbeatResponse response = rmResourceTrackerImpl.nodeHeartbeat(hbRequest).getHeartbeatResponse();
+    HeartbeatResponse response = rmResourceTrackerImpl.nodeHeartbeat(nodeStatus);
     assertTrue(response.getResponseId() == 1);
     nodeStatus.setResponseId(response.getResponseId());
-    hbRequest.setNodeStatus(nodeStatus);
-    response = rmResourceTrackerImpl.nodeHeartbeat(hbRequest).getHeartbeatResponse();
+    response = rmResourceTrackerImpl.nodeHeartbeat(nodeStatus);
     assertTrue(response.getResponseId() == 2);   
     /* try calling with less response id */
-    response = rmResourceTrackerImpl.nodeHeartbeat(hbRequest).getHeartbeatResponse();
+    response = rmResourceTrackerImpl.nodeHeartbeat(nodeStatus);
     assertTrue(response.getResponseId() == 2);   
     nodeStatus.setResponseId(0);
-    hbRequest.setNodeStatus(nodeStatus);
-    response = rmResourceTrackerImpl.nodeHeartbeat(hbRequest).getHeartbeatResponse();
+    response = rmResourceTrackerImpl.nodeHeartbeat(nodeStatus);
     assertTrue(response.getReboot() == true);
   }
 }
\ No newline at end of file

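The assertions above pin down the responseId handshake between the tracker and a node: an in-order heartbeat is answered with the next id, a duplicate of the previous heartbeat gets the previous answer re-sent, and anything further out of sync is told to reboot. A hypothetical reconstruction of that check follows; lastResponse, process and askForReboot are illustrative names, not the RM's actual methods, and the exact record setters are assumed.

    synchronized HeartbeatResponse heartbeat(NodeStatus status) {
      if (status.getResponseId() + 1 == lastResponse.getResponseId()) {
        // Duplicate of the heartbeat already answered: re-send that answer
        // (the responseId stays at 2 in the test above).
        return lastResponse;
      } else if (status.getResponseId() == lastResponse.getResponseId()) {
        // In-order heartbeat: process it and hand out the next id (0 -> 1 -> 2).
        lastResponse = process(status);
        lastResponse.setResponseId(status.getResponseId() + 1);
        return lastResponse;
      }
      // Too far out of sync (e.g. a node that re-sends 0 after being answered 2):
      // ask the node to reboot and re-register.
      return askForReboot();
    }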
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1129868&r1=1129867&r2=1129868&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Tue May 31 19:29:22 2011
@@ -29,7 +29,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
@@ -38,6 +46,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 
@@ -171,8 +180,47 @@ public class MiniYARNCluster extends Com
                 healthChecker, metrics) {
               @Override
               protected ResourceTracker getRMClient() {
+                final RMResourceTrackerImpl rt = resourceManager.getResourceTracker();
+                final RecordFactory recordFactory =
+                  RecordFactoryProvider.getRecordFactory(null);
+
                 // For in-process communication without RPC
-                return resourceManager.getResourceTracker();
+                return new ResourceTracker() {
+
+                  @Override
+                  public NodeHeartbeatResponse nodeHeartbeat(
+                      NodeHeartbeatRequest request) throws YarnRemoteException {
+                    NodeHeartbeatResponse response = recordFactory.newRecordInstance(
+                        NodeHeartbeatResponse.class);
+                    try {
+                      response.setHeartbeatResponse(
+                          rt.nodeHeartbeat(request.getNodeStatus()));
+                    } catch (IOException ioe) {
+                      LOG.info("Exception in heartbeat from node " + 
+                          request.getNodeStatus().getNodeId(), ioe);
+                      throw RPCUtil.getRemoteException(ioe);
+                    }
+                    return response;
+                  }
+
+                  @Override
+                  public RegisterNodeManagerResponse registerNodeManager(
+                      RegisterNodeManagerRequest request)
+                      throws YarnRemoteException {
+                    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
+                        RegisterNodeManagerResponse.class);
+                    try {
+                      response.setRegistrationResponse(
+                          rt.registerNodeManager(
+                              request.getHost(), request.getContainerManagerPort(), 
+                              request.getHttpPort(), request.getResource()));
+                    } catch (IOException ioe) {
+                      LOG.info("Exception in node registration from " + request.getHost(), ioe);
+                      throw RPCUtil.getRemoteException(ioe);
+                    }
+                    return response;
+                  }
+                };
               };
             };
           };

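The net effect in MiniYARNCluster is an adapter: RMResourceTrackerImpl now exposes direct-call registerNodeManager/nodeHeartbeat methods, while the in-process NodeManager keeps speaking the record-based ResourceTracker protocol, so the anonymous class above translates between the two and re-wraps IOExceptions as YarnRemoteException. From the NodeManager's side nothing changes; a minimal sketch of the unchanged caller, where tracker is the ResourceTracker returned by getRMClient() and capability and nodeStatus are assumed to be built elsewhere (host and ports are illustrative):

    RegisterNodeManagerRequest request =
        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
    request.setHost("localhost");            // illustrative values
    request.setContainerManagerPort(0);
    request.setHttpPort(0);
    request.setResource(capability);
    RegistrationResponse registration =
        tracker.registerNodeManager(request).getRegistrationResponse();

    NodeHeartbeatRequest heartbeat =
        recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
    heartbeat.setNodeStatus(nodeStatus);
    HeartbeatResponse heartbeatResponse =
        tracker.nodeHeartbeat(heartbeat).getHeartbeatResponse();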

