hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1429797 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-...
Date Mon, 07 Jan 2013 13:45:31 GMT
Author: vinodkv
Date: Mon Jan  7 13:45:31 2013
New Revision: 1429797

URL: http://svn.apache.org/viewvc?rev=1429797&view=rev
Log:
YARN-170. Change NodeManager stop to be reentrant. Contributed by Sandy Ryza.
svn merge --ignore-ancestry -c 1429796 ../../trunk/

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEvent.java
      - copied unchanged from r1429796, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEvent.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java
      - copied unchanged from r1429796, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1429797&r1=1429796&r2=1429797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Mon Jan  7 13:45:31 2013
@@ -75,6 +75,8 @@ Release 2.0.3-alpha - Unreleased
     YARN-315. Using the common security token protobuf definition from hadoop
     common. (Suresh Srinivas via vinodkv) 
 
+    YARN-170. Change NodeManager stop to be reentrant. (Sandy Ryza via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1429797&r1=1429796&r2=1429797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Jan  7 13:45:31 2013
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -54,11 +56,10 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
 import org.apache.hadoop.yarn.util.Records;
 
-public class NodeManager extends CompositeService implements
-    ServiceStateChangeListener {
+public class NodeManager extends CompositeService 
+    implements EventHandler<NodeManagerEvent> {
 
   /**
    * Priority of the NodeManager shutdown hook.
@@ -82,6 +83,8 @@ public class NodeManager extends Composi
   
   private long waitForContainersOnShutdownMillis;
   
+  private AtomicBoolean isStopping = new AtomicBoolean(false);
+  
   public NodeManager() {
     super(NodeManager.class.getName());
   }
@@ -152,7 +155,6 @@ public class NodeManager extends Composi
 
     NodeStatusUpdater nodeStatusUpdater =
         createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
-    nodeStatusUpdater.register(this);
 
     NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
     addService(nodeResourceMonitor);
@@ -167,6 +169,7 @@ public class NodeManager extends Composi
     addService(webServer);
 
     dispatcher.register(ContainerManagerEventType.class, containerManager);
+    dispatcher.register(NodeManagerEventType.class, this);
     addService(dispatcher);
     
     DefaultMetricsSystem.initialize("NodeManager");
@@ -198,13 +201,17 @@ public class NodeManager extends Composi
 
   @Override
   public void stop() {
+    if (isStopping.getAndSet(true)) {
+      return;
+    }
+    
     cleanupContainers();
     super.stop();
     DefaultMetricsSystem.shutdown();
   }
   
   @SuppressWarnings("unchecked")
-  private void cleanupContainers() {
+  protected void cleanupContainers() {
     Map<ContainerId, Container> containers = context.getContainers();
     if (containers.isEmpty()) {
       return;
@@ -293,24 +300,10 @@ public class NodeManager extends Composi
     return nodeHealthChecker;
   }
 
-  @Override
-  public void stateChanged(Service service) {
-    if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
-        && STATE.STOPPED.equals(service.getServiceState())) {
-
-      boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode();
-
-      // Shutdown the Nodemanager when the NodeStatusUpdater is stopped.      
-      stop();
-
-      // Reboot the whole node-manager if NodeStatusUpdater got a reboot command
-      // from the RM.
-      if (hasToReboot) {
-        LOG.info("Rebooting the node manager.");
-        NodeManager nodeManager = createNewNodeManager();
-        nodeManager.initAndStartNodeManager(this.getConfig(), hasToReboot);
-      }
-    }
+  private void reboot() {
+    LOG.info("Rebooting the node manager.");
+    NodeManager nodeManager = createNewNodeManager();
+    nodeManager.initAndStartNodeManager(this.getConfig(), true);
   }
   
   private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
@@ -333,6 +326,21 @@ public class NodeManager extends Composi
     }
   }
 
+  @Override
+  public void handle(NodeManagerEvent event) {
+    switch (event.getType()) {
+    case SHUTDOWN:
+      stop();
+      break;
+    case REBOOT:
+      stop();
+      reboot();
+      break;
+    default:
+      LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
+    }
+  }
+  
   // For testing
   NodeManager createNewNodeManager() {
     return new NodeManager();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1429797&r1=1429796&r2=1429797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Jan  7 13:45:31 2013
@@ -88,8 +88,6 @@ public class NodeStatusUpdaterImpl exten
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
 
-  private boolean hasToRebootNode;
-  
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
     super(NodeStatusUpdaterImpl.class.getName());
@@ -149,18 +147,6 @@ public class NodeStatusUpdaterImpl exten
     super.stop();
   }
   
-  private synchronized void reboot() {
-    this.hasToRebootNode = true;
-    // Stop the status-updater. This will trigger a sub-service state change in
-    // the NodeManager which will then decide to reboot or not based on
-    // isRebooted.
-    this.stop();
-  }
-
-  synchronized boolean hasToRebootNode() {
-    return this.hasToRebootNode;
-  }
-
   private boolean isSecurityEnabled() {
     return UserGroupInformation.isSecurityEnabled();
   }
@@ -348,13 +334,15 @@ public class NodeStatusUpdaterImpl exten
               LOG
                   .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
+
                   		" hence shutting down.");
-              NodeStatusUpdaterImpl.this.stop();
+              dispatcher.getEventHandler().handle(
+                  new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
               break;
             }
             if (response.getNodeAction() == NodeAction.REBOOT) {
               LOG.info("Node is out of sync with ResourceManager,"
                   + " hence rebooting.");
-              NodeStatusUpdaterImpl.this.reboot();
+              dispatcher.getEventHandler().handle(
+                  new NodeManagerEvent(NodeManagerEventType.REBOOT));
               break;
             }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1429797&r1=1429796&r2=1429797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Jan  7 13:45:31 2013
@@ -29,6 +29,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -97,7 +98,7 @@ public class TestNodeStatusUpdater {
   public void tearDown() {
     this.registeredNodes.clear();
     heartBeatID = 0;
-    if (nm != null) {
+    if (nm != null && nm.getServiceState() == STATE.STARTED) {
       nm.stop();
     }
     DefaultMetricsSystem.shutdown();
@@ -447,6 +448,52 @@ public class TestNodeStatusUpdater {
   }
   
   @Test
+  public void testStopReentrant() throws Exception {
+    final AtomicInteger numCleanups = new AtomicInteger(0);
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
+            context, dispatcher, healthChecker, metrics);
+        MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+        myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN;
+        myNodeStatusUpdater.resourceTracker = myResourceTracker2;
+        return myNodeStatusUpdater;
+      }
+      
+      @Override
+      protected void cleanupContainers() {
+        super.cleanupContainers();
+        numCleanups.incrementAndGet();
+      }
+    };
+
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+    
+    int waitCount = 0;
+    while (heartBeatID < 1 && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    Assert.assertFalse(heartBeatID < 1);
+
+    // Meanwhile call stop directly as the shutdown hook would
+    nm.stop();
+    
+    // NM takes a while to reach the STOPPED state.
+    waitCount = 0;
+    while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+      LOG.info("Waiting for NM to stop..");
+      Thread.sleep(1000);
+    }
+
+    Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+    Assert.assertEquals(numCleanups.get(), 1);
+  }
+  
+  @Test
   public void testNodeDecommision() throws Exception {
     nm = getNodeManager(NodeAction.SHUTDOWN);
     YarnConfiguration conf = createNMConfig();



Mime
View raw message