hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [20/21] hadoop git commit: YARN-3445. Cache runningApps in RMNode for getting running apps on given NodeId. (Junping Du via mingma)
Date Fri, 10 Jul 2015 15:42:25 GMT
YARN-3445. Cache runningApps in RMNode for getting running apps on given NodeId. (Junping Du
via mingma)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08244264
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08244264
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08244264

Branch: refs/heads/HADOOP-12111
Commit: 08244264c0583472b9c4e16591cfde72c6db62a2
Parents: b489080
Author: Ming Ma <mingma@apache.org>
Authored: Fri Jul 10 08:30:10 2015 -0700
Committer: Ming Ma <mingma@apache.org>
Committed: Fri Jul 10 08:30:10 2015 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |  8 +++-
 .../yarn/sls/scheduler/RMNodeWrapper.java       |  5 +++
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../server/resourcemanager/rmnode/RMNode.java   |  2 +
 .../resourcemanager/rmnode/RMNodeImpl.java      | 43 ++++++++++++++++----
 .../yarn/server/resourcemanager/MockNodes.java  |  5 +++
 .../resourcemanager/TestRMNodeTransitions.java  | 36 ++++++++++++++--
 7 files changed, 91 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index ee6eb7b..440779c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -62,7 +62,8 @@ public class NodeInfo {
     private NodeState state;
     private List<ContainerId> toCleanUpContainers;
     private List<ApplicationId> toCleanUpApplications;
-    
+    private List<ApplicationId> runningApplications;
+
     public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
         Resource perNode, String rackName, String healthReport,
         int cmdPort, String hostName, NodeState state) {
@@ -77,6 +78,7 @@ public class NodeInfo {
       this.state = state;
       toCleanUpApplications = new ArrayList<ApplicationId>();
       toCleanUpContainers = new ArrayList<ContainerId>();
+      runningApplications = new ArrayList<ApplicationId>();
     }
 
     public NodeId getNodeID() {
@@ -135,6 +137,10 @@ public class NodeInfo {
       return toCleanUpApplications;
     }
 
+    public List<ApplicationId> getRunningApps() {
+      return runningApplications;
+    }
+
     public void updateNodeHeartbeatResponseForCleanup(
             NodeHeartbeatResponse response) {
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index b64be1b..a6633ae 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -119,6 +119,11 @@ public class RMNodeWrapper implements RMNode {
   }
 
   @Override
+  public List<ApplicationId> getRunningApps() {
+    return node.getRunningApps();
+  }
+
+  @Override
   public void updateNodeHeartbeatResponseForCleanup(
           NodeHeartbeatResponse nodeHeartbeatResponse) {
     node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2a9ff98..db000d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1678,6 +1678,9 @@ Release 2.6.0 - 2014-11-18
     YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max
     share (Siqi Li via Sandy Ryza)
 
+    YARN-3445. Cache runningApps in RMNode for getting running apps on given
+    NodeId. (Junping Du via mingma)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 95eeaf6..0386be6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -119,6 +119,8 @@ public interface RMNode {
 
   public List<ApplicationId> getAppsToCleanup();
 
+  List<ApplicationId> getRunningApps();
+
   /**
    * Update a {@link NodeHeartbeatResponse} with the list of containers and
    * applications to clean up for this node.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index d1e6190..9bc91c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -123,11 +123,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent>
{
       new HashSet<ContainerId>();
 
   /* the list of applications that have finished and need to be purged */
-  private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
+  private final List<ApplicationId> finishedApplications =
+      new ArrayList<ApplicationId>();
+
+  /* the list of applications that are running on this node */
+  private final List<ApplicationId> runningApplications =
+      new ArrayList<ApplicationId>();
 
   private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
       .newRecordInstance(NodeHeartbeatResponse.class);
-  
+
   private static final StateMachineFactory<RMNodeImpl,
                                            NodeState,
                                            RMNodeEventType,
@@ -136,7 +141,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent>
{
                                            NodeState,
                                            RMNodeEventType,
                                            RMNodeEvent>(NodeState.NEW)
-  
+
      //Transitions from NEW state
      .addTransition(NodeState.NEW, NodeState.RUNNING, 
          RMNodeEventType.STARTED, new AddNodeTransition())
@@ -383,6 +388,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent>
{
   }
   
   @Override
+  public List<ApplicationId> getRunningApps() {
+    this.readLock.lock();
+    try {
+      return new ArrayList<ApplicationId>(this.runningApplications);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
   public List<ContainerId> getContainersToCleanUp() {
 
     this.readLock.lock();
@@ -519,9 +534,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent>
{
       LOG.warn("Cannot get RMApp by appId=" + appId
           + ", just added it to finishedApplications list for cleanup");
       rmNode.finishedApplications.add(appId);
+      rmNode.runningApplications.remove(appId);
       return;
     }
 
+    // Add running applications back due to Node add or Node reconnection.
+    rmNode.runningApplications.add(appId);
     context.getDispatcher().getEventHandler()
         .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
   }
@@ -707,8 +725,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent>
{
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      rmNode.finishedApplications.add(((
-          RMNodeCleanAppEvent) event).getAppId());
+      ApplicationId appId = ((RMNodeCleanAppEvent) event).getAppId();
+      rmNode.finishedApplications.add(appId);
+      rmNode.runningApplications.remove(appId);
     }
   }
 
@@ -910,12 +929,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent>
{
             + "cleanup, no further processing");
         continue;
       }
-      if (finishedApplications.contains(containerId.getApplicationAttemptId()
-          .getApplicationId())) {
+
+      ApplicationId containerAppId =
+          containerId.getApplicationAttemptId().getApplicationId();
+
+      if (finishedApplications.contains(containerAppId)) {
         LOG.info("Container " + containerId
             + " belongs to an application that is already killed,"
             + " no further processing");
         continue;
+      } else if (!runningApplications.contains(containerAppId)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Container " + containerId
+              + " is the first container get launched for application "
+              + containerAppId);
+        }
+        runningApplications.add(containerAppId);
       }
 
       // Process running containers

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 2d863d1..095fe28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -187,6 +187,11 @@ public class MockNodes {
     }
 
     @Override
+    public List<ApplicationId> getRunningApps() {
+      return null;
+    }
+
+    @Override
     public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 01f4357..ece896b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -33,6 +33,7 @@ import java.util.List;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -485,9 +486,9 @@ public class TestRMNodeTransitions {
     NodeId nodeId = node.getNodeID();
 
     // Expire a container
-		ContainerId completedContainerId = BuilderUtils.newContainerId(
-				BuilderUtils.newApplicationAttemptId(
-						BuilderUtils.newApplicationId(0, 0), 0), 0);
+    ContainerId completedContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
     node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
 
@@ -512,6 +513,35 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
   }
 
+  @Test(timeout=20000)
+  public void testUpdateHeartbeatResponseForAppLifeCycle() {
+    RMNodeImpl node = getRunningNode();
+    NodeId nodeId = node.getNodeID();
+
+    ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
+    // Create a running container
+    ContainerId runningContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+        runningAppId, 0), 0);
+
+    ContainerStatus status = ContainerStatus.newInstance(runningContainerId,
+        ContainerState.RUNNING, "", 0);
+    List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+    statusList.add(status);
+    NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
+        "", System.currentTimeMillis());
+    node.handle(new RMNodeStatusEvent(nodeId, nodeHealth,
+        statusList, null, null));
+
+    Assert.assertEquals(1, node.getRunningApps().size());
+
+    // Finish an application
+    ApplicationId finishedAppId = runningAppId;
+    node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
+    Assert.assertEquals(1, node.getAppsToCleanup().size());
+    Assert.assertEquals(0, node.getRunningApps().size());
+  }
+
   private RMNodeImpl getRunningNode() {
     return getRunningNode(null, 0);
   }


Mime
View raw message