hadoop-common-commits mailing list archives

From xg...@apache.org
Subject [36/57] [abbrv] hadoop git commit: HDFS-11194. Maintain aggregated peer performance metrics on NameNode.
Date Thu, 26 Jan 2017 21:12:30 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
new file mode 100644
index 0000000..15eb3a5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker.ReportForJson;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link SlowPeerTracker}.
+ */
+public class TestSlowPeerTracker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestSlowPeerTracker.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  private Configuration conf;
+  private SlowPeerTracker tracker;
+  private FakeTimer timer;
+  private long reportValidityMs;
+
+  @Before
+  public void setup() {
+    conf = new HdfsConfiguration();
+    timer = new FakeTimer();
+    tracker = new SlowPeerTracker(conf, timer);
+    reportValidityMs = tracker.getReportValidityMs();
+  }
+
+  /**
+   * Edge case: there are no reports to retrieve.
+   */
+  @Test
+  public void testEmptyReports() {
+    assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
+    assertTrue(tracker.getReportsForNode("noSuchNode").isEmpty());
+  }
+
+  @Test
+  public void testReportsAreRetrieved() {
+    tracker.addReport("node2", "node1");
+    tracker.addReport("node3", "node1");
+    tracker.addReport("node3", "node2");
+
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
+    assertThat(tracker.getReportsForNode("node2").size(), is(1));
+    assertThat(tracker.getReportsForNode("node3").size(), is(2));
+    assertThat(tracker.getReportsForNode("node1").size(), is(0));
+  }
+
+  /**
+   * Test that when all reports are expired, we get back nothing.
+   */
+  @Test
+  public void testAllReportsAreExpired() {
+    tracker.addReport("node2", "node1");
+    tracker.addReport("node3", "node2");
+    tracker.addReport("node1", "node3");
+
+    // No reports should expire after 1ms.
+    timer.advance(1);
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(3));
+
+    // All reports should expire after REPORT_VALIDITY_MS.
+    timer.advance(reportValidityMs);
+    assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
+    assertTrue(tracker.getReportsForNode("node1").isEmpty());
+    assertTrue(tracker.getReportsForNode("node2").isEmpty());
+    assertTrue(tracker.getReportsForNode("node3").isEmpty());
+  }
+
+  /**
+   * Test the case when a subset of reports has expired.
+   * Ensure that we only get back non-expired reports.
+   */
+  @Test
+  public void testSomeReportsAreExpired() {
+    tracker.addReport("node3", "node1");
+    tracker.addReport("node3", "node2");
+    timer.advance(reportValidityMs);
+    tracker.addReport("node3", "node4");
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
+    assertThat(tracker.getReportsForNode("node3").size(), is(1));
+    assertTrue(tracker.getReportsForNode("node3").contains("node4"));
+  }
+
+  /**
+   * Test the case when an expired report is replaced by a valid one.
+   */
+  @Test
+  public void testReplacement() {
+    tracker.addReport("node2", "node1");
+    timer.advance(reportValidityMs); // Expire the report.
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
+
+    // This should replace the expired report with a newer valid one.
+    tracker.addReport("node2", "node1");
+    assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
+    assertThat(tracker.getReportsForNode("node2").size(), is(1));
+  }
+
+  @Test
+  public void testGetJson() throws IOException {
+    tracker.addReport("node1", "node2");
+    tracker.addReport("node2", "node3");
+    tracker.addReport("node2", "node1");
+    tracker.addReport("node4", "node1");
+
+    final Set<ReportForJson> reports = getAndDeserializeJson();
+
+    // And ensure its contents are what we expect.
+    assertThat(reports.size(), is(3));
+    assertTrue(isNodeInReports(reports, "node1"));
+    assertTrue(isNodeInReports(reports, "node2"));
+    assertTrue(isNodeInReports(reports, "node4"));
+
+    assertFalse(isNodeInReports(reports, "node3"));
+  }
+
+  @Test
+  public void testGetJsonSizeIsLimited() throws IOException {
+    tracker.addReport("node1", "node2");
+    tracker.addReport("node1", "node3");
+    tracker.addReport("node2", "node3");
+    tracker.addReport("node2", "node4");
+    tracker.addReport("node3", "node4");
+    tracker.addReport("node3", "node5");
+    tracker.addReport("node4", "node6");
+    tracker.addReport("node5", "node6");
+    tracker.addReport("node5", "node7");
+    tracker.addReport("node6", "node7");
+    tracker.addReport("node6", "node8");
+
+    final Set<ReportForJson> reports = getAndDeserializeJson();
+
+    // Ensure that node4 is not in the list since it was
+    // tagged by just one peer and we already have 5 other nodes.
+    assertFalse(isNodeInReports(reports, "node4"));
+
+    // Remaining nodes should be in the list.
+    assertTrue(isNodeInReports(reports, "node1"));
+    assertTrue(isNodeInReports(reports, "node2"));
+    assertTrue(isNodeInReports(reports, "node3"));
+    assertTrue(isNodeInReports(reports, "node5"));
+    assertTrue(isNodeInReports(reports, "node6"));
+  }
+
+  @Test
+  public void testLowRankedElementsIgnored() throws IOException {
+    // Insert 5 nodes with 2 peer reports each.
+    for (int i = 0; i < 5; ++i) {
+      tracker.addReport("node" + i, "reporter1");
+      tracker.addReport("node" + i, "reporter2");
+    }
+
+    // Insert 10 nodes with 1 peer report each.
+    for (int i = 10; i < 20; ++i) {
+      tracker.addReport("node" + i, "reporter1");
+    }
+
+    final Set<ReportForJson> reports = getAndDeserializeJson();
+
+    // Ensure that only the first 5 nodes with two reports each were
+    // included in the JSON.
+    for (int i = 0; i < 5; ++i) {
+      assertTrue(isNodeInReports(reports, "node" + i));
+    }
+  }
+
+  private boolean isNodeInReports(
+      Set<ReportForJson> reports, String node) {
+    for (ReportForJson report : reports) {
+      if (report.getSlowNode().equalsIgnoreCase(node)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Set<ReportForJson> getAndDeserializeJson()
+      throws IOException {
+    final String json = tracker.getJson();
+    LOG.info("Got JSON: {}", json);
+    return (new ObjectMapper()).readValue(
+        json, new TypeReference<Set<ReportForJson>>() {});
+  }
+}

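The new test exercises the full SlowPeerTracker surface: reports are keyed by the slow node, aged out against a Timer, and summarized as JSON. A minimal usage sketch, assuming only the constructor and accessors the test itself calls (FakeTimer is the test clock used above):

    // Sketch: drives SlowPeerTracker exactly as the test above does.
    Configuration conf = new HdfsConfiguration();
    FakeTimer timer = new FakeTimer();
    SlowPeerTracker tracker = new SlowPeerTracker(conf, timer);

    tracker.addReport("node2", "node1");          // node1 reports node2 as slow
    assert tracker.getReportsForNode("node2").contains("node1");

    timer.advance(tracker.getReportValidityMs()); // expire outstanding reports
    assert tracker.getReportsForAllDataNodes().isEmpty();
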
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index de856e6..cf43fd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.junit.Assert;
@@ -136,7 +137,8 @@ public class InternalDataNodeTestUtils {
             Mockito.any(StorageReport[].class), Mockito.anyLong(),
             Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
             Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
-            Mockito.anyBoolean())).thenReturn(
+            Mockito.anyBoolean(),
+            Mockito.any(SlowPeerReports.class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

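The remaining heartbeat-related changes in this commit all follow one pattern: DatanodeProtocol#sendHeartbeat gains a trailing SlowPeerReports argument after the boolean, so Mockito stubs add one more matcher and direct callers pass SlowPeerReports.EMPTY_REPORT. Consolidated, the new call shape looks like this (the inline comments naming each argument are my reading of the signature, not taken from this patch):

    HeartbeatResponse resp = dataNodeProto.sendHeartbeat(
        dnRegistration, storageReports,
        0L, 0L,                        // cache capacity, cache used
        0, 0, 0,                       // xmits, xceiver count, failed volumes
        null,                          // VolumeFailureSummary
        true,                          // request full block report lease
        SlowPeerReports.EMPTY_REPORT); // no slow peers observed
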
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index b7b8966..c6b38ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -119,7 +120,7 @@ public class TestBPOfferService {
     Mockito.doReturn(conf).when(mockDn).getConf();
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
-    .when(mockDn).getMetrics();
+        .when(mockDn).getMetrics();
 
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -152,7 +153,8 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.anyInt(),
           Mockito.any(VolumeFailureSummary.class),
-          Mockito.anyBoolean());
+          Mockito.anyBoolean(),
+          Mockito.any(SlowPeerReports.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 619eda0..b64f1e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -217,7 +218,8 @@ public class TestBlockRecovery {
             Mockito.anyInt(),
             Mockito.anyInt(),
             Mockito.any(VolumeFailureSummary.class),
-            Mockito.anyBoolean()))
+            Mockito.anyBoolean(),
+            Mockito.any(SlowPeerReports.class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
index 76885e4..6435d4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -20,10 +20,15 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
 import static java.lang.Math.abs;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
@@ -31,11 +36,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
 
 
 /**
@@ -51,6 +51,7 @@ public class TestBpServiceActorScheduler {
   private static final long HEARTBEAT_INTERVAL_MS = 5000;      // 5 seconds
   private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
   private static final long BLOCK_REPORT_INTERVAL_MS = 10000;  // 10 seconds
+  private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000;  // 10 seconds
   private final Random random = new Random(System.nanoTime());
 
   @Test
@@ -180,13 +181,28 @@ public class TestBpServiceActorScheduler {
     }
   }
 
+  @Test
+  public void testSlowPeerReportScheduling() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      assertTrue(scheduler.isSlowPeersReportDue(now));
+      scheduler.scheduleNextSlowPeerReport();
+      assertFalse(scheduler.isSlowPeersReportDue(now));
+      assertFalse(scheduler.isSlowPeersReportDue(now + 1));
+      assertTrue(scheduler.isSlowPeersReportDue(
+          now + SLOW_PEER_REPORT_INTERVAL_MS));
+    }
+  }
+
   private Scheduler makeMockScheduler(long now) {
     LOG.info("Using now = " + now);
-    Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS,
-        LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
+    Scheduler mockScheduler = spy(new Scheduler(
+        HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
+        BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS));
     doReturn(now).when(mockScheduler).monotonicNow();
     mockScheduler.nextBlockReportTime = now;
     mockScheduler.nextHeartbeatTime = now;
+    mockScheduler.nextSlowPeersReportTime = now;
     return mockScheduler;
   }
 

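testSlowPeerReportScheduling pins down the scheduler contract for the new report: a slow-peers report is due immediately after construction, and not due again until the interval elapses once scheduleNextSlowPeerReport() has run. A sketch of logic consistent with those assertions (the real BPServiceActor.Scheduler internals may differ):

    // Assumed fields: nextSlowPeersReportTime, slowPeersReportIntervalMs.
    boolean isSlowPeersReportDue(long curTime) {
      return curTime >= nextSlowPeersReportTime;  // due at or past the deadline
    }

    void scheduleNextSlowPeerReport() {
      nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs;
    }
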
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index df2fe5a..8a9f0b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -167,7 +168,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             anyInt(),
             any(VolumeFailureSummary.class),
-            anyBoolean());
+            anyBoolean(),
+            any(SlowPeerReports.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -230,7 +232,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             anyInt(),
             any(VolumeFailureSummary.class),
-            anyBoolean());
+            anyBoolean(),
+            any(SlowPeerReports.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
index 5af54a4..b18ff2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -41,9 +42,10 @@ public class TestDataNodePeerMetrics {
     final int numOpsPerIteration = 1000;
 
     final Configuration conf = new HdfsConfiguration();
-    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
-        windowSize);
-    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY,
+        windowSize, TimeUnit.SECONDS);
+    conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY,
         numWindows);
     conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index d447a76..c94f74e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -218,7 +219,8 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.anyInt(),
            Mockito.anyInt(),
            Mockito.any(VolumeFailureSummary.class),
-           Mockito.anyBoolean());
+           Mockito.anyBoolean(),
+           Mockito.any(SlowPeerReports.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 6557055..eb015c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
@@ -172,7 +173,7 @@ public class TestFsDatasetCache {
         (DatanodeRegistration) any(),
         (StorageReport[]) any(), anyLong(), anyLong(),
         anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
-        anyBoolean());
+        anyBoolean(), any(SlowPeerReports.class));
   }
 
   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index d8418d4..2b793e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.junit.After;
@@ -106,7 +107,8 @@ public class TestStorageReport {
         any(DatanodeRegistration.class),
         captor.capture(),
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
-        Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean());
+        Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
+        Mockito.any(SlowPeerReports.class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
new file mode 100644
index 0000000..34e15e5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test that the {@link DataNodePeerMetrics} class is able to detect
+ * outliers, i.e. slow nodes, via the metrics it maintains.
+ */
+public class TestDataNodeOutlierDetectionViaMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestDataNodeOutlierDetectionViaMetrics.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  // A few constants to keep the test run time short.
+  private static final int WINDOW_INTERVAL_SECONDS = 3;
+  private static final int ROLLING_AVERAGE_WINDOWS = 10;
+  private static final int SLOW_NODE_LATENCY_MS = 20_000;
+  private static final int FAST_NODE_MAX_LATENCY_MS = 5;
+
+  private Random random = new Random(System.currentTimeMillis());
+
+  @Before
+  public void setup() {
+    GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+  }
+
+  /**
+   * Test that a very slow peer is detected as an outlier.
+   */
+  @Test
+  public void testOutlierIsDetected() throws Exception {
+    final String slowNodeName = "SlowNode";
+
+    DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
+        "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
+        ROLLING_AVERAGE_WINDOWS);
+
+    injectFastNodesSamples(peerMetrics);
+    injectSlowNodeSamples(peerMetrics, slowNodeName);
+
+    // Trigger a snapshot.
+    peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+    final Map<String, Double> outliers = peerMetrics.getOutliers();
+    LOG.info("Got back outlier nodes: {}", outliers);
+    assertThat(outliers.size(), is(1));
+    assertTrue(outliers.containsKey(slowNodeName));
+  }
+
+  /**
+   * Test that when there are no outliers, we get back nothing.
+   */
+  @Test
+  public void testWithNoOutliers() throws Exception {
+    DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics(
+        "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS,
+        ROLLING_AVERAGE_WINDOWS);
+
+    injectFastNodesSamples(peerMetrics);
+
+    // Trigger a snapshot.
+    peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
+
+    // Ensure that we get back no outliers.
+    assertTrue(peerMetrics.getOutliers().isEmpty());
+  }
+
+  /**
+   * Inject fake stats for MIN_OUTLIER_DETECTION_PEERS fast nodes.
+   *
+   * @param peerMetrics the peer metrics object to inject samples into.
+   */
+  public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) {
+    for (int nodeIndex = 0;
+         nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers();
+         ++nodeIndex) {
+      final String nodeName = "FastNode-" + nodeIndex;
+      LOG.info("Generating stats for node {}", nodeName);
+      for (int i = 0;
+           i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
+           ++i) {
+        peerMetrics.addSendPacketDownstream(
+            nodeName, random.nextInt(FAST_NODE_MAX_LATENCY_MS));
+      }
+    }
+  }
+
+  /**
+   * Inject fake stats for one extremely slow node.
+   */
+  public void injectSlowNodeSamples(
+      DataNodePeerMetrics peerMetrics, String slowNodeName)
+      throws InterruptedException {
+
+    // Inject repeated high-latency samples for the one slow node.
+    for (int i = 0;
+         i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES;
+         ++i) {
+      peerMetrics.addSendPacketDownstream(
+          slowNodeName, SLOW_NODE_LATENCY_MS);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
new file mode 100644
index 0000000..7b368c4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SlowNodeDetector}.
+ */
+public class TestSlowNodeDetector {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestSlowNodeDetector.class);
+
+  /**
+   * Set a timeout for every test case.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300_000);
+
+  private final static double LOW_THRESHOLD = 1000;
+  private final static long MIN_OUTLIER_DETECTION_PEERS = 3;
+
+  // Randomly generated test cases for median and MAD. The first entry
+  // in each pair is the expected median and the second entry is the
+  // expected Median Absolute Deviation. The small sets of size 1 and 2
+  // exist to test the edge cases; however, in practice the MAD of a very
+  // small set is not useful.
+  private Map<List<Double>, Pair<Double, Double>> medianTestMatrix =
+      new ImmutableMap.Builder<List<Double>, Pair<Double, Double>>()
+          // Single element.
+          .put(new ImmutableList.Builder<Double>()
+                  .add(9.6502431302).build(),
+              Pair.of(9.6502431302, 0.0))
+
+          // Two elements.
+          .put(new ImmutableList.Builder<Double>()
+                  .add(1.72168104625)
+                  .add(11.7872544459).build(),
+              Pair.of(6.75446774606, 7.4616095611))
+
+          // The remaining lists were randomly generated with sizes 3-10.
+          .put(new ImmutableList.Builder<Double>()
+                  .add(76.2635686249)
+                  .add(27.0652018553)
+                  .add(1.3868476443)
+                  .add(49.7194624164)
+                  .add(47.385680883)
+                  .add(57.8721199173).build(),
+              Pair.of(48.5525716497, 22.837202532))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(86.0573389581)
+                  .add(93.2399572424)
+                  .add(64.9545429122)
+                  .add(35.8509730085)
+                  .add(1.6534313654).build(),
+              Pair.of(64.9545429122, 41.9360180373))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(5.00127007366)
+                  .add(37.9790589127)
+                  .add(67.5784746266).build(),
+              Pair.of(37.9790589127, 43.8841594039))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(1.43442932944)
+                  .add(70.6769829947)
+                  .add(37.47579656)
+                  .add(51.1126141394)
+                  .add(72.2465914419)
+                  .add(32.2930549225)
+                  .add(39.677459781).build(),
+              Pair.of(39.677459781, 16.9537852208))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(26.7913745214)
+                  .add(68.9833706658)
+                  .add(29.3882180746)
+                  .add(68.3455244453)
+                  .add(74.9277265022)
+                  .add(12.1469972942)
+                  .add(72.5395402683)
+                  .add(7.87917492506)
+                  .add(33.3253447774)
+                  .add(72.2753759125).build(),
+              Pair.of(50.8354346113, 31.9881230079))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(38.6482290705)
+                  .add(88.0690746319)
+                  .add(50.6673611649)
+                  .add(64.5329814115)
+                  .add(25.2580979294)
+                  .add(59.6709630711)
+                  .add(71.5406993741)
+                  .add(81.3073035091)
+                  .add(20.5549547284).build(),
+              Pair.of(59.6709630711, 31.1683520683))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(87.352734249)
+                  .add(65.4760359094)
+                  .add(28.9206803169)
+                  .add(36.5908574008)
+                  .add(87.7407653175)
+                  .add(99.3704511335)
+                  .add(41.3227434076)
+                  .add(46.2713494909)
+                  .add(3.49940920921).build(),
+              Pair.of(46.2713494909, 28.4729106898))
+
+          .put(new ImmutableList.Builder<Double>()
+                  .add(95.3251533286)
+                  .add(27.2777870437)
+                  .add(43.73477168).build(),
+              Pair.of(43.73477168, 24.3991619317))
+
+          .build();
+
+  // A test matrix that maps inputs to the expected output list of
+  // slow nodes, i.e. outliers.
+  private Map<Map<String, Double>, Set<String>> outlierTestMatrix =
+      new ImmutableMap.Builder<Map<String, Double>, Set<String>>()
+          // The number of samples is too low and all samples are below
+          // the low threshold. Nothing should be returned.
+          .put(ImmutableMap.of(
+              "n1", 0.0,
+              "n2", LOW_THRESHOLD + 1),
+              ImmutableSet.of())
+
+          // A statistical outlier below the low threshold must not be
+          // returned.
+          .put(ImmutableMap.of(
+              "n1", 1.0,
+              "n2", 1.0,
+              "n3", LOW_THRESHOLD - 1),
+              ImmutableSet.of())
+
+          // A statistical outlier above the low threshold must be returned.
+          .put(ImmutableMap.of(
+              "n1", 1.0,
+              "n2", 1.0,
+              "n3", LOW_THRESHOLD + 1),
+              ImmutableSet.of("n3"))
+
+          // A statistical outlier must not be returned if it is within a
+          // MEDIAN_MULTIPLIER multiple of the median.
+          .put(ImmutableMap.of(
+              "n1", LOW_THRESHOLD + 0.1,
+              "n2", LOW_THRESHOLD + 0.1,
+              "n3", LOW_THRESHOLD * SlowNodeDetector.MEDIAN_MULTIPLIER - 0.1),
+              ImmutableSet.of())
+
+          // A statistical outlier must be returned if it is outside a
+          // MEDIAN_MULTIPLIER multiple of the median.
+          .put(ImmutableMap.of(
+              "n1", LOW_THRESHOLD + 0.1,
+              "n2", LOW_THRESHOLD + 0.1,
+              "n3", (LOW_THRESHOLD + 0.1) *
+                  SlowNodeDetector.MEDIAN_MULTIPLIER + 0.1),
+              ImmutableSet.of("n3"))
+
+          // Only the statistical outliers n3 and n11 should be returned.
+          .put(new ImmutableMap.Builder<String, Double>()
+                  .put("n1", 1029.4322)
+                  .put("n2", 2647.876)
+                  .put("n3", 9194.312)
+                  .put("n4", 2.2)
+                  .put("n5", 2012.92)
+                  .put("n6", 1843.81)
+                  .put("n7", 1201.43)
+                  .put("n8", 6712.01)
+                  .put("n9", 3278.554)
+                  .put("n10", 2091.765)
+                  .put("n11", 9194.77).build(),
+              ImmutableSet.of("n3", "n11"))
+
+          // The following input set has multiple outliers.
+          //   - The low outliers (n4, n6) should not be returned.
+          //   - High outlier n2 is within 3 multiples of the median
+          //     and so it should not be returned.
+          //   - Only the high outlier n8 should be returned.
+          .put(new ImmutableMap.Builder<String, Double>()
+                  .put("n1", 5002.0)
+                  .put("n2", 9001.0)
+                  .put("n3", 5004.0)
+                  .put("n4", 1001.0)
+                  .put("n5", 5003.0)
+                  .put("n6", 2001.0)
+                  .put("n7", 5000.0)
+                  .put("n8", 101002.0)
+                  .put("n9", 5001.0)
+                  .put("n10", 5002.0)
+                  .put("n11", 5105.0)
+                  .put("n12", 5006.0).build(),
+              ImmutableSet.of("n8"))
+
+          .build();
+
+
+  private SlowNodeDetector slowNodeDetector;
+
+  @Before
+  public void setup() {
+    slowNodeDetector = new SlowNodeDetector((long) LOW_THRESHOLD);
+    SlowNodeDetector.setMinOutlierDetectionPeers(MIN_OUTLIER_DETECTION_PEERS);
+    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+  }
+
+  @Test
+  public void testOutliersFromTestMatrix() {
+    for (Map.Entry<Map<String, Double>, Set<String>> entry :
+        outlierTestMatrix.entrySet()) {
+
+      LOG.info("Verifying set {}", entry.getKey());
+      final Set<String> outliers =
+          slowNodeDetector.getOutliers(entry.getKey()).keySet();
+      assertTrue(
+          "Running outlier detection on " + entry.getKey() +
+              " was expected to yield set " + entry.getValue() + ", but " +
+              " we got set " + outliers,
+          outliers.equals(entry.getValue()));
+    }
+  }
+
+  /**
+   * Unit test for {@link SlowNodeDetector#computeMedian(List)}.
+   */
+  @Test
+  public void testMediansFromTestMatrix() {
+    for (Map.Entry<List<Double>, Pair<Double, Double>> entry :
+        medianTestMatrix.entrySet()) {
+      final List<Double> inputList = new ArrayList<>(entry.getKey());
+      Collections.sort(inputList);
+      final Double median = SlowNodeDetector.computeMedian(inputList);
+      final Double expectedMedian = entry.getValue().getLeft();
+
+      // Ensure that the median is within 0.001% of expected.
+      // We need some fudge factor for floating point comparison.
+      final Double errorPercent =
+          Math.abs(median - expectedMedian) * 100.0 / expectedMedian;
+
+      assertTrue(
+          "Set " + inputList + "; Expected median: " +
+              expectedMedian + ", got: " + median,
+          errorPercent < 0.001);
+    }
+  }
+
+  /**
+   * Unit test for {@link SlowNodeDetector#computeMad(List)}.
+   */
+  @Test
+  public void testMadsFromTestMatrix() {
+    for (Map.Entry<List<Double>, Pair<Double, Double>> entry :
+        medianTestMatrix.entrySet()) {
+      final List<Double> inputList = new ArrayList<>(entry.getKey());
+      Collections.sort(inputList);
+      final Double mad = SlowNodeDetector.computeMad(inputList);
+      final Double expectedMad = entry.getValue().getRight();
+
+      // Ensure that the MAD is within 0.001% of expected.
+      // We need some fudge factor for floating point comparison.
+      if (entry.getKey().size() > 1) {
+        final Double errorPercent =
+            Math.abs(mad - expectedMad) * 100.0 / expectedMad;
+
+        assertTrue(
+            "Set " + entry.getKey() + "; Expected M.A.D.: " +
+                expectedMad + ", got: " + mad,
+            errorPercent < 0.001);
+      } else {
+        // For an input list of size 1, the MAD should be 0.0.
+        final Double epsilon = 0.000001; // Allow for some FP math error.
+        assertTrue(
+            "Set " + entry.getKey() + "; Expected M.A.D.: " +
+                expectedMad + ", got: " + mad,
+            mad < epsilon);
+      }
+    }
+  }
+
+  /**
+   * Verify that {@link SlowNodeDetector#computeMedian(List)} throws when
+   * passed an empty list.
+   */
+  @Test(expected=IllegalArgumentException.class)
+  public void testMedianOfEmptyList() {
+    SlowNodeDetector.computeMedian(Collections.emptyList());
+  }
+
+  /**
+   * Verify that {@link SlowNodeDetector#computeMad(List)} throws when
+   * passed an empty list.
+   */
+  @Test(expected=IllegalArgumentException.class)
+  public void testMadOfEmptyList() {
+    SlowNodeDetector.computeMad(Collections.emptyList());
+  }
+}

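The median/MAD expectations in medianTestMatrix match the textbook definitions with a 1.4826 consistency scale applied to the MAD: for the two-element case, the median of {1.72168104625, 11.7872544459} is 6.75446774606, both absolute deviations are 5.0327866998, and 5.0327866998 * 1.4826 = 7.4616095611, the expected value. A reference sketch under that assumption (the 1.4826 factor is inferred from the test vectors, not read from SlowNodeDetector itself):

    // Inputs must already be sorted, as the tests sort before calling.
    static double median(List<Double> sorted) {
      final int n = sorted.size();
      if (n == 0) {
        throw new IllegalArgumentException("empty input");
      }
      return (n % 2 == 1)
          ? sorted.get(n / 2)
          : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    static double mad(List<Double> sorted) {
      final double med = median(sorted);
      final List<Double> deviations = new ArrayList<>();
      for (double v : sorted) {
        deviations.add(Math.abs(v - med));       // distance from the median
      }
      Collections.sort(deviations);
      return median(deviations) * 1.4826;        // normal-consistency constant
    }
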
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index a3d0be5..b86b3fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -951,7 +952,8 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
-          0L, 0L, 0, 0, 0, null, true).getCommands();
+          0L, 0L, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1000,7 +1002,8 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
-          rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
+          rep, 0L, 0L, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index ed6c92a..2b8faf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
@@ -122,7 +123,8 @@ public class NameNodeAdapter {
       DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
-        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true);
+        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
+        SlowPeerReports.EMPTY_REPORT);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 5c2d291..b9161c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -132,7 +133,8 @@ public class TestDeadDatanode {
         new DatanodeStorage(reg.getDatanodeUuid()),
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
-        dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
+        dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
+            SlowPeerReports.EMPTY_REPORT).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b57368b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 6b0dced..cdce342 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -66,6 +66,10 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
     // Purposely hidden, based on comments in DFSConfigKeys
     configurationPropsToSkipCompare
         .add(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY);
+    configurationPropsToSkipCompare
+        .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY);
+    configurationPropsToSkipCompare
+        .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY);
 
     // Fully deprecated properties?
     configurationPropsToSkipCompare

