Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5B0DEFA6D for ; Wed, 3 Apr 2013 16:57:34 +0000 (UTC) Received: (qmail 88290 invoked by uid 500); 3 Apr 2013 16:57:34 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 88186 invoked by uid 500); 3 Apr 2013 16:57:31 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 88153 invoked by uid 99); 3 Apr 2013 16:57:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Apr 2013 16:57:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Apr 2013 16:57:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6A74E23889FD; Wed, 3 Apr 2013 16:57:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1464105 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/... Date: Wed, 03 Apr 2013 16:57:08 -0000 To: yarn-commits@hadoop.apache.org From: vinodkv@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130403165708.6A74E23889FD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: vinodkv Date: Wed Apr 3 16:57:07 2013 New Revision: 1464105 URL: http://svn.apache.org/r1464105 Log: YARN-101. Fix NodeManager heartbeat processing to not lose track of completed containers in case of dropped heartbeats. Contributed by Xuan Gong. Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1464105&r1=1464104&r2=1464105&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Apr 3 16:57:07 2013 @@ -183,6 +183,9 @@ Release 2.0.5-beta - UNRELEASED local directory hits unix file count limits and thus prevent job failures. (Omkar Vinit Joshi via vinodkv) + YARN-101. Fix NodeManager heartbeat processing to not lose track of completed + containers in case of dropped heartbeats. (Xuan Gong via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1464105&r1=1464104&r2=1464105&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed Apr 3 16:57:07 2013 @@ -119,6 +119,10 @@ public class NodeManager extends Composi return new DeletionService(exec); } + protected NMContext createNMContext(NMContainerTokenSecretManager containerTokenSecretManager) { + return new NMContext(containerTokenSecretManager); + } + protected void doSecureLogin() throws IOException { SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB, YarnConfiguration.NM_PRINCIPAL); @@ -137,7 +141,7 @@ public class NodeManager extends Composi containerTokenSecretManager = new NMContainerTokenSecretManager(conf); } - this.context = new NMContext(containerTokenSecretManager); + this.context = createNMContext(containerTokenSecretManager); this.aclsManager = new ApplicationACLsManager(conf); Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1464105&r1=1464104&r2=1464105&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Apr 3 16:57:07 2013 @@ -88,6 +88,10 @@ public class NodeStatusUpdaterImpl exten private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; + private boolean previousHeartBeatSucceeded; + private List previousContainersStatuses = + new ArrayList(); + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); @@ -95,6 +99,7 @@ public class NodeStatusUpdaterImpl exten this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; + this.previousHeartBeatSucceeded = true; } @Override @@ -314,8 +319,14 @@ public class NodeStatusUpdaterImpl exten NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); nodeStatus.setNodeId(this.nodeId); - int numActiveContainers = 0; List containersStatuses = new ArrayList(); + if(previousHeartBeatSucceeded) { + previousContainersStatuses.clear(); + } else { + containersStatuses.addAll(previousContainersStatuses); + } + + int numActiveContainers = 0; for (Iterator> i = this.context.getContainers().entrySet().iterator(); i.hasNext();) { Entry e = i.next(); @@ -330,6 +341,7 @@ public class NodeStatusUpdaterImpl exten LOG.info("Sending out status for container: " + containerStatus); if (containerStatus.getState() == ContainerState.COMPLETE) { + previousContainersStatuses.add(containerStatus); // Remove i.remove(); @@ -404,6 +416,7 @@ public class NodeStatusUpdaterImpl exten } NodeHeartbeatResponse response = resourceTracker.nodeHeartbeat(request); + previousHeartBeatSucceeded = true; //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); // See if the master-key has rolled over @@ -449,6 +462,7 @@ public class NodeStatusUpdaterImpl exten new CMgrCompletedAppsEvent(appsToCleanup)); } } catch (Throwable e) { + previousHeartBeatSucceeded = false; // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1464105&r1=1464104&r2=1464105&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Apr 3 16:57:07 2013 @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.net.InetSocketAddress; @@ -29,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -58,11 +61,13 @@ import org.apache.hadoop.yarn.server.api import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.service.Service; @@ -92,6 +97,8 @@ public class TestNodeStatusUpdater { private final Configuration conf = createNMConfig(); private NodeManager nm; protected NodeManager rebootedNodeManager; + private boolean containerStatusBackupSuccessfully = true; + private List completedContainerStatusList = new ArrayList(); @After public void tearDown() { @@ -237,6 +244,22 @@ public class TestNodeStatusUpdater { } } + private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl { + public ResourceTracker resourceTracker; + + public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + resourceTracker = new MyResourceTracker4(context); + } + + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + } + private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl { public ResourceTracker resourceTracker; private Context context; @@ -384,6 +407,104 @@ public class TestNodeStatusUpdater { } } + private class MyResourceTracker4 implements ResourceTracker { + + public NodeAction registerNodeAction = NodeAction.NORMAL; + public NodeAction heartBeatNodeAction = NodeAction.NORMAL; + private Context context; + + public MyResourceTracker4(Context context) { + this.context = context; + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnRemoteException { + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(registerNodeAction); + return response; + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnRemoteException { + try { + if (heartBeatID == 0) { + Assert.assertEquals(request.getNodeStatus().getContainersStatuses() + .size(), 0); + Assert.assertEquals(context.getContainers().size(), 0); + } else if (heartBeatID == 1) { + Assert.assertEquals(request.getNodeStatus().getContainersStatuses() + .size(), 5); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(0).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(0) + .getContainerId().getId() == 1); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(1).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(1) + .getContainerId().getId() == 2); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(2).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(2) + .getContainerId().getId() == 3); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(3).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(3) + .getContainerId().getId() == 4); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(4).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(4) + .getContainerId().getId() == 5); + throw new YarnException("Lost the heartbeat response"); + } else if (heartBeatID == 2) { + Assert.assertEquals(request.getNodeStatus().getContainersStatuses() + .size(), 7); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(0).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(0) + .getContainerId().getId() == 3); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(1).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(1) + .getContainerId().getId() == 4); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(2).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(2) + .getContainerId().getId() == 1); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(3).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(3) + .getContainerId().getId() == 2); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(4).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(4) + .getContainerId().getId() == 5); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(5).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(5) + .getContainerId().getId() == 6); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(6).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(6) + .getContainerId().getId() == 7); + } + } catch (AssertionError error) { + LOG.info(error); + containerStatusBackupSuccessfully = false; + } finally { + heartBeatID++; + } + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID); + NodeHeartbeatResponse nhResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, + heartBeatNodeAction, null, null, null, 1000L); + return nhResponse; + } + } + @Before public void clearError() { nmStartError = null; @@ -725,6 +846,127 @@ public class TestNodeStatusUpdater { } } + /** + * Test completed containerStatus get back up when heart beat lost + */ + @Test(timeout = 20000) + public void testCompletedContainerStatusBackup() throws Exception { + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + MyNodeStatusUpdater2 myNodeStatusUpdater = + new MyNodeStatusUpdater2(context, dispatcher, healthChecker, + metrics); + return myNodeStatusUpdater; + } + + @Override + protected NMContext createNMContext( + NMContainerTokenSecretManager containerTokenSecretManager) { + return new MyNMContext(containerTokenSecretManager); + } + + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + int waitCount = 0; + while (heartBeatID <= 3 && waitCount++ != 20) { + Thread.sleep(500); + } + if(!containerStatusBackupSuccessfully) { + Assert.fail("ContainerStatus Backup failed"); + } + nm.stop(); + } + + private class MyNMContext extends NMContext { + ConcurrentMap containers = + new ConcurrentSkipListMap(); + + public MyNMContext(NMContainerTokenSecretManager + containerTokenSecretManager) { + super(containerTokenSecretManager); + } + + @Override + public ConcurrentMap getContainers() { + if (heartBeatID == 0) { + return containers; + } else if (heartBeatID == 1) { + ContainerStatus containerStatus1 = + createContainerStatus(1, ContainerState.RUNNING); + Container container1 = getMockContainer(containerStatus1); + containers.put(containerStatus1.getContainerId(), container1); + + ContainerStatus containerStatus2 = + createContainerStatus(2, ContainerState.RUNNING); + Container container2 = getMockContainer(containerStatus2); + containers.put(containerStatus2.getContainerId(), container2); + + ContainerStatus containerStatus3 = + createContainerStatus(3, ContainerState.COMPLETE); + Container container3 = getMockContainer(containerStatus3); + containers.put(containerStatus3.getContainerId(), container3); + completedContainerStatusList.add(containerStatus3); + + ContainerStatus containerStatus4 = + createContainerStatus(4, ContainerState.COMPLETE); + Container container4 = getMockContainer(containerStatus4); + containers.put(containerStatus4.getContainerId(), container4); + completedContainerStatusList.add(containerStatus4); + + ContainerStatus containerStatus5 = + createContainerStatus(5, ContainerState.RUNNING); + Container container5 = getMockContainer(containerStatus5); + containers.put(containerStatus5.getContainerId(), container5); + + return containers; + } else if (heartBeatID == 2) { + ContainerStatus containerStatus6 = + createContainerStatus(6, ContainerState.RUNNING); + Container container6 = getMockContainer(containerStatus6); + containers.put(containerStatus6.getContainerId(), container6); + + ContainerStatus containerStatus7 = + createContainerStatus(7, ContainerState.COMPLETE); + Container container7 = getMockContainer(containerStatus7); + containers.put(containerStatus7.getContainerId(), container7); + completedContainerStatusList.add(containerStatus7); + + return containers; + } else { + containers.clear(); + + return containers; + } + } + + private ContainerStatus createContainerStatus(int id, + ContainerState containerState) { + ApplicationId applicationId = + BuilderUtils.newApplicationId(System.currentTimeMillis(), id); + ApplicationAttemptId applicationAttemptId = + BuilderUtils.newApplicationAttemptId(applicationId, id); + ContainerId contaierId = + BuilderUtils.newContainerId(applicationAttemptId, id); + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(contaierId, containerState, + "test_containerStatus: id=" + id + ", containerState: " + + containerState, 0); + return containerStatus; + } + + private Container getMockContainer(ContainerStatus containerStatus) { + Container container = mock(Container.class); + when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus); + return container; + } + } + private void verifyNodeStartFailure(String errMessage) { YarnConfiguration conf = createNMConfig(); nm.init(conf);