Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0B2B611C2F for ; Thu, 18 Sep 2014 21:40:06 +0000 (UTC) Received: (qmail 6300 invoked by uid 500); 18 Sep 2014 21:40:05 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 6228 invoked by uid 500); 18 Sep 2014 21:40:05 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 6219 invoked by uid 99); 18 Sep 2014 21:40:05 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Sep 2014 21:40:05 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 833BFA1BDC1; Thu, 18 Sep 2014 21:40:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jlowe@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: YARN-2561. MR job client cannot reconnect to AM after NM restart. Contributed by Junping Du (cherry picked from commit a337f0e3549351344bce70cb23ddc0a256c894b0) Date: Thu, 18 Sep 2014 21:40:05 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 4b4e44a8a -> d9273a954 YARN-2561. MR job client cannot reconnect to AM after NM restart. 
Contributed by Junping Du (cherry picked from commit a337f0e3549351344bce70cb23ddc0a256c894b0) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d9273a95 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d9273a95 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d9273a95 Branch: refs/heads/branch-2 Commit: d9273a95476371b01cc366251a6c67ec2c07f4c4 Parents: 4b4e44a Author: Jason Lowe Authored: Thu Sep 18 21:34:40 2014 +0000 Committer: Jason Lowe Committed: Thu Sep 18 21:36:20 2014 +0000 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../resourcemanager/rmnode/RMNodeImpl.java | 47 +++++++++++++++++--- .../TestResourceTrackerService.java | 11 +++++ 3 files changed, 55 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9273a95/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a537749..0baf910 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -361,6 +361,9 @@ Release 2.6.0 - UNRELEASED YARN-2363. Submitted applications occasionally lack a tracking URL (jlowe) + YARN-2561. MR job client cannot reconnect to AM after NM restart. 
(Junping + Du via jlowe) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9273a95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3ce6416..1265958 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -544,12 +544,47 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); - rmNode.httpPort = newNode.getHttpPort(); - rmNode.httpAddress = newNode.getHttpAddress(); - rmNode.totalCapability = newNode.getTotalCapability(); + List<ApplicationId> runningApps = reconnectEvent.getRunningApplications(); + boolean noRunningApps = + (runningApps == null) || (runningApps.size() == 0); - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + // No application running on the node, so send node-removal event with + // cleaning up old container info. 
+ if (noRunningApps) { + rmNode.nodeUpdateQueue.clear(); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeRemovedSchedulerEvent(rmNode)); + + if (rmNode.getHttpPort() == newNode.getHttpPort()) { + // Reset heartbeat ID since node just restarted. + rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + if (rmNode.getState() != NodeState.UNHEALTHY) { + // Only add new node if old state is not UNHEALTHY + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeAddedSchedulerEvent(newNode)); + } + } else { + // Reconnected node differs, so replace old node and start new node + switch (rmNode.getState()) { + case RUNNING: + ClusterMetrics.getMetrics().decrNumActiveNodes(); + break; + case UNHEALTHY: + ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); + break; + } + rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); + rmNode.context.getDispatcher().getEventHandler().handle( + new RMNodeStartedEvent(newNode.getNodeID(), null, null)); + } + } else { + rmNode.httpPort = newNode.getHttpPort(); + rmNode.httpAddress = newNode.getHttpAddress(); + rmNode.totalCapability = newNode.getTotalCapability(); + + // Reset heartbeat ID since node just restarted. + rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + } if (null != reconnectEvent.getRunningApplications()) { for (ApplicationId appId : reconnectEvent.getRunningApplications()) { @@ -564,7 +599,7 @@ public class RMNodeImpl implements RMNode, EventHandler { // Update scheduler node's capacity for reconnect node. 
rmNode.context.getDispatcher().getEventHandler().handle( new NodeResourceUpdateSchedulerEvent(rmNode, - ResourceOption.newInstance(rmNode.totalCapability, -1))); + ResourceOption.newInstance(newNode.getTotalCapability(), -1))); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9273a95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4827620..115f0b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.verify; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -599,6 +600,16 @@ public class TestResourceTrackerService { dispatcher.await(); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); + + // reconnect of node with changed capability and running applications + List<ApplicationId> runningApps = new ArrayList<ApplicationId>(); + runningApps.add(ApplicationId.newInstance(1, 0)); + nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps); + dispatcher.await(); + 
response = nm1.nodeHeartbeat(true); + dispatcher.await(); + Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); + Assert.assertEquals(5120 + 15360, metrics.getAvailableMB()); } private void writeToHostsFile(String... hosts) throws IOException {