hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rohithsharm...@apache.org
Subject hadoop git commit: YARN-5197. RM leaks containers if running container disappears from node update. Contributed by Jason Lowe.
Date Tue, 21 Jun 2016 05:43:11 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 392e7e2d0 -> ce2fcd932


YARN-5197. RM leaks containers if running container disappears from node update. Contributed
by Jason Lowe.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ce2fcd93
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ce2fcd93
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ce2fcd93

Branch: refs/heads/branch-2.6
Commit: ce2fcd932d2573a417e15cf0f87b398c6876df59
Parents: 392e7e2
Author: Rohith Sharma K S <rohithsharmaks@apache.org>
Authored: Tue Jun 21 11:04:05 2016 +0530
Committer: Rohith Sharma K S <rohithsharmaks@apache.org>
Committed: Tue Jun 21 11:04:05 2016 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../resourcemanager/rmnode/RMNodeImpl.java      | 36 +++++++++++++
 .../yarn/server/resourcemanager/MockNM.java     | 45 ++++++++++++++---
 .../resourcemanager/TestRMNodeTransitions.java  | 53 ++++++++++++++++++++
 4 files changed, 129 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce2fcd93/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 92e8797..b120e51 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -36,6 +36,9 @@ Release 2.6.5 - UNRELEASED
     YARN-5206. RegistrySecurity includes id:pass in exception text if
     considered invalid (Steve Loughran via jlowe)
 
+    YARN-5197. RM leaks containers if running container disappears from 
+    node update. Contributed by Jason Lowe.
+
 Release 2.6.4 - 2016-02-11
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce2fcd93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index d2fe991..4403108 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -57,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -866,6 +869,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent>
{
         new ArrayList<ContainerStatus>();
     List<ContainerStatus> completedContainers =
         new ArrayList<ContainerStatus>();
+    int numRemoteRunningContainers = 0;
     for (ContainerStatus remoteContainer : containerStatuses) {
       ContainerId containerId = remoteContainer.getContainerId();
 
@@ -887,6 +891,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent>
{
 
       // Process running containers
       if (remoteContainer.getState() == ContainerState.RUNNING) {
+        ++numRemoteRunningContainers;
         if (!launchedContainers.contains(containerId)) {
           // Just launched container. RM knows about it the first time.
           launchedContainers.add(containerId);
@@ -898,10 +903,41 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent>
{
         completedContainers.add(remoteContainer);
       }
     }
+    completedContainers.addAll(findLostContainers(
+          numRemoteRunningContainers, containerStatuses));
+
     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
       nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
           completedContainers));
     }
   }
 
+  private List<ContainerStatus> findLostContainers(int numRemoteRunning,
+      List<ContainerStatus> containerStatuses) {
+    if (numRemoteRunning >= launchedContainers.size()) {
+      return Collections.emptyList();
+    }
+    Set<ContainerId> nodeContainers =
+        new HashSet<ContainerId>(numRemoteRunning);
+    List<ContainerStatus> lostContainers = new ArrayList<ContainerStatus>(
+        launchedContainers.size() - numRemoteRunning);
+    for (ContainerStatus remoteContainer : containerStatuses) {
+      if (remoteContainer.getState() == ContainerState.RUNNING) {
+        nodeContainers.add(remoteContainer.getContainerId());
+      }
+    }
+    Iterator<ContainerId> iter = launchedContainers.iterator();
+    while (iter.hasNext()) {
+      ContainerId containerId = iter.next();
+      if (!nodeContainers.contains(containerId)) {
+        String diag = "Container " + containerId
+            + " was running but not reported from " + nodeId;
+        LOG.warn(diag);
+        lostContainers.add(SchedulerUtils.createAbnormalContainerStatus(
+            containerId, diag));
+        iter.remove();
+      }
+    }
+    return lostContainers;
+  }
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce2fcd93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index c917f79..cbd83c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -55,6 +57,8 @@ public class MockNM {
   private MasterKey currentContainerTokenMasterKey;
   private MasterKey currentNMTokenMasterKey;
   private String version;
+  private Map<ContainerId, ContainerStatus> containerStats =
+      new HashMap<ContainerId, ContainerStatus>();
 
   public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
     // scale vcores based on the requested memory
@@ -129,18 +133,27 @@ public class MockNM {
     this.currentContainerTokenMasterKey =
         registrationResponse.getContainerTokenMasterKey();
     this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
+    containerStats.clear();
+    if (containerReports != null) {
+      for (NMContainerStatus report : containerReports) {
+        if (report.getContainerState() != ContainerState.COMPLETE) {
+          containerStats.put(report.getContainerId(),
+              ContainerStatus.newInstance(report.getContainerId(),
+                  report.getContainerState(), report.getDiagnostics(),
+                  report.getContainerExitStatus()));
+        }
+      }
+    }
     return registrationResponse;    
   }
   
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
-    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
+    return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
         isHealthy, ++responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
       long containerId, ContainerState containerState) throws Exception {
-    HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
-        new HashMap<ApplicationId, List<ContainerStatus>>(1);
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         BuilderUtils.newContainerId(attemptId, containerId), containerState,
         "Success", 0);
@@ -148,8 +161,7 @@ public class MockNM {
         new ArrayList<ContainerStatus>(1);
     containerStatusList.add(containerStatus);
     Log.info("ContainerStatus: " + containerStatus);
-    nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
-    return nodeHeartbeat(nodeUpdate, true);
+    return nodeHeartbeat(containerStatusList, true, ++responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@@ -159,13 +171,30 @@ public class MockNM {
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception
{
+    ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
+    for (List<ContainerStatus> stats : conts.values()) {
+      updatedStats.addAll(stats);
+    }
+    return nodeHeartbeat(updatedStats, isHealthy, resId);
+  }
+
+  public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
+      boolean isHealthy, int resId) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setResponseId(resId);
     status.setNodeId(nodeId);
-    for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet())
{
-      Log.info("entry.getValue() " + entry.getValue());
-      status.setContainersStatuses(entry.getValue());
+    ArrayList<ContainerId> completedContainers = new ArrayList<ContainerId>();
+    for (ContainerStatus stat : updatedStats) {
+      if (stat.getState() == ContainerState.COMPLETE) {
+        completedContainers.add(stat.getContainerId());
+      }
+      containerStats.put(stat.getContainerId(), stat);
+    }
+    status.setContainersStatuses(
+        new ArrayList<ContainerStatus>(containerStats.values()));
+    for (ContainerId cid : completedContainers) {
+      containerStats.remove(cid);
     }
     NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
     healthStatus.setHealthReport("");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce2fcd93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index c6da3fd..08d724e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -32,7 +32,9 @@ import java.util.List;
 
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -147,6 +149,11 @@ public class TestRMNodeTransitions {
   }
   
   private RMNodeStatusEvent getMockRMNodeStatusEvent() {
+    return getMockRMNodeStatusEvent(null);
+  }
+
+  private RMNodeStatusEvent getMockRMNodeStatusEvent(
+      List<ContainerStatus> containerStatus) {
     NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
 
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
@@ -157,6 +164,9 @@ public class TestRMNodeTransitions {
     doReturn(healthStatus).when(event).getNodeHealthStatus();
     doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+    if (containerStatus != null) {
+      doReturn(containerStatus).when(event).getContainers();
+    }
     return event;
   }
   
@@ -617,4 +627,47 @@ public class TestRMNodeTransitions {
         null, null));
     Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
   }
+
+  @Test
+  public void testDisappearingContainer() {
+    ContainerId cid1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 1);
+    ContainerId cid2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(2, 2), 2), 2);
+    ArrayList<ContainerStatus> containerStats =
+        new ArrayList<ContainerStatus>();
+    containerStats.add(ContainerStatus.newInstance(cid1,
+        ContainerState.RUNNING, "", -1));
+    containerStats.add(ContainerStatus.newInstance(cid2,
+        ContainerState.RUNNING, "", -1));
+    node = getRunningNode();
+    node.handle(getMockRMNodeStatusEvent(containerStats));
+    assertEquals("unexpected number of running containers",
+        2, node.getLaunchedContainers().size());
+    Assert.assertTrue("first container not running",
+        node.getLaunchedContainers().contains(cid1));
+    Assert.assertTrue("second container not running",
+        node.getLaunchedContainers().contains(cid2));
+    assertEquals("already completed containers",
+        0, completedContainers.size());
+    containerStats.remove(0);
+    node.handle(getMockRMNodeStatusEvent(containerStats));
+    assertEquals("expected one container to be completed",
+        1, completedContainers.size());
+    ContainerStatus cs = completedContainers.get(0);
+    assertEquals("first container not the one that completed",
+        cid1, cs.getContainerId());
+    assertEquals("completed container not marked complete",
+        ContainerState.COMPLETE, cs.getState());
+    assertEquals("completed container not marked aborted",
+        ContainerExitStatus.ABORTED, cs.getExitStatus());
+    Assert.assertTrue("completed container not marked missing",
+        cs.getDiagnostics().contains("not reported"));
+    assertEquals("unexpected number of running containers",
+        1, node.getLaunchedContainers().size());
+    Assert.assertTrue("second container not running",
+        node.getLaunchedContainers().contains(cid2));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message