hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From haiboc...@apache.org
Subject [5/5] hadoop git commit: YARN-7835. Race condition in NM while publishing events if second attempt is launched on the same node. (Rohith Sharma K S via Haibo Chen)
Date Thu, 01 Mar 2018 05:11:05 GMT
YARN-7835. Race condition in NM while publishing events if second attempt is launched on the
same node. (Rohith Sharma K S via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d1274c3b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d1274c3b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d1274c3b

Branch: refs/heads/trunk
Commit: d1274c3b71549cb000868500c293cafd880b3713
Parents: 5e37ca5
Author: Haibo Chen <haibochen@apache.org>
Authored: Wed Feb 28 21:06:42 2018 -0800
Committer: Haibo Chen <haibochen@apache.org>
Committed: Wed Feb 28 21:10:42 2018 -0800

----------------------------------------------------------------------
 .../PerNodeTimelineCollectorsAuxService.java    | 51 +++++++++--
 ...TestPerNodeTimelineCollectorsAuxService.java | 93 ++++++++++++++++----
 2 files changed, 120 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1274c3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
index 66f9aab..c15f99d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -19,7 +19,12 @@
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -31,6 +36,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
@@ -59,6 +65,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService
{
   private final NodeTimelineCollectorManager collectorManager;
   private long collectorLingerPeriod;
   private ScheduledExecutorService scheduler;
+  private Map<ApplicationId, Set<ContainerId>> appIdToContainerId =
+      new ConcurrentHashMap<>();
 
   public PerNodeTimelineCollectorsAuxService() {
     this(new NodeTimelineCollectorManager(true));
@@ -148,7 +156,15 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService
{
     if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
       ApplicationId appId = context.getContainerId().
           getApplicationAttemptId().getApplicationId();
-      addApplication(appId, context.getUser());
+      synchronized (appIdToContainerId) {
+        Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
+        if (masterContainers == null) {
+          masterContainers = new HashSet<>();
+          appIdToContainerId.put(appId, masterContainers);
+        }
+        masterContainers.add(context.getContainerId());
+        addApplication(appId, context.getUser());
+      }
     }
   }
 
@@ -162,17 +178,36 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService
{
     // intercept the event of the AM container being stopped and remove the app
     // level collector service
     if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
-      final ApplicationId appId =
-          context.getContainerId().getApplicationAttemptId().getApplicationId();
-      scheduler.schedule(new Runnable() {
-        public void run() {
-          removeApplication(appId);
-        }
-      }, collectorLingerPeriod, TimeUnit.MILLISECONDS);
+      final ContainerId containerId = context.getContainerId();
+      removeApplicationCollector(containerId);
     }
   }
 
   @VisibleForTesting
+  protected Future removeApplicationCollector(final ContainerId containerId) {
+    final ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    return scheduler.schedule(new Runnable() {
+      public void run() {
+        synchronized (appIdToContainerId) {
+          Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
+          if (masterContainers == null) {
+            LOG.info("Stop container for " + containerId
+                + " is called before initializing container.");
+            return;
+          }
+          masterContainers.remove(containerId);
+          if (masterContainers.size() == 0) {
+            // remove only if it is last master container
+            removeApplication(appId);
+            appIdToContainerId.remove(appId);
+          }
+        }
+      }
+    }, collectorLingerPeriod, TimeUnit.MILLISECONDS);
+  }
+
+  @VisibleForTesting
   boolean hasApplication(ApplicationId appId) {
     return collectorManager.containsTimelineCollector(appId);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1274c3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
index cb9ced0..f27bf63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ExitUtil;
@@ -47,16 +48,17 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestPerNodeTimelineCollectorsAuxService {
   private ApplicationAttemptId appAttemptId;
   private PerNodeTimelineCollectorsAuxService auxService;
   private Configuration conf;
+  private ApplicationId appId;
 
   public TestPerNodeTimelineCollectorsAuxService() {
-    ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
     appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     conf = new YarnConfiguration();
     // enable timeline service v.2
@@ -107,15 +109,6 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerType()).thenReturn(
         ContainerType.APPLICATION_MASTER);
     auxService.stopContainer(context);
-    // auxService should have the app's collector and need to remove only after
-    // a configured period
-    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
-    for (int i = 0; i < 4; i++) {
-      Thread.sleep(500L);
-      if (!auxService.hasApplication(appAttemptId.getApplicationId())) {
-        break;
-      }
-    }
 
     // auxService should not have that app
     assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
@@ -155,21 +148,53 @@ public class TestPerNodeTimelineCollectorsAuxService {
   private PerNodeTimelineCollectorsAuxService
       createCollectorAndAddApplication() {
     PerNodeTimelineCollectorsAuxService service = createCollector();
+
+    ContainerInitializationContext context =
+        createContainerInitalizationContext(1);
+    service.initializeContainer(context);
+    return service;
+  }
+
+  ContainerInitializationContext createContainerInitalizationContext(
+      int attempt) {
+    appAttemptId = ApplicationAttemptId.newInstance(appId, attempt);
     // create an AM container
     ContainerId containerId = getAMContainerId();
     ContainerInitializationContext context =
         mock(ContainerInitializationContext.class);
     when(context.getContainerId()).thenReturn(containerId);
-    when(context.getContainerType()).thenReturn(
-        ContainerType.APPLICATION_MASTER);
-    service.initializeContainer(context);
-    return service;
+    when(context.getContainerType())
+        .thenReturn(ContainerType.APPLICATION_MASTER);
+    return context;
+  }
+
+  ContainerTerminationContext createContainerTerminationContext(int attempt) {
+    appAttemptId = ApplicationAttemptId.newInstance(appId, attempt);
+    // create an AM container
+    ContainerId containerId = getAMContainerId();
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    when(context.getContainerType())
+        .thenReturn(ContainerType.APPLICATION_MASTER);
+    return context;
   }
 
   private PerNodeTimelineCollectorsAuxService createCollector() {
     NodeTimelineCollectorManager collectorManager = createCollectorManager();
     PerNodeTimelineCollectorsAuxService service =
-        spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
+        spy(new PerNodeTimelineCollectorsAuxService(collectorManager) {
+          @Override
+          protected Future removeApplicationCollector(ContainerId containerId) {
+            Future future = super.removeApplicationCollector(containerId);
+            try {
+              future.get();
+            } catch (Exception e) {
+              Assert.fail("Expeption thrown while removing collector");
+            }
+            return future;
+          }
+        });
     service.init(conf);
     service.start();
     return service;
@@ -200,4 +225,40 @@ public class TestPerNodeTimelineCollectorsAuxService {
   private ContainerId getContainerId(long id) {
     return ContainerId.newContainerId(appAttemptId, id);
   }
+
+  @Test(timeout = 60000)
+  public void testRemoveAppWhenSecondAttemptAMCotainerIsLaunchedSameNode()
+      throws Exception {
+    // add first attempt collector
+    auxService = createCollectorAndAddApplication();
+    // auxService should have a single app
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
+
+    // add second attempt collector before first attempt master container stop
+    ContainerInitializationContext containerInitalizationContext =
+        createContainerInitalizationContext(2);
+    auxService.initializeContainer(containerInitalizationContext);
+
+    assertTrue("Applicatin not found in collectors.",
+        auxService.hasApplication(appAttemptId.getApplicationId()));
+
+    // first attempt stop container
+    ContainerTerminationContext context = createContainerTerminationContext(1);
+    auxService.stopContainer(context);
+
+    // 1st attempt container stopped; collector should still hold the app id
+    assertTrue("collector has removed application though 2nd attempt"
+            + " is running this node",
+        auxService.hasApplication(appAttemptId.getApplicationId()));
+
+    // second attempt stop container
+    context = createContainerTerminationContext(2);
+    auxService.stopContainer(context);
+
+    // auxService should not have that app
+    assertFalse("Application is not removed from collector",
+        auxService.hasApplication(appAttemptId.getApplicationId()));
+    auxService.close();
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message