hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject hadoop git commit: HADOOP-14503. Make RollingAverages a mutable metric. Contributed by Hanisha Koneru.
Date Fri, 16 Jun 2017 23:41:31 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 182e98c70 -> 9529513f1


HADOOP-14503. Make RollingAverages a mutable metric. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9529513f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9529513f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9529513f

Branch: refs/heads/branch-2
Commit: 9529513f18c54deb5376f96ac91a13d2b681c8e6
Parents: 182e98c
Author: Arpit Agarwal <arp@apache.org>
Authored: Fri Jun 16 16:41:08 2017 -0700
Committer: Arpit Agarwal <arp@apache.org>
Committed: Fri Jun 16 16:41:08 2017 -0700

----------------------------------------------------------------------
 .../hadoop/metrics2/lib/MetricsRegistry.java    |   9 +
 .../metrics2/lib/MutableMetricsFactory.java     |   4 +
 .../metrics2/lib/MutableRollingAverages.java    | 282 ++++++++++++++++++
 .../hadoop/metrics2/lib/RollingAverages.java    | 288 -------------------
 .../hadoop/metrics2/lib/MetricsTestHelper.java  |  49 ++++
 .../lib/TestMutableRollingAverages.java         | 191 ++++++++++++
 .../metrics2/lib/TestRollingAverages.java       | 124 --------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  12 -
 .../hadoop/hdfs/server/datanode/DataNode.java   |   2 +-
 .../datanode/metrics/DataNodePeerMetrics.java   |  42 +--
 .../datanode/TestDataNodePeerMetrics.java       |  10 +-
 .../TestDataNodeOutlierDetectionViaMetrics.java |  18 +-
 .../hadoop/tools/TestHdfsConfigFields.java      |   4 -
 13 files changed, 568 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java
index 531231c..bdb607d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java
@@ -314,6 +314,15 @@ public class MetricsRegistry {
     return rates;
   }
 
+  public synchronized MutableRollingAverages newMutableRollingAverages(
+      String name, String valueName) {
+    checkMetricName(name);
+    MutableRollingAverages rollingAverages =
+        new MutableRollingAverages(valueName);
+    metricsMap.put(name, rollingAverages);
+    return rollingAverages;
+  }
+
   synchronized void add(String name, MutableMetric metric) {
     checkMetricName(name);
     metricsMap.put(name, metric);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java
index b5f1799..9e7810a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java
@@ -78,6 +78,10 @@ public class MutableMetricsFactory {
                               annotation.sampleName(), annotation.valueName(),
                               annotation.always());
     }
+    if (cls == MutableRollingAverages.class) {
+      return registry.newMutableRollingAverages(info.name(),
+          annotation.valueName());
+    }
     throw new MetricsException("Unsupported metric field "+ field.getName() +
                                " of type "+ field.getType().getName());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java
new file mode 100644
index 0000000..1713ee7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import javax.annotation.Nullable;
+
+import static org.apache.hadoop.metrics2.lib.Interns.*;
+
+/**
+ * <p>
+ * This class maintains a group of rolling average metrics. It implements the
+ * algorithm of rolling average, i.e. a number of sliding windows are kept to
+ * roll over and evict old subsets of samples. Each window has a subset of
+ * samples in a stream, where sub-sum and sub-total are collected. All sub-sums
+ * and sub-totals in all windows will be aggregated to final-sum and final-total
+ * used to compute final average, which is called rolling average.
+ * </p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MutableRollingAverages extends MutableMetric implements Closeable {
+
+  private MutableRatesWithAggregation innerMetrics =
+      new MutableRatesWithAggregation();
+
+  @VisibleForTesting
+  static final ScheduledExecutorService SCHEDULER = Executors
+      .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("MutableRollingAverages-%d").build());
+
+  private ScheduledFuture<?> scheduledTask = null;
+
+  @Nullable
+  private Map<String, MutableRate> currentSnapshot;
+
+  private final String avgInfoNameTemplate;
+  private final String avgInfoDescTemplate;
+  private int numWindows;
+
+  private static class SumAndCount {
+    private final double sum;
+    private final long count;
+
+    SumAndCount(final double sum, final long count) {
+      this.sum = sum;
+      this.count = count;
+    }
+
+    public double getSum() {
+      return sum;
+    }
+
+    public long getCount() {
+      return count;
+    }
+  }
+
+  /**
+   * <p>
+   * key: metric name
+   * </p>
+   * <p>
+   * value: deque where sub-sums and sub-totals for sliding windows are
+   * maintained.
+   * </p>
+   */
+  private ConcurrentMap<String, LinkedBlockingDeque<SumAndCount>> averages =
+      new ConcurrentHashMap<>();
+
+  private static final long WINDOW_SIZE_MS_DEFAULT = 300_000;
+  private static final int NUM_WINDOWS_DEFAULT = 36;
+
+  /**
+   * Constructor for {@link MutableRollingAverages}.
+   * @param metricValueName value name put in metric names; null means ""
+   */
+  public MutableRollingAverages(String metricValueName) {
+    if (metricValueName == null) {
+      metricValueName = "";
+    }
+    avgInfoNameTemplate = "[%s]" + "RollingAvg" +
+        StringUtils.capitalize(metricValueName);
+    avgInfoDescTemplate = "Rolling average " +
+        StringUtils.uncapitalize(metricValueName) +" for "+ "%s";
+    numWindows = NUM_WINDOWS_DEFAULT;
+    scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
+        WINDOW_SIZE_MS_DEFAULT, WINDOW_SIZE_MS_DEFAULT, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * This method is for testing only to replace the scheduledTask.
+   */
+  @VisibleForTesting
+  synchronized void replaceScheduledTask(int windows, long interval,
+                                         TimeUnit timeUnit) {
+    numWindows = windows;
+    scheduledTask.cancel(true);
+    scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
+        interval, interval, timeUnit);
+  }
+
+  @Override
+  public void snapshot(MetricsRecordBuilder builder, boolean all) {
+    if (all || changed()) {
+      for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
+          : averages.entrySet()) {
+        final String name = entry.getKey();
+        final MetricsInfo avgInfo = info(
+            String.format(avgInfoNameTemplate, StringUtils.capitalize(name)),
+            String.format(avgInfoDescTemplate, StringUtils.uncapitalize(name)));
+        double totalSum = 0;
+        long totalCount = 0;
+
+        for (final SumAndCount sumAndCount : entry.getValue()) {
+          totalCount += sumAndCount.getCount();
+          totalSum += sumAndCount.getSum();
+        }
+
+        if (totalCount != 0) {
+          builder.addGauge(avgInfo, totalSum / totalCount);
+        }
+      }
+      if (changed()) {
+        clearChanged();
+      }
+    }
+  }
+
+  /**
+   * Collects states maintained in {@link ThreadLocal}, if any.
+   */
+  public void collectThreadLocalStates() {
+    innerMetrics.collectThreadLocalStates();
+  }
+
+  /**
+   * Passes a new value for the named metric to the inner metrics.
+   *
+   * @param name name of metric
+   * @param value value of metric
+   */
+  public void add(final String name, final long value) {
+    innerMetrics.add(name, value);
+  }
+
+  private static class RatesRoller implements Runnable {
+    private final MutableRollingAverages parent;
+
+    RatesRoller(final MutableRollingAverages parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public void run() {
+      synchronized (parent) {
+        final MetricsCollectorImpl mc = new MetricsCollectorImpl();
+        final MetricsRecordBuilder rb = mc.addRecord("RatesRoller");
+        /**
+         * snapshot all metrics regardless of being changed or not, in case no
+         * ops since last snapshot, we will get 0.
+         */
+        parent.innerMetrics.snapshot(rb, true);
+        Preconditions.checkState(mc.getRecords().size() == 1,
+            "There must be only one record and it's named with 'RatesRoller'");
+
+        parent.currentSnapshot = parent.innerMetrics.getGlobalMetrics();
+        parent.rollOverAvgs();
+      }
+      parent.setChanged();
+    }
+  }
+
+  /**
+   * Iterates over snapshot to capture all Avg metrics into rolling structure
+   * {@link MutableRollingAverages#averages}.
+   * <p>
+   * This method is synchronized; RatesRoller calls it holding the same lock.
+   * </p>
+   */
+  private synchronized void rollOverAvgs() {
+    if (currentSnapshot == null) {
+      return;
+    }
+
+    for (Map.Entry<String, MutableRate> entry : currentSnapshot.entrySet()) {
+      final MutableRate rate = entry.getValue();
+
+      LinkedBlockingDeque<SumAndCount> deque = averages.get(entry.getKey());
+      if (deque == null) {
+        deque = new LinkedBlockingDeque<>(numWindows);
+        averages.put(entry.getKey(), deque);
+      }
+
+      final SumAndCount sumAndCount = new SumAndCount(
+          rate.lastStat().total(),
+          rate.lastStat().numSamples());
+      /* put newest sum and count to the end */
+      if (!deque.offerLast(sumAndCount)) {
+        deque.pollFirst();
+        deque.offerLast(sumAndCount);
+      }
+    }
+
+    setChanged();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (scheduledTask != null) {
+      scheduledTask.cancel(false);
+    }
+    scheduledTask = null;
+  }
+
+  /**
+   * Retrieve a map of metric name -> (aggregate).
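+   * Filter out entries that don't have more than minSamples samples.
+   * @param minSamples an entry is kept only if its sample count exceeds this.
+   * @return a map of metric name (e.g. peer DataNode Id) to the average
+   *         value (e.g. latency to that node) over the measurement period.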
+   */
+  public synchronized Map<String, Double> getStats(long minSamples) {
+    final Map<String, Double> stats = new HashMap<>();
+
+    for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
+        : averages.entrySet()) {
+      final String name = entry.getKey();
+      double totalSum = 0;
+      long totalCount = 0;
+
+      for (final SumAndCount sumAndCount : entry.getValue()) {
+        totalCount += sumAndCount.getCount();
+        totalSum += sumAndCount.getSum();
+      }
+
+      if (totalCount > minSamples) {
+        stats.put(name, totalSum / totalCount);
+      }
+    }
+    return stats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
deleted file mode 100644
index aae6b5d..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.lib;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.metrics2.MetricsInfo;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import javax.annotation.Nullable;
-
-import static org.apache.hadoop.metrics2.lib.Interns.*;
-
-/**
- * <p>
- * This class maintains a group of rolling average metrics. It implements the
- * algorithm of rolling average, i.e. a number of sliding windows are kept to
- * roll over and evict old subsets of samples. Each window has a subset of
- * samples in a stream, where sub-sum and sub-total are collected. All sub-sums
- * and sub-totals in all windows will be aggregated to final-sum and final-total
- * used to compute final average, which is called rolling average.
- * </p>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class RollingAverages extends MutableMetric implements Closeable {
-
-  private final MutableRatesWithAggregation innerMetrics =
-      new MutableRatesWithAggregation();
-
-  private static final ScheduledExecutorService SCHEDULER = Executors
-      .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
-          .setNameFormat("RollingAverages-%d").build());
-
-  private ScheduledFuture<?> scheduledTask = null;
-
-  @Nullable
-  private Map<String, MutableRate> currentSnapshot;
-
-  private final int numWindows;
-  private final String avgInfoNameTemplate;
-  private final String avgInfoDescTemplate;
-
-  private static class SumAndCount {
-    private final double sum;
-    private final long count;
-
-    public SumAndCount(final double sum, final long count) {
-      this.sum = sum;
-      this.count = count;
-    }
-
-    public double getSum() {
-      return sum;
-    }
-
-    public long getCount() {
-      return count;
-    }
-  }
-
-  /**
-   * <p>
-   * key: metric name
-   * </p>
-   * <p>
-   * value: deque where sub-sums and sub-totals for sliding windows are
-   * maintained.
-   * </p>
-   */
-  private ConcurrentMap<String, LinkedBlockingDeque<SumAndCount>> averages =
-      new ConcurrentHashMap<>();
-
-  /**
-   * Constructor of {@link RollingAverages}.
-   * @param windowSizeMs
-   *          The number of milliseconds of each window for which subset
-   *          of samples are gathered to compute the rolling average, A.K.A.
-   *          roll over interval.
-   * @param numWindows
-   *          The number of windows maintained to compute the rolling average.
-   * @param valueName
-   *          of the metric (e.g. "Time", "Latency")
-   */
-  public RollingAverages(
-      final long windowSizeMs,
-      final int numWindows,
-      final String valueName) {
-    String uvName = StringUtils.capitalize(valueName);
-    String lvName = StringUtils.uncapitalize(valueName);
-    avgInfoNameTemplate = "[%s]" + "RollingAvg"+ uvName;
-    avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s";
-    this.numWindows = numWindows;
-    scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
-        windowSizeMs, windowSizeMs, TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Constructor of {@link RollingAverages}.
-   * @param windowSizeMs
-   *          The number of seconds of each window for which sub set of samples
-   *          are gathered to compute rolling average, also A.K.A roll over
-   *          interval.
-   * @param numWindows
-   *          The number of windows maintained in the same time to compute the
-   *          average of the rolling averages.
-   */
-  public RollingAverages(
-      final long windowSizeMs,
-      final int numWindows) {
-    this(windowSizeMs, numWindows, "Time");
-  }
-
-  @Override
-  public void snapshot(MetricsRecordBuilder builder, boolean all) {
-    if (all || changed()) {
-      for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
-          : averages.entrySet()) {
-        final String name = entry.getKey();
-        final MetricsInfo avgInfo = info(
-            String.format(avgInfoNameTemplate, StringUtils.capitalize(name)),
-            String.format(avgInfoDescTemplate, StringUtils.uncapitalize(name)));
-        double totalSum = 0;
-        long totalCount = 0;
-
-        for (final SumAndCount sumAndCount : entry.getValue()) {
-          totalCount += sumAndCount.getCount();
-          totalSum += sumAndCount.getSum();
-        }
-
-        if (totalCount != 0) {
-          builder.addGauge(avgInfo, totalSum / totalCount);
-        }
-      }
-      if (changed()) {
-        clearChanged();
-      }
-    }
-  }
-
-  /**
-   * Collects states maintained in {@link ThreadLocal}, if any.
-   */
-  public void collectThreadLocalStates() {
-    innerMetrics.collectThreadLocalStates();
-  }
-
-  /**
-   * @param name
-   *          name of metric
-   * @param value
-   *          value of metric
-   */
-  public void add(final String name, final long value) {
-    innerMetrics.add(name, value);
-  }
-
-  private static class RatesRoller implements Runnable {
-    private final RollingAverages parent;
-
-    public RatesRoller(final RollingAverages parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    public void run() {
-      synchronized (parent) {
-        final MetricsCollectorImpl mc = new MetricsCollectorImpl();
-        final MetricsRecordBuilder rb = mc.addRecord("RatesRoller");
-        /**
-         * snapshot all metrics regardless of being changed or not, in case no
-         * ops since last snapshot, we will get 0.
-         */
-        parent.innerMetrics.snapshot(rb, true);
-        Preconditions.checkState(mc.getRecords().size() == 1,
-            "There must be only one record and it's named with 'RatesRoller'");
-
-        parent.currentSnapshot = parent.innerMetrics.getGlobalMetrics();
-        parent.rollOverAvgs();
-      }
-      parent.setChanged();
-    }
-  }
-
-  /**
-   * Iterates over snapshot to capture all Avg metrics into rolling structure
-   * {@link RollingAverages#averages}.
-   * <p>
-   * This function is not thread safe, callers should ensure thread safety.
-   * </p>
-   */
-  private synchronized void rollOverAvgs() {
-    if (currentSnapshot == null) {
-      return;
-    }
-
-    for (Map.Entry<String, MutableRate> entry : currentSnapshot.entrySet()) {
-      final MutableRate rate = entry.getValue();
-
-      LinkedBlockingDeque<SumAndCount> deque = averages.get(entry.getKey());
-      if (deque == null) {
-        deque = new LinkedBlockingDeque<>(numWindows);
-        averages.put(entry.getKey(), deque);
-      }
-
-      final SumAndCount sumAndCount = new SumAndCount(
-          rate.lastStat().total(),
-          rate.lastStat().numSamples());
-      /* put newest sum and count to the end */
-      if (!deque.offerLast(sumAndCount)) {
-        deque.pollFirst();
-        deque.offerLast(sumAndCount);
-      }
-    }
-
-    setChanged();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (scheduledTask != null) {
-      scheduledTask.cancel(false);
-    }
-    scheduledTask = null;
-  }
-
-  /**
-   * Retrieve a map of metric name -> (aggregate).
-   * Filter out entries that don't have at least minSamples.
-   *
-   * @return a map of peer DataNode Id to the average latency to that
-   *         node seen over the measurement period.
-   */
-  public synchronized Map<String, Double> getStats(long minSamples) {
-    final Map<String, Double> stats = new HashMap<>();
-
-    for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
-        : averages.entrySet()) {
-      final String name = entry.getKey();
-      double totalSum = 0;
-      long totalCount = 0;
-
-      for (final SumAndCount sumAndCount : entry.getValue()) {
-        totalCount += sumAndCount.getCount();
-        totalSum += sumAndCount.getSum();
-      }
-
-      if (totalCount > minSamples) {
-        stats.put(name, totalSum / totalCount);
-      }
-    }
-    return stats;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/MetricsTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/MetricsTestHelper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/MetricsTestHelper.java
new file mode 100644
index 0000000..d3e06c6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/MetricsTestHelper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A helper class that can provide test cases access to package-private
+ * methods.
+ */
+public final class MetricsTestHelper {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MetricsTestHelper.class);
+
+  private MetricsTestHelper() {
+    //not called
+  }
+
+  /**
+   * Replace the rolling averages windows for a
+   * {@link MutableRollingAverages} metric.
+   *
+   */
+  public static void replaceRollingAveragesScheduler(
+      MutableRollingAverages mutableRollingAverages,
+      int numWindows, long interval, TimeUnit timeUnit) {
+    mutableRollingAverages.replaceScheduledTask(
+        numWindows, interval, timeUnit);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableRollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableRollingAverages.java
new file mode 100644
index 0000000..2ff4a4e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableRollingAverages.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.lib;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.mockito.Matchers.anyDouble;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * This class tests various cases of the algorithms implemented in
+ * {@link MutableRollingAverages}.
+ */
+public class TestMutableRollingAverages {
+
+  /**
+   * Tests if the results are correct if no samples are inserted, dry run of
+   * empty roll over.
+   */
+  @Test(timeout = 30000)
+  public void testRollingAveragesEmptyRollover() throws Exception {
+    final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    /* 5s interval and 2 windows */
+    try (MutableRollingAverages rollingAverages =
+             new MutableRollingAverages("Time")) {
+      rollingAverages.replaceScheduledTask(2, 5, TimeUnit.SECONDS);
+      /* Check it initially */
+      rollingAverages.snapshot(rb, true);
+      verify(rb, never()).addGauge(
+          info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
+      verify(rb, never()).addGauge(
+          info("BarAvgTime", "Rolling average time for bar"), (long) 0);
+
+      /* sleep 6s, longer than the 5s interval, so the rollover can finish */
+      Thread.sleep(6000);
+      rollingAverages.snapshot(rb, false);
+      verify(rb, never()).addGauge(
+          info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
+      verify(rb, never()).addGauge(
+          info("BarAvgTime", "Rolling average time for bar"), (long) 0);
+    }
+  }
+
+  /**
+   * Tests the case:
+   * <p>
+   * 5s interval and 2 sliding windows
+   * </p>
+   * <p>
+   * sample stream: 1000 times 1, 2, and 3, respectively, e.g. [1, 1...1], [2,
+   * 2...2] and [3, 3...3]
+   * </p>
+   */
+  @Test(timeout = 30000)
+  public void testRollingAveragesRollover() throws Exception {
+    final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    final String name = "foo2";
+    final int windowSizeMs = 5000; // 5s roll over interval
+    final int numOpsPerIteration = 1000;
+    try (MutableRollingAverages rollingAverages =
+             new MutableRollingAverages("Time")) {
+      rollingAverages.replaceScheduledTask(2, 5000, TimeUnit.MILLISECONDS);
+      /* Push values for three intervals */
+      final long start = Time.monotonicNow();
+      for (int i = 1; i <= 3; i++) {
+        /* insert value */
+        for (long j = 1; j <= numOpsPerIteration; j++) {
+          rollingAverages.add(name, i);
+        }
+
+        /**
+         * Sleep until 1s after the next windowSize seconds interval, to let the
+         * metrics roll over
+         */
+        final long sleep = (start + (windowSizeMs * i) + 1000)
+            - Time.monotonicNow();
+        Thread.sleep(sleep);
+
+        /* Verify that the window reset, check it has the values we pushed in */
+        rollingAverages.snapshot(rb, false);
+
+        /*
+         * Window #1 holds the value 1 repeated 1000 times, e.g. [1, 1...1];
+         * likewise window #2 holds [2, 2...2] and window #3 holds
+         * [3, 3...3].
+         */
+        final double rollingSum = numOpsPerIteration * (i > 1 ? (i - 1) : 0)
+            + numOpsPerIteration * i;
+        /* one empty window or all 2 windows full */
+        final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
+            : numOpsPerIteration;
+        verify(rb).addGauge(
+            info("[Foo2]RollingAvgTime", "Rolling average time for foo2"),
+            rollingSum / rollingTotal);
+
+        /* Verify the metrics were added the right number of times */
+        verify(rb, times(i)).addGauge(
+            eq(info("[Foo2]RollingAvgTime", "Rolling average time for foo2")),
+            anyDouble());
+      }
+    }
+  }
+
+  /**
+   * Test that MutableRollingAverages gives expected results after
+   * initialization.
+   * @throws Exception
+   */
+  @Test(timeout = 30000)
+  public void testMutableRollingAveragesMetric() throws Exception {
+    final DummyTestMetric testMetric = new DummyTestMetric();
+    testMetric.create();
+
+    testMetric.add("metric1", 100);
+    testMetric.add("metric1", 900);
+    testMetric.add("metric2", 1000);
+    testMetric.add("metric2", 1000);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        testMetric.collectThreadLocalStates();
+        return testMetric.getStats().size() > 0;
+      }
+    }, 500, 5000);
+
+    MetricsRecordBuilder rb = getMetrics(DummyTestMetric.METRIC_NAME);
+
+    double metric1Avg = getDoubleGauge("[Metric1]RollingAvgTesting", rb);
+    double metric2Avg = getDoubleGauge("[Metric2]RollingAvgTesting", rb);
+    Assert.assertTrue("The rolling average of metric1 is not as expected",
+        metric1Avg == 500.0);
+    Assert.assertTrue("The rolling average of metric2 is not as expected",
+        metric2Avg == 1000.0);
+
+  }
+
+  class DummyTestMetric {
+    @Metric (valueName = "testing")
+    private MutableRollingAverages rollingAverages;
+
+    static final String METRIC_NAME = "RollingAveragesTestMetric";
+
+    protected void create() {
+      DefaultMetricsSystem.instance().register(METRIC_NAME,
+          "mutable rolling averages test", this);
+      rollingAverages.replaceScheduledTask(10, 1000, TimeUnit.MILLISECONDS);
+    }
+
+    void add(String name, long latency) {
+      rollingAverages.add(name, latency);
+    }
+
+    void collectThreadLocalStates() {
+      rollingAverages.collectThreadLocalStates();
+    }
+
+    Map<String, Double> getStats() {
+      return rollingAverages.getStats(0);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
deleted file mode 100644
index 44202e7..0000000
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.lib;
-
-import static org.apache.hadoop.metrics2.lib.Interns.info;
-import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Matchers.anyDouble;
-import static org.mockito.Matchers.eq;
-
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.util.Time;
-import org.junit.Test;
-
-/**
- * This class tests various cases of the algorithms implemented in
- * {@link RollingAverages}.
- */
-public class TestRollingAverages {
-  /**
-   * Tests if the results are correct if no samples are inserted, dry run of
-   * empty roll over.
-   */
-  @Test(timeout = 30000)
-  public void testRollingAveragesEmptyRollover() throws Exception {
-    final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
-    /* 5s interval and 2 windows */
-    try (final RollingAverages rollingAverages =
-             new RollingAverages(5000, 2)) {
-      /* Check it initially */
-      rollingAverages.snapshot(rb, true);
-      verify(rb, never()).addGauge(
-          info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
-      verify(rb, never()).addGauge(
-          info("BarAvgTime", "Rolling average time for bar"), (long) 0);
-
-      /* sleep 6s longer than 5s interval to wait for rollover done */
-      Thread.sleep(6000);
-      rollingAverages.snapshot(rb, false);
-      verify(rb, never()).addGauge(
-          info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
-      verify(rb, never()).addGauge(
-          info("BarAvgTime", "Rolling average time for bar"), (long) 0);
-    }
-  }
-
-  /**
-   * Tests the case:
-   * <p>
-   * 5s interval and 2 sliding windows
-   * </p>
-   * <p>
-   * sample stream: 1000 times 1, 2, and 3, respectively, e.g. [1, 1...1], [2,
-   * 2...2] and [3, 3...3]
-   * </p>
-   */
-  @Test(timeout = 30000)
-  public void testRollingAveragesRollover() throws Exception {
-    final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
-    final String name = "foo2";
-    final int windowSizeMs = 5000; // 5s roll over interval
-    final int numWindows = 2;
-    final int numOpsPerIteration = 1000;
-    try (RollingAverages rollingAverages = new RollingAverages(windowSizeMs,
-        numWindows)) {
-
-      /* Push values for three intervals */
-      final long start = Time.monotonicNow();
-      for (int i = 1; i <= 3; i++) {
-        /* insert value */
-        for (long j = 1; j <= numOpsPerIteration; j++) {
-          rollingAverages.add(name, i);
-        }
-
-        /**
-         * Sleep until 1s after the next windowSize seconds interval, to let the
-         * metrics roll over
-         */
-        final long sleep = (start + (windowSizeMs * i) + 1000)
-            - Time.monotonicNow();
-        Thread.sleep(sleep);
-
-        /* Verify that the window reset, check it has the values we pushed in */
-        rollingAverages.snapshot(rb, false);
-
-        /*
-         * #1 window with a series of 1 1000
-         * times, e.g. [1, 1...1], similarly, #2 window, e.g. [2, 2...2],
-         * #3 window, e.g. [3, 3...3]
-         */
-        final double rollingSum = numOpsPerIteration * (i > 1 ? (i - 1) : 0)
-            + numOpsPerIteration * i;
-        /* one empty window or all 2 windows full */
-        final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
-            : numOpsPerIteration;
-        verify(rb).addGauge(
-            info("[Foo2]RollingAvgTime", "Rolling average time for foo2"),
-            rollingSum / rollingTotal);
-
-        /* Verify the metrics were added the right number of times */
-        verify(rb, times(i)).addGauge(
-            eq(info("[Foo2]RollingAvgTime", "Rolling average time for foo2")),
-            anyDouble());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9da88d3..219c713 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -434,18 +434,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
   public static final String  DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
 
-  // The following setting is not meant to be changed by administrators.
-  public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY =
-      "dfs.metrics.rolling.averages.window.length";
-  public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT =
-      300 * 1000;
-
-  // The following setting is not meant to be changed by administrators.
-  public static final String DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY =
-      "dfs.metrics.rolling.average.num.windows";
-  public static final int DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT =
-      36;
-
   public static final String  DFS_DATANODE_PEER_STATS_ENABLED_KEY =
       "dfs.datanode.peer.stats.enabled";
   public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 320a25e..9c96c00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1383,7 +1383,7 @@ public class DataNode extends ReconfigurableBase
 
     metrics = DataNodeMetrics.create(getConf(), getDisplayName());
     peerMetrics = dnConf.peerStatsEnabled ?
-        DataNodePeerMetrics.create(getConf(), getDisplayName()) : null;
+        DataNodePeerMetrics.create(getDisplayName()) : null;
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
     blockRecoveryWorker = new BlockRecoveryWorker(this);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
index 827bdd2..54b9559 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -22,16 +22,13 @@ package org.apache.hadoop.hdfs.server.datanode.metrics;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.metrics2.MetricsJsonBuilder;
-import org.apache.hadoop.metrics2.lib.RollingAverages;
+import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 
 /**
  * This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
@@ -44,7 +41,7 @@ public class DataNodePeerMetrics {
   public static final Logger LOG = LoggerFactory.getLogger(
       DataNodePeerMetrics.class);
 
-  private final RollingAverages sendPacketDownstreamRollingAvgerages;
+  private final MutableRollingAverages sendPacketDownstreamRollingAverages;
 
   private final String name;
 
@@ -64,15 +61,11 @@ public class DataNodePeerMetrics {
   @VisibleForTesting
   static final long MIN_OUTLIER_DETECTION_SAMPLES = 1000;
 
-  public DataNodePeerMetrics(
-      final String name,
-      final long windowSizeMs,
-      final int numWindows) {
+  public DataNodePeerMetrics(final String name) {
     this.name = name;
     this.slowNodeDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_NODES,
         LOW_THRESHOLD_MS);
-    sendPacketDownstreamRollingAvgerages = new RollingAverages(
-        windowSizeMs, numWindows);
+    sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
   }
 
   public String name() {
@@ -82,23 +75,12 @@ public class DataNodePeerMetrics {
   /**
    * Creates an instance of DataNodePeerMetrics, used for registration.
    */
-  public static DataNodePeerMetrics create(Configuration conf, String dnName) {
+  public static DataNodePeerMetrics create(String dnName) {
     final String name = "DataNodePeerActivity-" + (dnName.isEmpty()
         ? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
         : dnName.replace(':', '-'));
 
-    final long windowSizeMs = conf.getTimeDuration(
-            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
-            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    final int numWindows = conf.getInt(
-        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
-        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT);
-
-    return new DataNodePeerMetrics(
-        name,
-        windowSizeMs,
-        numWindows);
+    return new DataNodePeerMetrics(name);
   }
 
   /**
@@ -112,7 +94,7 @@ public class DataNodePeerMetrics {
   public void addSendPacketDownstream(
       final String peerAddr,
       final long elapsedMs) {
-    sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
+    sendPacketDownstreamRollingAverages.add(peerAddr, elapsedMs);
   }
 
   /**
@@ -120,7 +102,7 @@ public class DataNodePeerMetrics {
    */
   public String dumpSendPacketDownstreamAvgInfoAsJson() {
     final MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
-    sendPacketDownstreamRollingAvgerages.snapshot(builder, true);
+    sendPacketDownstreamRollingAverages.snapshot(builder, true);
     return builder.toString();
   }
 
@@ -128,7 +110,7 @@ public class DataNodePeerMetrics {
    * Collects states maintained in {@link ThreadLocal}, if any.
    */
   public void collectThreadLocalStates() {
-    sendPacketDownstreamRollingAvgerages.collectThreadLocalStates();
+    sendPacketDownstreamRollingAverages.collectThreadLocalStates();
   }
 
   /**
@@ -139,10 +121,14 @@ public class DataNodePeerMetrics {
     // This maps the metric name to the aggregate latency.
     // The metric name is the datanode ID.
     final Map<String, Double> stats =
-        sendPacketDownstreamRollingAvgerages.getStats(
+        sendPacketDownstreamRollingAverages.getStats(
             MIN_OUTLIER_DETECTION_SAMPLES);
     LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
 
     return slowNodeDetector.getOutliers(stats);
   }
+
+  public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
+    return sendPacketDownstreamRollingAverages;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
index b18ff2a..44601a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
+import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
@@ -42,16 +43,13 @@ public class TestDataNodePeerMetrics {
     final int numOpsPerIteration = 1000;
 
     final Configuration conf = new HdfsConfiguration();
-    conf.setTimeDuration(
-        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
-        windowSize, TimeUnit.SECONDS);
-    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
-        numWindows);
     conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
 
     final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
-        conf,
         "Sample-DataNode");
+    MetricsTestHelper.replaceRollingAveragesScheduler(
+        peerMetrics.getSendPacketDownstreamRollingAverages(),
+        numWindows, windowSize, TimeUnit.SECONDS);
     final long start = Time.monotonicNow();
     for (int i = 1; i <= iterations; i++) {
       final String peerAddr = genPeerAddress();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
index 85ac45b..75e09f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.metrics;
 
 import com.google.common.base.Supplier;
+import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Before;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -73,8 +75,12 @@ public class TestDataNodeOutlierDetectionViaMetrics {
     final String slowNodeName = "SlowNode";
 
     final DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
-        "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
-        ROLLING_AVERAGE_WINDOWS);
+        "PeerMetrics-For-Test");
+
+    MetricsTestHelper.replaceRollingAveragesScheduler(
+        peerMetrics.getSendPacketDownstreamRollingAverages(),
+        ROLLING_AVERAGE_WINDOWS,
+        WINDOW_INTERVAL_SECONDS, TimeUnit.SECONDS);
 
     injectFastNodesSamples(peerMetrics);
     injectSlowNodeSamples(peerMetrics, slowNodeName);
@@ -101,8 +107,12 @@ public class TestDataNodeOutlierDetectionViaMetrics {
   @Test
   public void testWithNoOutliers() throws Exception {
     DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
-        "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
-        ROLLING_AVERAGE_WINDOWS);
+        "PeerMetrics-For-Test");
+
+    MetricsTestHelper.replaceRollingAveragesScheduler(
+        peerMetrics.getSendPacketDownstreamRollingAverages(),
+        ROLLING_AVERAGE_WINDOWS,
+        WINDOW_INTERVAL_SECONDS, TimeUnit.SECONDS);
 
     injectFastNodesSamples(peerMetrics);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9529513f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index ec54cec..347f130 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -68,10 +68,6 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
     // Purposely hidden, based on comments in DFSConfigKeys
     configurationPropsToSkipCompare
         .add(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY);
-    configurationPropsToSkipCompare
-        .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY);
-    configurationPropsToSkipCompare
-        .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY);
 
     // Fully deprecated properties?
     configurationPropsToSkipCompare


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message