hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject hadoop git commit: YARN-3619. ContainerMetrics unregisters during getMetrics and leads to ConcurrentModificationException. Contributed by Zhihai Xu (cherry picked from commit fdf02d1f26cea372bf69e071f57b8bfc09c092c4)
Date Fri, 02 Oct 2015 20:24:58 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 b5d08e29e -> 3e3733437


YARN-3619. ContainerMetrics unregisters during getMetrics and leads to ConcurrentModificationException.
Contributed by Zhihai Xu
(cherry picked from commit fdf02d1f26cea372bf69e071f57b8bfc09c092c4)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3e373343
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3e373343
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3e373343

Branch: refs/heads/branch-2
Commit: 3e3733437fbfa94800e2cb2d77a2b5592b9a8add
Parents: b5d08e2
Author: Jason Lowe <jlowe@apache.org>
Authored: Fri Oct 2 20:09:13 2015 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Fri Oct 2 20:21:32 2015 +0000

----------------------------------------------------------------------
 .../hadoop/metrics2/impl/MetricsSystemImpl.java |  3 +-
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../hadoop/yarn/conf/YarnConfiguration.java     | 10 ++++-
 .../src/main/resources/yarn-default.xml         |  8 ++++
 .../monitor/ContainerMetrics.java               | 38 +++++++++++++----
 .../monitor/ContainersMonitorImpl.java          | 16 +++++--
 .../monitor/TestContainerMetrics.java           | 45 +++++++++++++++++++-
 7 files changed, 107 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e373343/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
index 02cec36..866c846 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
@@ -395,7 +395,8 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
    * Sample all the sources for a snapshot of metrics/tags
    * @return  the metrics buffer containing the snapshot
    */
-  synchronized MetricsBuffer sampleMetrics() {
+  @VisibleForTesting
+  public synchronized MetricsBuffer sampleMetrics() {
     collector.clear();
     MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e373343/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a2ab598..0836c88 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -956,6 +956,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3727. For better error recovery, check if the directory exists before
     using it for localization. (Zhihai Xu via jlowe)
 
+    YARN-3619. ContainerMetrics unregisters during getMetrics and leads to
+    ConcurrentModificationException (Zhihai Xu via jlowe)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e373343/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 128a286..848907f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1008,7 +1008,15 @@ public class YarnConfiguration extends Configuration {
       NM_PREFIX + "container-metrics.period-ms";
   @Private
   public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
-  
+
+  /** Delay in ms before unregistering container metrics after completion. */
+  @Private
+  public static final String NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
+      NM_PREFIX + "container-metrics.unregister-delay-ms";
+  @Private
+  public static final int DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
+      10000;
+
   /** Prefix for all node manager disk health checker configs. */
   private static final String NM_DISK_HEALTH_CHECK_PREFIX =
       "yarn.nodemanager.disk-health-checker.";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e373343/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index bcd64c3..c8bca8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1573,6 +1573,14 @@
 
   <property>
     <description>
+    Delay in ms before unregistering container metrics after completion.
+    </description>
+    <name>yarn.nodemanager.container-metrics.unregister-delay-ms</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <description>
     Class used to calculate current container resource utilization.
     </description>
     <name>yarn.nodemanager.container-monitor.process-tree.class</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e373343/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
index c364143..48128c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java
@@ -100,6 +100,7 @@ public class ContainerMetrics implements MetricsSource {
   private boolean flushOnPeriod = false; // true if period elapsed
   private boolean finished = false; // true if container finished
   private boolean unregister = false; // unregister
+  private long unregisterDelayMs;
   private Timer timer; // lazily initialized
 
   /**
@@ -107,15 +108,21 @@ public class ContainerMetrics implements MetricsSource {
    */
   protected final static Map<ContainerId, ContainerMetrics>
       usageMetrics = new HashMap<>();
+  // Create a timer to unregister container metrics,
+  // whose associated thread runs as a daemon.
+  private final static Timer unregisterContainerMetricsTimer =
+      new Timer("Container metrics unregistration", true);
 
   ContainerMetrics(
-      MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+      MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
+      long delayMs) {
     this.recordInfo =
         info(sourceName(containerId), RECORD_INFO.description());
     this.registry = new MetricsRegistry(recordInfo);
     this.metricsSystem = ms;
     this.containerId = containerId;
     this.flushPeriodMs = flushPeriodMs;
+    this.unregisterDelayMs = delayMs < 0 ? 0 : delayMs;
     scheduleTimerTaskIfRequired();
 
     this.pMemMBsStat = registry.newStat(
@@ -148,17 +155,18 @@ public class ContainerMetrics implements MetricsSource {
   }
 
   public static ContainerMetrics forContainer(
-      ContainerId containerId, long flushPeriodMs) {
+      ContainerId containerId, long flushPeriodMs, long delayMs) {
     return forContainer(
-        DefaultMetricsSystem.instance(), containerId, flushPeriodMs);
+        DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs);
   }
 
   synchronized static ContainerMetrics forContainer(
-      MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
+      MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
+      long delayMs) {
     ContainerMetrics metrics = usageMetrics.get(containerId);
     if (metrics == null) {
-      metrics = new ContainerMetrics(
-          ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId);
+      metrics = new ContainerMetrics(ms, containerId, flushPeriodMs,
+          delayMs).tag(RECORD_INFO, containerId);
 
       // Register with the MetricsSystems
       if (ms != null) {
@@ -172,12 +180,15 @@ public class ContainerMetrics implements MetricsSource {
     return metrics;
   }
 
+  synchronized static void unregisterContainerMetrics(ContainerMetrics cm) {
+    cm.metricsSystem.unregisterSource(cm.recordInfo.name());
+    usageMetrics.remove(cm.containerId);
+  }
+
   @Override
   public synchronized void getMetrics(MetricsCollector collector, boolean all) {
     //Container goes through registered -> finished -> unregistered.
     if (unregister) {
-      metricsSystem.unregisterSource(recordInfo.name());
-      usageMetrics.remove(containerId);
       return;
     }
 
@@ -199,6 +210,7 @@ public class ContainerMetrics implements MetricsSource {
       timer.cancel();
       timer = null;
     }
+    scheduleTimerTaskForUnregistration();
   }
 
   public void recordMemoryUsage(int memoryMBs) {
@@ -252,4 +264,14 @@ public class ContainerMetrics implements MetricsSource {
       timer.schedule(timerTask, flushPeriodMs);
     }
   }
+
+  private void scheduleTimerTaskForUnregistration() {
+    TimerTask timerTask = new TimerTask() {
+      @Override
+      public void run() {
+        ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this);
+      }
+    };
+    unregisterContainerMetricsTimer.schedule(timerTask, unregisterDelayMs);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e373343/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index b3839d2..82ad53e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -55,6 +55,7 @@ public class ContainersMonitorImpl extends AbstractService implements
   private MonitoringThread monitoringThread;
   private boolean containerMetricsEnabled;
   private long containerMetricsPeriodMs;
+  private long containerMetricsUnregisterDelayMs;
 
   @VisibleForTesting
   final Map<ContainerId, ProcessTreeInfo> trackingContainers =
@@ -126,6 +127,9 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.containerMetricsPeriodMs =
         conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
             YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
+    this.containerMetricsUnregisterDelayMs = conf.getLong(
+        YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
+        YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
 
     long configuredPMemForContainers =
         NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L;
@@ -425,7 +429,8 @@ public class ContainersMonitorImpl extends AbstractService implements
 
                 if (containerMetricsEnabled) {
                   ContainerMetrics usageMetrics = ContainerMetrics
-                      .forContainer(containerId, containerMetricsPeriodMs);
+                      .forContainer(containerId, containerMetricsPeriodMs,
+                      containerMetricsUnregisterDelayMs);
                   usageMetrics.recordProcessId(pId);
                 }
               }
@@ -476,10 +481,12 @@ public class ContainersMonitorImpl extends AbstractService implements
             // Add usage to container metrics
             if (containerMetricsEnabled) {
               ContainerMetrics.forContainer(
-                  containerId, containerMetricsPeriodMs).recordMemoryUsage(
+                  containerId, containerMetricsPeriodMs,
+                  containerMetricsUnregisterDelayMs).recordMemoryUsage(
                   (int) (currentPmemUsage >> 20));
               ContainerMetrics.forContainer(
-                  containerId, containerMetricsPeriodMs).recordCpuUsage
+                  containerId, containerMetricsPeriodMs,
+                  containerMetricsUnregisterDelayMs).recordCpuUsage
                   ((int)cpuUsagePercentPerCore, milliVcoresUsed);
             }
 
@@ -609,7 +616,8 @@ public class ContainersMonitorImpl extends AbstractService implements
 
     ContainerId containerId = monitoringEvent.getContainerId();
     ContainerMetrics usageMetrics = ContainerMetrics
-        .forContainer(containerId, containerMetricsPeriodMs);
+        .forContainer(containerId, containerMetricsPeriodMs,
+        containerMetricsUnregisterDelayMs);
 
     int vmemLimitMBs;
     int pmemLimitMBs;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e373343/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
index bdf9994..2beb927 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java
@@ -22,11 +22,15 @@ import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import org.apache.hadoop.metrics2.impl.MetricsRecords;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -44,7 +48,8 @@ public class TestContainerMetrics {
 
     MetricsCollectorImpl collector = new MetricsCollectorImpl();
     ContainerId containerId = mock(ContainerId.class);
-    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
+    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
+        100, 1);
 
     metrics.recordMemoryUsage(1024);
     metrics.getMetrics(collector, true);
@@ -82,7 +87,8 @@ public class TestContainerMetrics {
 
     MetricsCollectorImpl collector = new MetricsCollectorImpl();
     ContainerId containerId = mock(ContainerId.class);
-    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
+    ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
+        100, 1);
 
     int anyPmemLimit = 1024;
     int anyVmemLimit = 2048;
@@ -117,4 +123,39 @@ public class TestContainerMetrics {
 
     collector.clear();
   }
+
+  @Test
+  public void testContainerMetricsFinished() throws InterruptedException {
+    MetricsSystemImpl system = new MetricsSystemImpl();
+    system.init("test");
+    MetricsCollectorImpl collector = new MetricsCollectorImpl();
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 4);
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+    ContainerMetrics metrics1 = ContainerMetrics.forContainer(system,
+        containerId1, 1, 0);
+    ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2);
+    ContainerMetrics metrics2 = ContainerMetrics.forContainer(system,
+        containerId2, 1, 0);
+    ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
+    ContainerMetrics metrics3 = ContainerMetrics.forContainer(system,
+        containerId3, 1, 0);
+    metrics1.finished();
+    metrics2.finished();
+    system.sampleMetrics();
+    system.sampleMetrics();
+    Thread.sleep(100);
+    system.stop();
+    // verify metrics1 is unregistered
+    assertTrue(metrics1 != ContainerMetrics.forContainer(
+        system, containerId1, 1, 0));
+    // verify metrics2 is unregistered
+    assertTrue(metrics2 != ContainerMetrics.forContainer(
+        system, containerId2, 1, 0));
+    // verify metrics3 is still registered
+    assertTrue(metrics3 == ContainerMetrics.forContainer(
+        system, containerId3, 1, 0));
+    system.shutdown();
+  }
 }


Mime
View raw message