hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject git commit: YARN-2561. MR job client cannot reconnect to AM after NM restart. Contributed by Junping Du (cherry picked from commit a337f0e3549351344bce70cb23ddc0a256c894b0)
Date Thu, 18 Sep 2014 21:40:05 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 4b4e44a8a -> d9273a954


YARN-2561. MR job client cannot reconnect to AM after NM restart. Contributed by Junping Du
(cherry picked from commit a337f0e3549351344bce70cb23ddc0a256c894b0)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d9273a95
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d9273a95
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d9273a95

Branch: refs/heads/branch-2
Commit: d9273a95476371b01cc366251a6c67ec2c07f4c4
Parents: 4b4e44a
Author: Jason Lowe <jlowe@apache.org>
Authored: Thu Sep 18 21:34:40 2014 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Thu Sep 18 21:36:20 2014 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../resourcemanager/rmnode/RMNodeImpl.java      | 47 +++++++++++++++++---
 .../TestResourceTrackerService.java             | 11 +++++
 3 files changed, 55 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9273a95/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a537749..0baf910 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -361,6 +361,9 @@ Release 2.6.0 - UNRELEASED
 
     YARN-2363. Submitted applications occasionally lack a tracking URL (jlowe)
 
+    YARN-2561. MR job client cannot reconnect to AM after NM restart. (Junping
+    Du via jlowe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9273a95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 3ce6416..1265958 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -544,12 +544,47 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
       RMNode newNode = reconnectEvent.getReconnectedNode();
       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
-      rmNode.httpPort = newNode.getHttpPort();
-      rmNode.httpAddress = newNode.getHttpAddress();
-      rmNode.totalCapability = newNode.getTotalCapability();
+      List<ApplicationId> runningApps = reconnectEvent.getRunningApplications();
+      boolean noRunningApps = 
+          (runningApps == null) || (runningApps.size() == 0);
       
-      // Reset heartbeat ID since node just restarted.
-      rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+      // No application running on the node, so send node-removal event with 
+      // cleaning up old container info.
+      if (noRunningApps) {
+        rmNode.nodeUpdateQueue.clear();
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new NodeRemovedSchedulerEvent(rmNode));
+        
+        if (rmNode.getHttpPort() == newNode.getHttpPort()) {
+          // Reset heartbeat ID since node just restarted.
+          rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+          if (rmNode.getState() != NodeState.UNHEALTHY) {
+            // Only add new node if old state is not UNHEALTHY
+            rmNode.context.getDispatcher().getEventHandler().handle(
+                new NodeAddedSchedulerEvent(newNode));
+          }
+        } else {
+          // Reconnected node differs, so replace old node and start new node
+          switch (rmNode.getState()) {
+            case RUNNING:
+              ClusterMetrics.getMetrics().decrNumActiveNodes();
+              break;
+            case UNHEALTHY:
+              ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
+              break;
+            }
+            rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
+            rmNode.context.getDispatcher().getEventHandler().handle(
+                new RMNodeStartedEvent(newNode.getNodeID(), null, null));
+        }
+      } else {
+        rmNode.httpPort = newNode.getHttpPort();
+        rmNode.httpAddress = newNode.getHttpAddress();
+        rmNode.totalCapability = newNode.getTotalCapability();
+      
+        // Reset heartbeat ID since node just restarted.
+        rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+      }
 
       if (null != reconnectEvent.getRunningApplications()) {
         for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
@@ -564,7 +599,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         // Update scheduler node's capacity for reconnect node.
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeResourceUpdateSchedulerEvent(rmNode, 
-                ResourceOption.newInstance(rmNode.totalCapability, -1)));
+                ResourceOption.newInstance(newNode.getTotalCapability(), -1)));
       }
       
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9273a95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 4827620..115f0b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
@@ -599,6 +600,16 @@ public class TestResourceTrackerService {
     dispatcher.await();
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
+    
+    // reconnect of node with changed capability and running applications
+    List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
+    runningApps.add(ApplicationId.newInstance(1, 0));
+    nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
+    dispatcher.await();
+    response = nm1.nodeHeartbeat(true);
+    dispatcher.await();
+    Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
+    Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
   }
 
   private void writeToHostsFile(String... hosts) throws IOException {


Mime
View raw message