hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From x...@apache.org
Subject [28/45] hadoop git commit: YARN-8451. Multiple NM heartbeat thread created when a slow NM resync with RM. Contributed by Botong Huang
Date Mon, 02 Jul 2018 20:32:45 GMT
YARN-8451. Multiple NM heartbeat thread created when a slow NM resync with RM. Contributed
by Botong Huang


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/10047014
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/10047014
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/10047014

Branch: refs/heads/HDDS-4
Commit: 100470140d86eede0fa240a9aa93226f274ee4f5
Parents: a820738
Author: Jason Lowe <jlowe@apache.org>
Authored: Fri Jun 29 13:06:28 2018 -0500
Committer: Jason Lowe <jlowe@apache.org>
Committed: Fri Jun 29 13:06:28 2018 -0500

----------------------------------------------------------------------
 .../yarn/server/nodemanager/NodeManager.java    | 66 +++++++++++++-------
 .../nodemanager/TestNodeManagerResync.java      | 56 +++++++++++++++++
 2 files changed, 98 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/10047014/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 2748a8f..c8234bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -128,6 +128,7 @@ public class NodeManager extends CompositeService
   // the NM collector service is set only if the timeline service v.2 is enabled
   private NMCollectorService nmCollectorService;
   private NodeStatusUpdater nodeStatusUpdater;
+  private AtomicBoolean resyncingWithRM = new AtomicBoolean(false);
   private NodeResourceMonitor nodeResourceMonitor;
   private static CompositeServiceShutdownHook nodeManagerShutdownHook;
   private NMStateStoreService nmStore = null;
@@ -393,7 +394,7 @@ public class NodeManager extends CompositeService
     addService(del);
 
     // NodeManager level dispatcher
-    this.dispatcher = new AsyncDispatcher("NM Event dispatcher");
+    this.dispatcher = createNMDispatcher();
 
     nodeHealthChecker =
         new NodeHealthCheckerService(
@@ -517,31 +518,41 @@ public class NodeManager extends CompositeService
   }
 
   protected void resyncWithRM() {
-    //we do not want to block dispatcher thread here
-    new Thread() {
-      @Override
-      public void run() {
-        try {
-          if (!rmWorkPreservingRestartEnabled) {
-            LOG.info("Cleaning up running containers on resync");
-            containerManager.cleanupContainersOnNMResync();
-            // Clear all known collectors for resync.
-            if (context.getKnownCollectors() != null) {
-              context.getKnownCollectors().clear();
+    // Create a thread for resync because we do not want to block dispatcher
+    // thread here. Also use locking to make sure only one thread is running at
+    // a time.
+    if (this.resyncingWithRM.getAndSet(true)) {
+      // Some other thread is already created for resyncing, do nothing
+    } else {
+      // We have got the lock, create a new thread
+      new Thread() {
+        @Override
+        public void run() {
+          try {
+            if (!rmWorkPreservingRestartEnabled) {
+              LOG.info("Cleaning up running containers on resync");
+              containerManager.cleanupContainersOnNMResync();
+              // Clear all known collectors for resync.
+              if (context.getKnownCollectors() != null) {
+                context.getKnownCollectors().clear();
+              }
+            } else {
+              LOG.info("Preserving containers on resync");
+              // Re-register known timeline collectors.
+              reregisterCollectors();
             }
-          } else {
-            LOG.info("Preserving containers on resync");
-            // Re-register known timeline collectors.
-            reregisterCollectors();
+            ((NodeStatusUpdaterImpl) nodeStatusUpdater)
+                .rebootNodeStatusUpdaterAndRegisterWithRM();
+          } catch (YarnRuntimeException e) {
+            LOG.error("Error while rebooting NodeStatusUpdater.", e);
+            shutDown(NodeManagerStatus.EXCEPTION.getExitCode());
+          } finally {
+            // Release lock
+            resyncingWithRM.set(false);
           }
-          ((NodeStatusUpdaterImpl) nodeStatusUpdater)
-            .rebootNodeStatusUpdaterAndRegisterWithRM();
-        } catch (YarnRuntimeException e) {
-          LOG.error("Error while rebooting NodeStatusUpdater.", e);
-          shutDown(NodeManagerStatus.EXCEPTION.getExitCode());
         }
-      }
-    }.start();
+      }.start();
+    }
   }
 
   /**
@@ -946,7 +957,14 @@ public class NodeManager extends CompositeService
   ContainerManagerImpl getContainerManager() {
     return containerManager;
   }
-  
+
+  /**
+   * Unit test friendly.
+   */
+  protected AsyncDispatcher createNMDispatcher() {
+    return new AsyncDispatcher("NM Event dispatcher");
+  }
+
   //For testing
   Dispatcher getNMDispatcher(){
     return dispatcher;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10047014/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index cf33775..b3f4e1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -37,6 +37,7 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
@@ -64,7 +65,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -107,6 +110,7 @@ public class TestNodeManagerResync {
   private FileContext localFS;
   private CyclicBarrier syncBarrier;
   private CyclicBarrier updateBarrier;
+  private AtomicInteger resyncThreadCount;
   private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
   private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
   private final NodeManagerEvent resyncEvent =
@@ -125,6 +129,7 @@ public class TestNodeManagerResync {
     nmLocalDir.mkdirs();
     syncBarrier = new CyclicBarrier(2);
     updateBarrier = new CyclicBarrier(2);
+    resyncThreadCount = new AtomicInteger(0);
   }
 
   @After
@@ -186,6 +191,41 @@ public class TestNodeManagerResync {
   }
 
   @SuppressWarnings("resource")
+  @Test(timeout = 30000)
+  public void testNMMultipleResyncEvent()
+      throws IOException, InterruptedException {
+    TestNodeManager1 nm = new TestNodeManager1(false);
+    YarnConfiguration conf = createNMConfig();
+
+    int resyncEventCount = 4;
+    try {
+      nm.init(conf);
+      nm.start();
+      Assert.assertEquals(1, nm.getNMRegistrationCount());
+      for (int i = 0; i < resyncEventCount; i++) {
+        nm.getNMDispatcher().getEventHandler().handle(resyncEvent);
+      }
+
+      DrainDispatcher dispatcher = (DrainDispatcher) nm.getNMDispatcher();
+      dispatcher.await();
+      LOG.info("NM dispatcher drained");
+
+      // Wait for the resync thread to finish
+      try {
+        syncBarrier.await();
+      } catch (BrokenBarrierException e) {
+      }
+      LOG.info("Barrier wait done for the resync thread");
+
+      // Resync should only happen once
+      Assert.assertEquals(2, nm.getNMRegistrationCount());
+      Assert.assertFalse("NM shutdown called.", isNMShutdownCalled.get());
+    } finally {
+      nm.stop();
+    }
+  }
+
+  @SuppressWarnings("resource")
   @Test(timeout=10000)
   public void testNMshutdownWhenResyncThrowException() throws IOException,
       InterruptedException, YarnException {
@@ -400,6 +440,11 @@ public class TestNodeManagerResync {
     }
 
     @Override
+    protected AsyncDispatcher createNMDispatcher() {
+      return new DrainDispatcher();
+    }
+
+    @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
       return new TestNodeStatusUpdaterImpl1(context, dispatcher,
@@ -410,6 +455,14 @@ public class TestNodeManagerResync {
       return registrationCount;
     }
 
+    @Override
+    protected void shutDown(int exitCode) {
+      synchronized (isNMShutdownCalled) {
+        isNMShutdownCalled.set(true);
+        isNMShutdownCalled.notify();
+      }
+    }
+
     class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater {
 
       public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher,
@@ -428,6 +481,9 @@ public class TestNodeManagerResync {
         ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
         .containermanager.container.Container> containers =
             getNMContext().getContainers();
+        if (resyncThreadCount.incrementAndGet() > 1) {
+          throw new YarnRuntimeException("Multiple resync thread created!");
+        }
         try {
           try {
             if (containersShouldBePreserved) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message