hadoop-common-commits mailing list archives

From fab...@apache.org
Subject [20/50] [abbrv] hadoop git commit: HDFS-11194. Maintain aggregated peer performance metrics on NameNode.
Date Tue, 31 Jan 2017 01:57:32 GMT
HDFS-11194. Maintain aggregated peer performance metrics on NameNode.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b57368b6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b57368b6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b57368b6

Branch: refs/heads/HADOOP-13345
Commit: b57368b6f893cb27d77fc9425e116f1312f4790f
Parents: 8528d85
Author: Arpit Agarwal <arp@apache.org>
Authored: Tue Jan 24 16:58:20 2017 -0800
Committer: Arpit Agarwal <arp@apache.org>
Committed: Tue Jan 24 16:58:20 2017 -0800

----------------------------------------------------------------------
 .../hadoop/metrics2/lib/RollingAverages.java    |  57 +++-
 .../metrics2/lib/TestRollingAverages.java       |  13 +-
 .../hdfs/server/protocol/SlowPeerReports.java   | 107 ++++++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  26 +-
 .../DatanodeProtocolClientSideTranslatorPB.java |   9 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   3 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  44 +++
 .../server/blockmanagement/DatanodeManager.java |  42 ++-
 .../server/blockmanagement/SlowPeerTracker.java | 273 +++++++++++++++
 .../hdfs/server/datanode/BPServiceActor.java    |  38 ++-
 .../hdfs/server/datanode/BlockReceiver.java     |  17 +-
 .../hadoop/hdfs/server/datanode/DNConf.java     |  11 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |   8 +-
 .../hdfs/server/datanode/DataXceiver.java       |   4 +-
 .../datanode/metrics/DataNodePeerMetrics.java   |  79 +++--
 .../datanode/metrics/SlowNodeDetector.java      | 194 +++++++++++
 .../hdfs/server/namenode/FSNamesystem.java      |   8 +-
 .../hadoop/hdfs/server/namenode/NameNode.java   |   6 +
 .../hdfs/server/namenode/NameNodeRpcServer.java |   9 +-
 .../server/namenode/NameNodeStatusMXBean.java   |   6 +
 .../hdfs/server/protocol/DatanodeProtocol.java  |   8 +-
 .../src/main/proto/DatanodeProtocol.proto       |  20 ++
 .../src/main/resources/hdfs-default.xml         |  18 +-
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |  22 ++
 .../blockmanagement/TestHeartbeatHandling.java  |  10 +
 .../TestNameNodePrunesMissingStorages.java      |   4 +-
 .../blockmanagement/TestSlowPeerTracker.java    | 226 +++++++++++++
 .../datanode/InternalDataNodeTestUtils.java     |   4 +-
 .../server/datanode/TestBPOfferService.java     |   6 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   4 +-
 .../datanode/TestBpServiceActorScheduler.java   |  30 +-
 .../server/datanode/TestDataNodeLifeline.java   |   7 +-
 .../datanode/TestDataNodePeerMetrics.java       |   8 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 .../server/datanode/TestFsDatasetCache.java     |   3 +-
 .../hdfs/server/datanode/TestStorageReport.java |   4 +-
 .../TestDataNodeOutlierDetectionViaMetrics.java | 142 ++++++++
 .../datanode/metrics/TestSlowNodeDetector.java  | 335 +++++++++++++++++++
 .../server/namenode/NNThroughputBenchmark.java  |   7 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   4 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   4 +-
 .../hadoop/tools/TestHdfsConfigFields.java      |   4 +
 42 files changed, 1721 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
index 06ae30d..4e3b73f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.metrics2.lib;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,6 +40,9 @@ import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import javax.annotation.Nullable;
+
 import static org.apache.hadoop.metrics2.lib.Interns.*;
 
 /**
@@ -63,7 +67,10 @@ public class RollingAverages extends MutableMetric implements Closeable {
           .setNameFormat("RollingAverages-%d").build());
 
   private ScheduledFuture<?> scheduledTask = null;
+
+  @Nullable
   private Map<String, MutableRate> currentSnapshot;
+
   private final int numWindows;
   private final String avgInfoNameTemplate;
   private final String avgInfoDescTemplate;
@@ -100,31 +107,31 @@ public class RollingAverages extends MutableMetric implements Closeable {
 
   /**
    * Constructor of {@link RollingAverages}.
-   * @param windowSize
-   *          The number of seconds of each window for which sub set of samples
-   *          are gathered to compute the rolling average, A.K.A. roll over
-   *          interval.
+   * @param windowSizeMs
+   *          The number of milliseconds of each window for which subset
+   *          of samples are gathered to compute the rolling average, A.K.A.
+   *          roll over interval.
    * @param numWindows
    *          The number of windows maintained to compute the rolling average.
    * @param valueName
    *          of the metric (e.g. "Time", "Latency")
    */
   public RollingAverages(
-      final int windowSize,
+      final long windowSizeMs,
       final int numWindows,
       final String valueName) {
     String uvName = StringUtils.capitalize(valueName);
     String lvName = StringUtils.uncapitalize(valueName);
-    avgInfoNameTemplate = "%s" + "RollingAvg"+ uvName;
+    avgInfoNameTemplate = "[%s]" + "RollingAvg"+ uvName;
     avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s";
     this.numWindows = numWindows;
     scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
-        windowSize, windowSize, TimeUnit.SECONDS);
+        windowSizeMs, windowSizeMs, TimeUnit.MILLISECONDS);
   }
 
   /**
    * Constructor of {@link RollingAverages}.
-   * @param windowSize
+   * @param windowSizeMs
   *          The number of milliseconds of each window for which a subset
   *          of samples is gathered to compute the rolling average, a.k.a.
   *          the roll over interval.
@@ -133,9 +140,9 @@ public class RollingAverages extends MutableMetric implements Closeable {
    *          average of the rolling averages.
    */
   public RollingAverages(
-      final int windowSize,
+      final long windowSizeMs,
       final int numWindows) {
-    this(windowSize, numWindows, "Time");
+    this(windowSizeMs, numWindows, "Time");
   }
 
   @Override
@@ -213,7 +220,7 @@ public class RollingAverages extends MutableMetric implements Closeable {
    * Iterates over snapshot to capture all Avg metrics into rolling structure
    * {@link RollingAverages#averages}.
    */
-  private void rollOverAvgs() {
+  private synchronized void rollOverAvgs() {
     if (currentSnapshot == null) {
       return;
     }
@@ -248,4 +255,32 @@ public class RollingAverages extends MutableMetric implements Closeable {
     }
     scheduledTask = null;
   }
+
+  /**
+   * Retrieve a map of metric name -> aggregate latency.
+   * Entries with minSamples or fewer samples are excluded.
+   *
+   * @return a map of peer DataNode Id to the average latency to that
+   *         node seen over the measurement period.
+   */
+  public synchronized Map<String, Double> getStats(long minSamples) {
+    final Map<String, Double> stats = new HashMap<>();
+
+    for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
+        : averages.entrySet()) {
+      final String name = entry.getKey();
+      double totalSum = 0;
+      long totalCount = 0;
+
+      for (final SumAndCount sumAndCount : entry.getValue()) {
+        totalCount += sumAndCount.getCount();
+        totalSum += sumAndCount.getSum();
+      }
+
+      if (totalCount > minSamples) {
+        stats.put(name, totalSum / totalCount);
+      }
+    }
+    return stats;
+  }
 }
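
A minimal usage sketch of the reworked millisecond-based constructor and the new getStats() method (not part of this commit; the window length, peer names and sample counts are illustrative only):

import java.util.Map;

import org.apache.hadoop.metrics2.lib.RollingAverages;

public class RollingAveragesSketch {
  public static void main(String[] args) throws Exception {
    // 2-second windows (now specified in milliseconds), 2 windows retained.
    try (RollingAverages averages = new RollingAverages(2000, 2)) {
      // Record per-peer latency samples; the names are arbitrary keys.
      for (int i = 0; i < 2000; i++) {
        averages.add("dn-1.example.com:9866", 3);
        averages.add("dn-2.example.com:9866", 250);
      }
      // Let RatesRoller roll the current window over at least once.
      Thread.sleep(3000);
      // Only names that accumulated more than 1000 samples are returned.
      Map<String, Double> stats = averages.getStats(1000);
      System.out.println(stats);
    }
  }
}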

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
index 899d98c..44202e7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java
@@ -42,7 +42,8 @@ public class TestRollingAverages {
   public void testRollingAveragesEmptyRollover() throws Exception {
     final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
     /* 5s interval and 2 windows */
-    try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) {
+    try (final RollingAverages rollingAverages =
+             new RollingAverages(5000, 2)) {
       /* Check it initially */
       rollingAverages.snapshot(rb, true);
       verify(rb, never()).addGauge(
@@ -74,10 +75,10 @@ public class TestRollingAverages {
   public void testRollingAveragesRollover() throws Exception {
     final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
     final String name = "foo2";
-    final int windowSize = 5; // 5s roll over interval
+    final int windowSizeMs = 5000; // 5s roll over interval
     final int numWindows = 2;
     final int numOpsPerIteration = 1000;
-    try (RollingAverages rollingAverages = new RollingAverages(windowSize,
+    try (RollingAverages rollingAverages = new RollingAverages(windowSizeMs,
         numWindows)) {
 
       /* Push values for three intervals */
@@ -92,7 +93,7 @@ public class TestRollingAverages {
          * Sleep until 1s after the next windowSize seconds interval, to let the
          * metrics roll over
          */
-        final long sleep = (start + (windowSize * 1000 * i) + 1000)
+        final long sleep = (start + (windowSizeMs * i) + 1000)
             - Time.monotonicNow();
         Thread.sleep(sleep);
 
@@ -110,12 +111,12 @@ public class TestRollingAverages {
         final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
             : numOpsPerIteration;
         verify(rb).addGauge(
-            info("Foo2RollingAvgTime", "Rolling average time for foo2"),
+            info("[Foo2]RollingAvgTime", "Rolling average time for foo2"),
             rollingSum / rollingTotal);
 
         /* Verify the metrics were added the right number of times */
         verify(rb, times(i)).addGauge(
-            eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")),
+            eq(info("[Foo2]RollingAvgTime", "Rolling average time for foo2")),
             anyDouble());
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java
new file mode 100644
index 0000000..218e30d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * A class that allows a DataNode to communicate information about all
+ * its peer DataNodes that appear to be slow.
+ *
+ * The wire representation of this structure is a list of
+ * SlowPeerReportProto messages.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class SlowPeerReports {
+  /**
+   * A map from the DataNode's DataNodeUUID to its aggregate latency
+   * as seen by the reporting node.
+   *
+   * The exact choice of the aggregate is opaque to the NameNode but it
+   * should be chosen consistently by all DataNodes in the cluster.
+   * Examples of aggregates are 90th percentile (good) and mean (not so
+   * good).
+   *
+   * The NameNode must not attempt to interpret the aggregate latencies
+   * beyond exposing them as a diagnostic, e.g. as metrics. Also, comparing
+   * latencies across reports from different DataNodes may not be
+   * meaningful and must be avoided.
+   */
+  @Nonnull
+  private final Map<String, Double> slowPeers;
+
+  /**
+   * An object representing a SlowPeerReports with no entries. Should
+   * be used instead of null or creating new objects when there are
+   * no slow peers to report.
+   */
+  public static final SlowPeerReports EMPTY_REPORT =
+      new SlowPeerReports(ImmutableMap.of());
+
+  private SlowPeerReports(Map<String, Double> slowPeers) {
+    this.slowPeers = slowPeers;
+  }
+
+  public static SlowPeerReports create(
+      @Nullable Map<String, Double> slowPeers) {
+    if (slowPeers == null || slowPeers.isEmpty()) {
+      return EMPTY_REPORT;
+    }
+    return new SlowPeerReports(slowPeers);
+  }
+
+  public Map<String, Double> getSlowPeers() {
+    return slowPeers;
+  }
+
+  public boolean haveSlowPeers() {
+    return slowPeers.size() > 0;
+  }
+
+  /**
+   * Return true if the two objects represent the same set of slow peer
+   * entries. Primarily for unit testing convenience.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof SlowPeerReports)) {
+      return false;
+    }
+
+    SlowPeerReports that = (SlowPeerReports) o;
+
+    return slowPeers.equals(that.slowPeers);
+  }
+
+  @Override
+  public int hashCode() {
+    return slowPeers.hashCode();
+  }
+}
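
As an illustrative sketch (the hostname and latency are invented), a DataNode-side caller builds a report from its aggregate latency map, falling back to the shared empty instance when there is nothing to send:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;

public class SlowPeerReportsSketch {
  public static void main(String[] args) {
    Map<String, Double> aggregateLatencies = new HashMap<>();
    aggregateLatencies.put("dn-3.example.com:9866", 812.5); // aggregate latency in ms

    SlowPeerReports reports = SlowPeerReports.create(aggregateLatencies);
    System.out.println(reports.haveSlowPeers());  // true
    System.out.println(reports.getSlowPeers());

    // A null or empty map yields the shared EMPTY_REPORT instance.
    SlowPeerReports empty = SlowPeerReports.create(null);
    System.out.println(empty == SlowPeerReports.EMPTY_REPORT); // true
  }
}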

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8f60af0..3cc4b5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -457,14 +457,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_METRICS_SESSION_ID_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
   public static final String  DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
-  public static final String  DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY =
-      "dfs.metrics.rolling.average.window.size";
-  public static final int     DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT =
-      3600;
-  public static final String  DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY =
-      "dfs.metrics.rolling.average.window.numbers";
-  public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT =
-      48;
+
+  // The following setting is not meant to be changed by administrators.
+  public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY =
+      "dfs.metrics.rolling.averages.window.length";
+  public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT =
+      "5m";
+
+  // The following setting is not meant to be changed by administrators.
+  public static final String DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY =
+      "dfs.metrics.rolling.average.num.windows";
+  public static final int DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT =
+      36;
+
   public static final String  DFS_DATANODE_PEER_STATS_ENABLED_KEY =
       "dfs.datanode.peer.stats.enabled";
   public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
@@ -669,6 +674,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit";
   public static final int     DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000;
 
+  public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY =
+      "dfs.datanode.slow.peers.report.interval";
+  public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT =
+      "30m";
+
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
   public static final boolean DFS_IMAGE_COMPRESS_DEFAULT = false;
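
For reference, a minimal sketch of how these time-duration keys are read; it mirrors the DNConf and DataNodePeerMetrics changes further down in this patch, and the printed values assume the defaults above:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class SlowPeerConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // "30m" default -> 1,800,000 ms between slow peer reports.
    long reportIntervalMs = conf.getTimeDuration(
        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);

    // "5m" default -> 300,000 ms per rolling-average window.
    long windowLengthMs = conf.getTimeDuration(
        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT,
        TimeUnit.MILLISECONDS);

    System.out.println(reportIntervalMs + " " + windowLengthMs);
  }
}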

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 79113dd..d9e6026 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -71,6 +72,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
+import javax.annotation.Nonnull;
+
 /**
  * This class is the client side translator to translate the requests made on
  * {@link DatanodeProtocol} interfaces to the RPC server implementing
@@ -132,7 +135,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       StorageReport[] reports, long cacheCapacity, long cacheUsed,
       int xmitsInProgress, int xceiverCount, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
-      boolean requestFullBlockReportLease) throws IOException {
+      boolean requestFullBlockReportLease,
+      @Nonnull SlowPeerReports slowPeers) throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -149,6 +153,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
           volumeFailureSummary));
     }
+    if (slowPeers.haveSlowPeers()) {
+      builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers));
+    }
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 257adf9..b1c8e34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -120,7 +120,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           report, request.getCacheCapacity(), request.getCacheUsed(),
           request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes(),
-          volumeFailureSummary, request.getRequestFullBlockReportLease());
+          volumeFailureSummary, request.getRequestFullBlockReportLease(),
+          PBHelper.convertSlowPeerInfo(request.getSlowPeersList()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index d97708f..69c3c83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.hdfs.protocolPB;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import com.google.protobuf.ByteString;
 
@@ -44,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeComm
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
@@ -107,6 +111,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 
 /**
@@ -829,6 +834,45 @@ public class PBHelper {
     return builder.build();
   }
 
+  public static List<SlowPeerReportProto> convertSlowPeerInfo(
+      SlowPeerReports slowPeers) {
+    if (slowPeers.getSlowPeers().size() == 0) {
+      return Collections.emptyList();
+    }
+
+    List<SlowPeerReportProto> slowPeerInfoProtos =
+        new ArrayList<>(slowPeers.getSlowPeers().size());
+    for (Map.Entry<String, Double> entry :
+        slowPeers.getSlowPeers().entrySet()) {
+      slowPeerInfoProtos.add(SlowPeerReportProto.newBuilder()
+              .setDataNodeId(entry.getKey())
+              .setAggregateLatency(entry.getValue())
+              .build());
+    }
+    return slowPeerInfoProtos;
+  }
+
+  public static SlowPeerReports convertSlowPeerInfo(
+      List<SlowPeerReportProto> slowPeerProtos) {
+
+    // No slow peers, or possibly an older DataNode.
+    if (slowPeerProtos == null || slowPeerProtos.size() == 0) {
+      return SlowPeerReports.EMPTY_REPORT;
+    }
+
+    Map<String, Double> slowPeersMap = new HashMap<>(slowPeerProtos.size());
+    for (SlowPeerReportProto proto : slowPeerProtos) {
+      if (!proto.hasDataNodeId()) {
+        // The DataNodeId should be reported.
+        continue;
+      }
+      slowPeersMap.put(
+          proto.getDataNodeId(),
+          proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0);
+    }
+    return SlowPeerReports.create(slowPeersMap);
+  }
+
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
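
A hedged sketch of a round trip through the new converters, similar in spirit to the TestPBHelper case listed in the file summary; the node id and latency are placeholders:

import java.util.List;

import com.google.common.collect.ImmutableMap;

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;

public class SlowPeerPBSketch {
  public static void main(String[] args) {
    SlowPeerReports original = SlowPeerReports.create(
        ImmutableMap.of("dn-7.example.com:9866", 42.0));

    // To wire format and back again.
    List<SlowPeerReportProto> protos = PBHelper.convertSlowPeerInfo(original);
    SlowPeerReports roundTripped = PBHelper.convertSlowPeerInfo(protos);

    System.out.println(original.equals(roundTripped)); // true
  }
}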

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index cc64a04..fed1864 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -50,7 +50,10 @@ import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Timer;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
@@ -172,6 +175,14 @@ public class DatanodeManager {
    */
   private final HashMap<String, Integer> datanodesSoftwareVersions =
     new HashMap<>(4, 0.75f);
+
+  /**
+   * True if we should process latency metrics from downstream peers.
+   */
+  private final boolean dataNodePeerStatsEnabled;
+
+  @Nullable
+  private final SlowPeerTracker slowPeerTracker;
   
   /**
    * The minimum time between resending caching directives to Datanodes,
@@ -194,6 +205,12 @@ public class DatanodeManager {
     this.decomManager = new DecommissionManager(namesystem, blockManager,
         heartbeatManager);
     this.fsClusterStats = newFSClusterStats();
+    this.dataNodePeerStatsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+
+    this.slowPeerTracker = dataNodePeerStatsEnabled ?
+        new SlowPeerTracker(conf, new Timer()) : null;
 
     this.defaultXferPort = NetUtils.createSocketAddr(
           conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
@@ -1566,7 +1583,8 @@ public class DatanodeManager {
       StorageReport[] reports, final String blockPoolId,
       long cacheCapacity, long cacheUsed, int xceiverCount, 
       int maxTransfers, int failedVolumes,
-      VolumeFailureSummary volumeFailureSummary) throws IOException {
+      VolumeFailureSummary volumeFailureSummary,
+      @Nonnull SlowPeerReports slowPeers) throws IOException {
     final DatanodeDescriptor nodeinfo;
     try {
       nodeinfo = getDatanode(nodeReg);
@@ -1632,6 +1650,19 @@ public class DatanodeManager {
       nodeinfo.setBalancerBandwidth(0);
     }
 
+    if (slowPeerTracker != null) {
+      final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
+      if (!slowPeersMap.isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
+              slowPeersMap);
+        }
+        for (String slowNodeId : slowPeersMap.keySet()) {
+          slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
+        }
+      }
+    }
+
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }
@@ -1834,5 +1865,14 @@ public class DatanodeManager {
     this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds),
         DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
   }
+
+  /**
+   * Retrieve information about slow peers as JSON.
+   * @return a JSON representation of the tracked slow peers, or null
+   *         if we are not tracking slow peers.
+   */
+  public String getSlowPeersReport() {
+    return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
new file mode 100644
index 0000000..cf3a20c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * This class aggregates information from {@link SlowPeerReports} received via
+ * heartbeats.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SlowPeerTracker {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SlowPeerTracker.class);
+
+  /**
+   * Time duration after which a report is considered stale. This is
+   * set to DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY * 3, i.e. reports
+   * are retained for at least two successive report intervals.
+   */
+  private final long reportValidityMs;
+
+  /**
+   * Timer object for querying the current time. Separated out for
+   * unit testing.
+   */
+  private final Timer timer;
+
+  /**
+   * Number of nodes to include in JSON report. We will return nodes with
+   * the highest number of votes from peers.
+   */
+  private static final int MAX_NODES_TO_REPORT = 5;
+
+  /**
+   * Information about peers that have reported a node as being slow.
+   * Each outer map entry is a map of (DatanodeId) -> (timestamp),
+   * mapping reporting nodes to the timestamp of the last report from
+   * that node.
+   *
+   * DatanodeId could be the DataNodeId or its address. We
+   * don't care as long as the caller uses it consistently.
+   *
+   * Stale reports are not evicted proactively and can potentially
+   * hang around forever.
+   */
+  private final ConcurrentMap<String, ConcurrentMap<String, Long>>
+      allReports;
+
+  public SlowPeerTracker(Configuration conf, Timer timer) {
+    this.timer = timer;
+    this.allReports = new ConcurrentHashMap<>();
+    this.reportValidityMs = conf.getTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS) * 3;
+  }
+
+  /**
+   * Add a new report. DatanodeIds can be the DataNodeIds or addresses;
+   * we don't care as long as the caller is consistent.
+   *
+   * @param reportingNode DataNodeId of the node reporting on its peer.
+   * @param slowNode DataNodeId of the peer suspected to be slow.
+   */
+  public void addReport(String slowNode,
+                        String reportingNode) {
+    ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
+
+    if (nodeEntries == null) {
+      // putIfAbsent guards against multiple writers.
+      allReports.putIfAbsent(slowNode, new ConcurrentHashMap<>());
+      nodeEntries = allReports.get(slowNode);
+    }
+
+    // Replace the existing entry from this node, if any.
+    nodeEntries.put(reportingNode, timer.monotonicNow());
+  }
+
+  /**
+   * Retrieve the non-expired reports that mark a given DataNode
+   * as slow. Stale reports are excluded.
+   *
+   * @param slowNode target node Id.
+   * @return set of reports which implicate the target node as being slow.
+   */
+  public Set<String> getReportsForNode(String slowNode) {
+    final ConcurrentMap<String, Long> nodeEntries =
+        allReports.get(slowNode);
+
+    if (nodeEntries == null || nodeEntries.isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    return filterNodeReports(nodeEntries, timer.monotonicNow());
+  }
+
+  /**
+   * Retrieve all reports for all nodes. Stale reports are excluded.
+   *
+   * @return map from SlowNodeId -> (set of nodes reporting peers).
+   */
+  public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
+    if (allReports.isEmpty()) {
+      return ImmutableMap.of();
+    }
+
+    final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>();
+    final long now = timer.monotonicNow();
+
+    for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
+        allReports.entrySet()) {
+      SortedSet<String> validReports = filterNodeReports(entry.getValue(), now);
+      if (!validReports.isEmpty()) {
+        allNodesValidReports.put(entry.getKey(), validReports);
+      }
+    }
+    return allNodesValidReports;
+  }
+
+  /**
+   * Filter the given reports to return just the valid ones.
+   *
+   * @param reports map from reporting node to the time of its last report.
+   * @param now the current monotonic time in milliseconds.
+   * @return the set of reporting nodes whose reports have not expired.
+   */
+  private SortedSet<String> filterNodeReports(
+      ConcurrentMap<String, Long> reports, long now) {
+    final SortedSet<String> validReports = new TreeSet<>();
+
+    for (Map.Entry<String, Long> entry : reports.entrySet()) {
+      if (now - entry.getValue() < reportValidityMs) {
+        validReports.add(entry.getKey());
+      }
+    }
+    return validReports;
+  }
+
+  /**
+   * Retrieve all valid reports as a JSON string.
+   * @return serialized representation of valid reports. null if
+   *         serialization failed.
+   */
+  public String getJson() {
+    Collection<ReportForJson> validReports = getJsonReports(
+        MAX_NODES_TO_REPORT);
+    ObjectMapper objectMapper = new ObjectMapper();
+    try {
+      return objectMapper.writeValueAsString(validReports);
+    } catch (JsonProcessingException e) {
+      // Failed to serialize. Don't log the exception call stack.
+      LOG.debug("Failed to serialize statistics" + e);
+      return null;
+    }
+  }
+
+  /**
+   * This structure is a thin wrapper over reports to make Json
+   * [de]serialization easy.
+   */
+  public static class ReportForJson {
+    @JsonProperty("SlowNode")
+    final private String slowNode;
+
+    @JsonProperty("ReportingNodes")
+    final private SortedSet<String> reportingNodes;
+
+    public ReportForJson(
+        @JsonProperty("SlowNode") String slowNode,
+        @JsonProperty("ReportingNodes") SortedSet<String> reportingNodes) {
+      this.slowNode = slowNode;
+      this.reportingNodes = reportingNodes;
+    }
+
+    public String getSlowNode() {
+      return slowNode;
+    }
+
+    public SortedSet<String> getReportingNodes() {
+      return reportingNodes;
+    }
+  }
+
+  /**
+   * Retrieve reports in a structure for generating JSON, limiting the
+   * output to the top numNodes nodes, i.e. the nodes with the most reports.
+   * @param numNodes number of nodes to return. This is to limit the
+   *                 size of the generated JSON.
+   */
+  private Collection<ReportForJson> getJsonReports(int numNodes) {
+    if (allReports.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final PriorityQueue<ReportForJson> topNReports =
+        new PriorityQueue<>(allReports.size(),
+            new Comparator<ReportForJson>() {
+          @Override
+          public int compare(ReportForJson o1, ReportForJson o2) {
+            return Ints.compare(o1.reportingNodes.size(),
+                o2.reportingNodes.size());
+          }
+        });
+
+    final long now = timer.monotonicNow();
+
+    for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
+        allReports.entrySet()) {
+      SortedSet<String> validReports = filterNodeReports(
+          entry.getValue(), now);
+      if (!validReports.isEmpty()) {
+        if (topNReports.size() < numNodes) {
+          topNReports.add(new ReportForJson(entry.getKey(), validReports));
+        } else if (topNReports.peek().getReportingNodes().size() <
+            validReports.size()){
+          // Remove the lowest element
+          topNReports.poll();
+          topNReports.add(new ReportForJson(entry.getKey(), validReports));
+        }
+      }
+    }
+    return topNReports;
+  }
+
+  @VisibleForTesting
+  long getReportValidityMs() {
+    return reportValidityMs;
+  }
+}
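
A minimal sketch of the tracker's flow on the NameNode side (node names and the Timer are illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker;
import org.apache.hadoop.util.Timer;

public class SlowPeerTrackerSketch {
  public static void main(String[] args) {
    SlowPeerTracker tracker =
        new SlowPeerTracker(new Configuration(), new Timer());

    // Two DataNodes report the same peer as slow.
    tracker.addReport("dn-5.example.com:9866", "dn-1.example.com:9866");
    tracker.addReport("dn-5.example.com:9866", "dn-2.example.com:9866");

    // Reports newer than 3x the report interval are still valid.
    System.out.println(tracker.getReportsForNode("dn-5.example.com:9866"));

    // JSON view limited to the five most-reported nodes; this is what
    // DatanodeManager#getSlowPeersReport returns.
    System.out.println(tracker.getJson());
  }
}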

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 5294799..644a8ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -127,7 +128,8 @@ class BPServiceActor implements Runnable {
     this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
     scheduler = new Scheduler(dnConf.heartBeatInterval,
-        dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
+        dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
+        dnConf.slowPeersReportIntervalMs);
     // get the value of maxDataLength.
     this.maxDataLength = dnConf.getMaxDataLength();
   }
@@ -489,12 +491,18 @@ class BPServiceActor implements Runnable {
                 " storage reports from service actor: " + this);
     }
     
-    scheduler.updateLastHeartbeatTime(monotonicNow());
+    final long now = monotonicNow();
+    scheduler.updateLastHeartbeatTime(now);
     VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
         .getVolumeFailureSummary();
     int numFailedVolumes = volumeFailureSummary != null ?
         volumeFailureSummary.getFailedStorageLocations().length : 0;
-    return bpNamenode.sendHeartbeat(bpRegistration,
+    final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now);
+    final SlowPeerReports slowPeers =
+        slowPeersReportDue && dn.getPeerMetrics() != null ?
+            SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
+            SlowPeerReports.EMPTY_REPORT;
+    HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
         dn.getFSDataset().getCacheCapacity(),
         dn.getFSDataset().getCacheUsed(),
@@ -502,7 +510,14 @@ class BPServiceActor implements Runnable {
         dn.getXceiverCount(),
         numFailedVolumes,
         volumeFailureSummary,
-        requestBlockReportLease);
+        requestBlockReportLease,
+        slowPeers);
+
+    if (slowPeersReportDue) {
+      // If the report was due and successfully sent, schedule the next one.
+      scheduler.scheduleNextSlowPeerReport();
+    }
+    return response;
   }
 
   @VisibleForTesting
@@ -1079,18 +1094,23 @@ class BPServiceActor implements Runnable {
     @VisibleForTesting
     boolean resetBlockReportTime = true;
 
+    @VisibleForTesting
+    volatile long nextSlowPeersReportTime = monotonicNow();
+
     private final AtomicBoolean forceFullBlockReport =
         new AtomicBoolean(false);
 
     private final long heartbeatIntervalMs;
     private final long lifelineIntervalMs;
     private final long blockReportIntervalMs;
+    private final long slowPeersReportIntervalMs;
 
     Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
-        long blockReportIntervalMs) {
+              long blockReportIntervalMs, long slowPeersReportIntervalMs) {
       this.heartbeatIntervalMs = heartbeatIntervalMs;
       this.lifelineIntervalMs = lifelineIntervalMs;
       this.blockReportIntervalMs = blockReportIntervalMs;
+      this.slowPeersReportIntervalMs = slowPeersReportIntervalMs;
       scheduleNextLifeline(nextHeartbeatTime);
     }
 
@@ -1123,6 +1143,10 @@ class BPServiceActor implements Runnable {
       lastBlockReportTime = blockReportTime;
     }
 
+    void scheduleNextSlowPeerReport() {
+      nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs;
+    }
+
     long getLastHearbeatTime() {
       return (monotonicNow() - lastHeartbeatTime)/1000;
     }
@@ -1149,6 +1173,10 @@ class BPServiceActor implements Runnable {
       return nextBlockReportTime - curTime <= 0;
     }
 
+    boolean isSlowPeersReportDue(long curTime) {
+      return nextSlowPeersReportTime - curTime <= 0;
+    }
+
     void forceFullBlockReportNow() {
       forceFullBlockReport.set(true);
       resetBlockReportTime = true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 567597d..dd4b58b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -93,7 +94,7 @@ class BlockReceiver implements Closeable {
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
-  private String bracketedMirrorAddr;
+  private String mirrorNameForMetrics;
   private DataOutputStream mirrorOut;
   private Daemon responder = null;
   private DataTransferThrottler throttler;
@@ -843,10 +844,9 @@ class BlockReceiver implements Closeable {
    * </p>
    */
   private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
-    if (isPenultimateNode && mirrorAddr != null) {
-      datanode.getPeerMetrics().addSendPacketDownstream(
-          bracketedMirrorAddr,
-          elapsedMs);
+    final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
+    if (peerMetrics != null && isPenultimateNode) {
+      peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
     }
   }
 
@@ -927,8 +927,13 @@ class BlockReceiver implements Closeable {
     boolean responderClosed = false;
     mirrorOut = mirrOut;
     mirrorAddr = mirrAddr;
-    bracketedMirrorAddr = "[" + mirrAddr + "]";
     isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
+    if (isPenultimateNode) {
+      mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
+          downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
+      LOG.debug("Will collect peer metrics for downstream node {}",
+          mirrorNameForMetrics);
+    }
     throttler = throttlerArg;
 
     this.replyOut = replyOut;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index c1487b1..e2c5fbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
@@ -94,6 +96,8 @@ public class DNConf {
   private final long lifelineIntervalMs;
   final long blockReportInterval;
   final long blockReportSplitThreshold;
+  final boolean peerStatsEnabled;
+  final long slowPeersReportIntervalMs;
   final long ibrInterval;
   final long initialBlockReportDelayMs;
   final long cacheReportInterval;
@@ -173,6 +177,13 @@ public class DNConf {
     this.blockReportInterval = getConf().getLong(
         DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.peerStatsEnabled = getConf().getBoolean(
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+    this.slowPeersReportIntervalMs = getConf().getTimeDuration(
+        DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
+        DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
     this.ibrInterval = getConf().getLong(
         DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
         DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 090d8b9..a6dfa46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -334,6 +334,7 @@ public class DataNode extends ReconfigurableBase
   private int infoSecurePort;
 
   DataNodeMetrics metrics;
+  @Nullable
   private DataNodePeerMetrics peerMetrics;
   private InetSocketAddress streamingAddr;
   
@@ -422,6 +423,7 @@ public class DataNode extends ReconfigurableBase
     this.blockScanner = new BlockScanner(this, this.getConf());
     this.pipelineSupportECN = false;
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    this.dnConf = new DNConf(this);
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@@ -1363,7 +1365,8 @@ public class DataNode extends ReconfigurableBase
     initIpcServer();
 
     metrics = DataNodeMetrics.create(getConf(), getDisplayName());
-    peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName());
+    peerMetrics = dnConf.peerStatsEnabled ?
+        DataNodePeerMetrics.create(getConf(), getDisplayName()) : null;
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
     ecWorker = new ErasureCodingWorker(getConf(), this);
@@ -3456,6 +3459,7 @@ public class DataNode extends ReconfigurableBase
 
   @Override // DataNodeMXBean
   public String getSendPacketDownstreamAvgInfo() {
-    return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+    return peerMetrics != null ?
+        peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index abcaa4a..f838fd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -341,7 +341,9 @@ class DataXceiver extends Receiver implements Runnable {
    * the thread dies away.
    */
   private void collectThreadLocalStates() {
-    datanode.getPeerMetrics().collectThreadLocalStates();
+    if (datanode.getPeerMetrics() != null) {
+      datanode.getPeerMetrics().collectThreadLocalStates();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
index 9344d1b..5241c78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -18,40 +18,59 @@
 
 package org.apache.hadoop.hdfs.server.datanode.metrics;
 
-import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.metrics2.MetricsJsonBuilder;
 import org.apache.hadoop.metrics2.lib.RollingAverages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
  * various peer operations.
  */
 @InterfaceAudience.Private
+@InterfaceStability.Unstable
 public class DataNodePeerMetrics {
 
-  static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class);
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DataNodePeerMetrics.class);
 
   private final RollingAverages sendPacketDownstreamRollingAvgerages;
 
   private final String name;
-  private final boolean peerStatsEnabled;
+
+  /**
+   * Threshold in milliseconds below which a DataNode is definitely not slow.
+   */
+  private static final long LOW_THRESHOLD_MS = 5;
+
+  private final SlowNodeDetector slowNodeDetector;
+
+  /**
+   * Minimum number of packet send samples which are required to qualify
+   * for outlier detection. If the number of samples is below this then
+   * outlier detection is skipped.
+   */
+  @VisibleForTesting
+  static final long MIN_OUTLIER_DETECTION_SAMPLES = 1000;
 
   public DataNodePeerMetrics(
       final String name,
-      final int windowSize,
-      final int numWindows,
-      final boolean peerStatsEnabled) {
+      final long windowSizeMs,
+      final int numWindows) {
     this.name = name;
-    this.peerStatsEnabled = peerStatsEnabled;
+    this.slowNodeDetector = new SlowNodeDetector(LOW_THRESHOLD_MS);
     sendPacketDownstreamRollingAvgerages = new RollingAverages(
-        windowSize,
-        numWindows);
+        windowSizeMs, numWindows);
   }
 
   public String name() {
@@ -66,21 +85,18 @@ public class DataNodePeerMetrics {
         ? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
         : dnName.replace(':', '-'));
 
-    final int windowSize = conf.getInt(
-            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
-            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT);
+    final long windowSizeMs = conf.getTimeDuration(
+            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
+            DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT,
+            TimeUnit.MILLISECONDS);
     final int numWindows = conf.getInt(
-        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
-        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT);
-    final boolean peerStatsEnabled = conf.getBoolean(
-        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
-        DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT);
 
     return new DataNodePeerMetrics(
         name,
-        windowSize,
-        numWindows,
-        peerStatsEnabled);
+        windowSizeMs,
+        numWindows);
   }
 
   /**
@@ -94,9 +110,7 @@ public class DataNodePeerMetrics {
   public void addSendPacketDownstream(
       final String peerAddr,
       final long elapsedMs) {
-    if (peerStatsEnabled) {
-      sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
-    }
+    sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs);
   }
 
   /**
@@ -114,4 +128,19 @@ public class DataNodePeerMetrics {
   public void collectThreadLocalStates() {
     sendPacketDownstreamRollingAvgerages.collectThreadLocalStates();
   }
+
+  /**
+   * Retrieve the set of dataNodes that look significantly slower
+   * than their peers.
+   */
+  public Map<String, Double> getOutliers() {
+    // This maps the metric name to the aggregate latency.
+    // The metric name is the datanode ID.
+    final Map<String, Double> stats =
+        sendPacketDownstreamRollingAvgerages.getStats(
+            MIN_OUTLIER_DETECTION_SAMPLES);
+    LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
+
+    return slowNodeDetector.getOutliers(stats);
+  }
 }
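
For readers following the patch, here is a minimal, self-contained sketch of how the new DataNodePeerMetrics pieces fit together, using only the constructor and methods visible in the hunks above. The metric name, peer address, window settings and latency value are invented for illustration; this is not code from the patch itself.

    import java.util.Map;
    import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;

    public class PeerMetricsSketch {
      public static void main(String[] args) {
        // Hypothetical values: 5-minute windows, 36 windows retained.
        DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
            "DataNodePeerActivity-example",   // made-up metrics record name
            300_000L,                         // windowSizeMs
            36);                              // numWindows

        // A DataXceiver thread records the elapsed time of each packet it
        // forwards to the downstream peer in the write pipeline.
        peerMetrics.addSendPacketDownstream("dn2.example.com:50010", 12);

        // Roll thread-local samples into the shared rolling averages.
        peerMetrics.collectThreadLocalStates();

        // Peers whose aggregate latency is an outlier, keyed by peer address.
        // Peers with fewer than MIN_OUTLIER_DETECTION_SAMPLES (1000) recorded
        // samples are excluded before outlier detection runs.
        Map<String, Double> slowDownstreamPeers = peerMetrics.getOutliers();
        System.out.println(slowDownstreamPeers);
      }
    }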

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
new file mode 100644
index 0000000..b6278ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * A utility class to help detect nodes whose aggregate latency
+ * is an outlier within a given set.
+ *
+ * We use the median absolute deviation for outlier detection as
+ * described in the following publication:
+ *
+ * Leys, C., et al., Detecting outliers: Do not use standard deviation
+ * around the mean, use absolute deviation around the median.
+ * http://dx.doi.org/10.1016/j.jesp.2013.03.013
+ *
+ * We augment the above scheme with the following heuristics to be even
+ * more conservative:
+ *
+ *  1. Skip outlier detection if the sample size is too small.
+ *  2. Never flag nodes whose aggregate latency is below a low threshold.
+ *  3. Never flag nodes whose aggregate latency is less than a small
+ *     multiple of the median.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SlowNodeDetector {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SlowNodeDetector.class);
+
+  /**
+   * Minimum number of peers to run outlier detection.
+   */
+  private static long minOutlierDetectionPeers = 10;
+
+  /**
+   * The multiplier is from Leys, C. et al.
+   */
+  private static final double MAD_MULTIPLIER = (double) 1.4826;
+
+  /**
+   * Threshold in milliseconds below which a DataNode is definitely not slow.
+   */
+  private final long lowThresholdMs;
+
+  /**
+   * Deviation multiplier. A sample is considered to be an outlier if it
+   * exceeds the median by (multiplier * median abs. deviation). 3 is a
+   * conservative choice.
+   */
+  private static final int DEVIATION_MULTIPLIER = 3;
+
+  /**
+   * If most of the samples are clustered together, the MAD can be
+   * low. The median multiplier introduces another safeguard to avoid
+   * overaggressive outlier detection.
+   */
+  @VisibleForTesting
+  static final int MEDIAN_MULTIPLIER = 3;
+
+  public SlowNodeDetector(long lowThresholdMs) {
+    this.lowThresholdMs = lowThresholdMs;
+  }
+
+  /**
+   * Return a set of DataNodes whose latency is much higher than
+   * their peers. The input is a map of (node -> aggregate latency)
+   * entries.
+   *
+   * The aggregate may be an arithmetic mean or a percentile e.g.
+   * the 90th percentile. Percentiles are a better choice than the
+   * median since latency is usually not normally distributed.
+   *
+   * This method allocates O(n) temporary memory and runs in
+   * O(n log n) time, where n = stats.size().
+   *
+   * @return a map from node name to aggregate latency for each outlier.
+   */
+  public Map<String, Double> getOutliers(Map<String, Double> stats) {
+    if (stats.size() < minOutlierDetectionPeers) {
+      LOG.debug("Skipping statistical outlier detection as we don't have " +
+              "latency data for enough peers. Have {}, need at least {}",
+          stats.size(), minOutlierDetectionPeers);
+      return ImmutableMap.of();
+    }
+    // Compute the median absolute deviation of the aggregates.
+    final List<Double> sorted = new ArrayList<>(stats.values());
+    Collections.sort(sorted);
+    final Double median = computeMedian(sorted);
+    final Double mad = computeMad(sorted);
+    Double upperLimitLatency = Math.max(
+        lowThresholdMs, median * MEDIAN_MULTIPLIER);
+    upperLimitLatency = Math.max(
+        upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));
+
+    final Map<String, Double> slowNodes = new HashMap<>();
+
+    LOG.trace("getOutliers: List={}, MedianLatency={}, " +
+        "MedianAbsoluteDeviation={}, upperLimitLatency={}",
+        sorted, median, mad, upperLimitLatency);
+
+    // Find nodes whose latency exceeds the threshold.
+    for (Map.Entry<String, Double> entry : stats.entrySet()) {
+      if (entry.getValue() > upperLimitLatency) {
+        slowNodes.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return slowNodes;
+  }
+
+  /**
+   * Compute the Median Absolute Deviation of a sorted list.
+   */
+  public static Double computeMad(List<Double> sortedValues) {
+    if (sortedValues.size() == 0) {
+      throw new IllegalArgumentException(
+          "Cannot compute the Median Absolute Deviation " +
+              "of an empty list.");
+    }
+
+    // First get the median of the values.
+    Double median = computeMedian(sortedValues);
+    List<Double> deviations = new ArrayList<>(sortedValues);
+
+    // Then update the list to store deviation from the median.
+    for (int i = 0; i < sortedValues.size(); ++i) {
+      deviations.set(i, Math.abs(sortedValues.get(i) - median));
+    }
+
+    // Finally get the median absolute deviation.
+    Collections.sort(deviations);
+    return computeMedian(deviations) * MAD_MULTIPLIER;
+  }
+
+  /**
+   * Compute the median of a sorted list.
+   */
+  public static Double computeMedian(List<Double> sortedValues) {
+    if (sortedValues.size() == 0) {
+      throw new IllegalArgumentException(
+          "Cannot compute the median of an empty list.");
+    }
+
+    Double median = sortedValues.get(sortedValues.size() / 2);
+    if (sortedValues.size() % 2 == 0) {
+      median += sortedValues.get((sortedValues.size() / 2) - 1);
+      median /= 2;
+    }
+    return median;
+  }
+
+  /**
+   * This method *must not* be used outside of unit tests.
+   */
+  @VisibleForTesting
+  static void setMinOutlierDetectionPeers(long minOutlierDetectionPeers) {
+    SlowNodeDetector.minOutlierDetectionPeers = minOutlierDetectionPeers;
+  }
+
+  @VisibleForTesting
+  static long getMinOutlierDetectionPeers() {
+    return minOutlierDetectionPeers;
+  }
+}
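
To make the thresholds above concrete, the following sketch (peer names and latencies are invented, not from the patch) exercises SlowNodeDetector directly. With the ten sample latencies below, the median is 10 ms and the MAD is about 1.48 ms, so the upper limit is max(lowThreshold = 5, 3 * median = 30, median + 3 * MAD ~ 14.4) = 30 ms and only the 1000 ms peer is flagged.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hdfs.server.datanode.metrics.SlowNodeDetector;

    public class SlowNodeDetectorSketch {
      public static void main(String[] args) {
        // Aggregate latency in ms per downstream peer; values are made up.
        Map<String, Double> stats = new HashMap<>();
        double[] latencies = {8, 8, 9, 9, 10, 10, 11, 11, 12, 1000};
        for (int i = 0; i < latencies.length; i++) {
          stats.put("dn" + i + ".example.com:50010", latencies[i]);
        }

        // 5 ms low threshold, matching LOW_THRESHOLD_MS in DataNodePeerMetrics.
        SlowNodeDetector detector = new SlowNodeDetector(5);

        // Ten peers satisfies the minimum peer count, so detection runs and
        // only the 1000 ms peer exceeds the 30 ms upper limit.
        System.out.println(detector.getOutliers(stats));
      }
    }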

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 6ec0ee9..38a326c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -129,6 +129,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.annotation.Nonnull;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
@@ -255,6 +256,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -3639,7 +3641,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       StorageReport[] reports, long cacheCapacity, long cacheUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
-      boolean requestFullBlockReportLease) throws IOException {
+      boolean requestFullBlockReportLease,
+      @Nonnull SlowPeerReports slowPeers) throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3647,7 +3650,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           - xmitsInProgress;
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
-          xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
+          xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
+          slowPeers);
       long blockReportLeaseId = 0;
       if (requestFullBlockReportLease) {
         blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
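
Note that the slowPeers argument is @Nonnull: a DataNode with nothing to report is expected to pass an empty report rather than null. A hypothetical sketch of how the reporting side could build that argument from the peer metrics (the helper name is invented; SlowPeerReports.create and EMPTY_REPORT appear in the test changes further below):

    import java.util.Map;
    import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
    import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;

    public class SlowPeerReportSketch {
      // Hypothetical helper: build the heartbeat's slowPeers argument.
      static SlowPeerReports buildSlowPeerReport(DataNodePeerMetrics peerMetrics) {
        Map<String, Double> outliers = peerMetrics.getOutliers();
        return outliers.isEmpty()
            ? SlowPeerReports.EMPTY_REPORT
            : SlowPeerReports.create(outliers);
      }
    }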

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index f6c724b..df5ee0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -1822,6 +1822,12 @@ public class NameNode extends ReconfigurableBase implements
     return getNamesystem().getBytesInFuture();
   }
 
+  @Override
+  public String getSlowPeersReport() {
+    return namesystem.getBlockManager().getDatanodeManager()
+        .getSlowPeersReport();
+  }
+
   /**
    * Shutdown the NN immediately in an ungraceful way. Used when it would be
    * unsafe for the NN to continue operating, e.g. during a failed HA state

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 4a1e8dd..f9cfa42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -205,6 +206,8 @@ import org.slf4j.Logger;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 
+import javax.annotation.Nonnull;
+
 /**
  * This class is responsible for handling all of the RPC calls to the NameNode.
  * It is created, started, and stopped by {@link NameNode}.
@@ -1418,12 +1421,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
       int xmitsInProgress, int xceiverCount,
       int failedVolumes, VolumeFailureSummary volumeFailureSummary,
-      boolean requestFullBlockReportLease) throws IOException {
+      boolean requestFullBlockReportLease,
+      @Nonnull SlowPeerReports slowPeers) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
-        failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
+        failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
+        slowPeers);
   }
 
   @Override // DatanodeProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java
index 7b37372..f46b9ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java
@@ -69,4 +69,10 @@ public interface NameNodeStatusMXBean {
    * @return number of bytes that can be deleted if exited from safe mode.
    */
   long getBytesWithFutureGenerationStamps();
+
+  /**
+   * Retrieves information about slow DataNodes, if the feature is
+   * enabled. The report is in JSON format.
+   */
+  String getSlowPeersReport();
 }
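
Because getSlowPeersReport() is exposed through the status MXBean, the JSON report can also be read remotely over JMX. A rough sketch follows; the JMX endpoint and the ObjectName Hadoop:service=NameNode,name=NameNodeStatus are assumptions based on how the NameNode status bean is conventionally registered, not something spelled out in this hunk.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class SlowPeersReportReader {
      public static void main(String[] args) throws Exception {
        // Assumed JMX endpoint of the NameNode; adjust host and port as needed.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://namenode.example.com:8004/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
          MBeanServerConnection mbsc = connector.getMBeanServerConnection();
          // Assumed ObjectName of the NameNodeStatus MXBean.
          ObjectName name =
              new ObjectName("Hadoop:service=NameNode,name=NameNodeStatus");
          // JSON-formatted report; may be empty if no slow peers are tracked.
          String report = (String) mbsc.getAttribute(name, "SlowPeersReport");
          System.out.println(report);
        }
      }
    }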

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 8c4359f..d738e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.security.KerberosInfo;
 
+import javax.annotation.Nonnull;
+
 /**********************************************************************
  * Protocol that a DFS datanode uses to communicate with the NameNode.
  * It's used to upload current load information and block reports.
@@ -105,6 +107,9 @@ public interface DatanodeProtocol {
    * @param volumeFailureSummary info about volume failures
    * @param requestFullBlockReportLease whether to request a full block
    *                                    report lease.
+   * @param slowPeers Details of peer DataNodes that were detected as being
+   *                  slow to respond to packet writes. Empty report if no
+   *                  slow peers were detected by the DataNode.
    * @throws IOException on error
    */
   @Idempotent
@@ -116,7 +121,8 @@ public interface DatanodeProtocol {
                                        int xceiverCount,
                                        int failedVolumes,
                                        VolumeFailureSummary volumeFailureSummary,
-                                       boolean requestFullBlockReportLease)
+                                       boolean requestFullBlockReportLease,
+                                       @Nonnull SlowPeerReports slowPeers)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 7423b33..3b25a43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -195,6 +195,7 @@ message VolumeFailureSummaryProto {
  * cacheCapacity - total cache capacity available at the datanode
  * cacheUsed - amount of cache used
  * volumeFailureSummary - info about volume failures
+ * slowPeers - info about peer DataNodes that are suspected to be slow.
  */
 message HeartbeatRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -206,6 +207,7 @@ message HeartbeatRequestProto {
   optional uint64 cacheUsed = 7 [default = 0 ];
   optional VolumeFailureSummaryProto volumeFailureSummary = 8;
   optional bool requestFullBlockReportLease = 9 [ default = false ];
+  repeated SlowPeerReportProto slowPeers = 10;
 }
 
 /**
@@ -386,6 +388,24 @@ message CommitBlockSynchronizationResponseProto {
 }
 
 /**
+ * Information about a single slow peer that may be reported by
+ * the DataNode to the NameNode as part of the heartbeat request.
+ * The message includes the peer's DataNodeId and its
+ * aggregate packet latency as observed by the reporting DataNode.
+ * (DataNodeId must be transmitted as a string for protocol compatibility
+ *  with earlier versions of Hadoop).
+ *
+ * The exact choice of the aggregate is opaque to the NameNode but it
+ * _should_ be chosen consistently by all DataNodes in the cluster.
+ * Examples of aggregates are 90th percentile (good) and mean (not so
+ * good).
+ */
+message SlowPeerReportProto {
+  optional string dataNodeId = 1;
+  optional double aggregateLatency = 2;
+}
+
+/**
  * Protocol used from datanode to the namenode
  * See the request and response for details of rpc call.
  */
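
As a sanity check of the wire format, here is a sketch of building the new message with the generated protobuf-java classes; a heartbeat request would carry zero or more of these in its repeated slowPeers field. The import path and outer class name DatanodeProtocolProtos follow the usual Hadoop convention for this .proto file rather than anything shown in this hunk, and the field values are invented; the real conversion lives in PBHelper.

    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;

    public class SlowPeerProtoSketch {
      public static void main(String[] args) {
        // One entry per peer that the reporting DataNode considers slow.
        SlowPeerReportProto slowPeer = SlowPeerReportProto.newBuilder()
            .setDataNodeId("dn2.example.com:50010")   // invented peer id
            .setAggregateLatency(87.5)                // e.g. 90th percentile, in ms
            .build();
        System.out.println(slowPeer);
      }
    }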

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 3389d84..966cb2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1980,19 +1980,15 @@
 </property>
 
 <property>
-  <name>dfs.metrics.rolling.average.window.size</name>
-  <value>3600</value>
+  <name>dfs.datanode.slow.peers.report.interval</name>
+  <value>30m</value>
   <description>
-    The number of seconds of each window for which sub set of samples are gathered
-    to compute the rolling average, A.K.A. roll over interval.
-  </description>
-</property>
+    This setting controls how frequently DataNodes report their peer
+    latencies to the NameNode via heartbeats. The value supports
+    multiple time unit suffixes as described in dfs.heartbeat.interval.
+    If no suffix is specified, the value is assumed to be in milliseconds.
 
-<property>
-  <name>dfs.metrics.rolling.average.window.numbers</name>
-  <value>48</value>
-  <description>
-    The number of windows maintained to compute the rolling average.
+    It is ignored if dfs.datanode.peer.stats.enabled is false.
   </description>
 </property>
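
For illustration, a DataNode-side sketch of reading the new interval. The DFSConfigKeys constant is not shown in this hunk, so the literal key is used, and the 30-minute fallback mirrors the default above; per the description, the report is only sent when dfs.datanode.peer.stats.enabled is true.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    public class SlowPeerIntervalSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "30m" style values are resolved to ms; plain numbers are taken as ms.
        long reportIntervalMs = conf.getTimeDuration(
            "dfs.datanode.slow.peers.report.interval",
            TimeUnit.MINUTES.toMillis(30),
            TimeUnit.MILLISECONDS);
        // Illustrative local default; gates whether the report is sent at all.
        boolean peerStatsEnabled = conf.getBoolean(
            "dfs.datanode.peer.stats.enabled", false);
        System.out.println(peerStatsEnabled + " " + reportIntervalMs);
      }
    }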
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 7f5cf2d..ff08528 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclEntryType;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
@@ -770,6 +772,26 @@ public class TestPBHelper {
     assertEquals(dnInfos0.getNonDfsUsed(), dnInfos3.getNonDfsUsed());
   }
 
+  @Test
+  public void testSlowPeerInfoPBHelper() {
+    // Test with a map that has a few slow peer entries.
+    final SlowPeerReports slowPeers = SlowPeerReports.create(
+        ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0));
+    SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo(
+        PBHelper.convertSlowPeerInfo(slowPeers));
+    assertTrue(
+        "Expected map:" + slowPeers + ", got map:" +
+            slowPeersConverted1.getSlowPeers(),
+        slowPeersConverted1.equals(slowPeers));
+
+    // Test with an empty map.
+    SlowPeerReports slowPeersConverted2 = PBHelper.convertSlowPeerInfo(
+        PBHelper.convertSlowPeerInfo(SlowPeerReports.EMPTY_REPORT));
+    assertTrue(
+        "Expected empty map:" + ", got map:" + slowPeersConverted2,
+        slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT));
+  }
+
   private void assertBlockECRecoveryInfoEquals(
       BlockECReconstructionInfo blkECRecoveryInfo1,
       BlockECReconstructionInfo blkECRecoveryInfo2) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
index ab607ea..f12f6f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
@@ -42,13 +42,23 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
 /**
  * Test if FSNamesystem handles heartbeat right
  */
 public class TestHeartbeatHandling {
+
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
   /**
    * Test if
    * {@link FSNamesystem#handleHeartbeat}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index 070a768..a5c6e0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import com.google.common.base.Supplier;
 import java.util.ArrayList;
 import java.util.Collection;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -112,7 +114,7 @@ public class TestNameNodePrunesMissingStorages {
       // Stop the DataNode and send fake heartbeat with missing storage.
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
-          0, null, true);
+          0, null, true, SlowPeerReports.EMPTY_REPORT);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

