hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject hadoop git commit: HDFS-11461. DataNode Disk Outlier Detection. Contributed by Hanisha Koneru.
Date Thu, 09 Mar 2017 21:00:10 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 60be2e5d8 -> b6c477691


HDFS-11461. DataNode Disk Outlier Detection. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b6c47769
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b6c47769
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b6c47769

Branch: refs/heads/branch-2
Commit: b6c4776911c264b1e68e85100487e4456705e775
Parents: 60be2e5
Author: Arpit Agarwal <arp@apache.org>
Authored: Thu Mar 9 12:59:48 2017 -0800
Committer: Arpit Agarwal <arp@apache.org>
Committed: Thu Mar 9 12:59:48 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +-
 .../server/blockmanagement/SlowPeerTracker.java |   4 +-
 .../hdfs/server/datanode/BPServiceActor.java    |   2 +-
 .../hadoop/hdfs/server/datanode/DNConf.java     |  16 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  10 +
 .../datanode/metrics/DataNodeDiskMetrics.java   | 181 +++++++++++++++++
 .../datanode/metrics/DataNodePeerMetrics.java   |   6 +-
 .../datanode/metrics/OutlierDetector.java       | 182 +++++++++++++++++
 .../datanode/metrics/SlowNodeDetector.java      | 194 -------------------
 .../src/main/resources/hdfs-default.xml         |   2 +-
 .../TestDataNodeOutlierDetectionViaMetrics.java |   6 +-
 .../datanode/metrics/TestSlowNodeDetector.java  |  30 +--
 12 files changed, 411 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 853306b..e9e6bcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -633,9 +633,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit";
   public static final int     DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000;
 
-  public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY =
-      "dfs.datanode.slow.peers.report.interval";
+  public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY =
+      "dfs.datanode.outliers.report.interval";
-  public static final int DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT =
+  public static final int DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT =
       1800 * 1000;
 
   // property for fsimage compression

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
index a1ffd20..c8a6348 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java
@@ -94,8 +94,8 @@ public class SlowPeerTracker {
     this.timer = timer;
     this.allReports = new ConcurrentHashMap<>();
     this.reportValidityMs = conf.getTimeDuration(
-        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
-        DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+        DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS) * 3;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index c605588..adbf025 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -129,7 +129,7 @@ class BPServiceActor implements Runnable {
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
     scheduler = new Scheduler(dnConf.heartBeatInterval,
         dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
-        dnConf.slowPeersReportIntervalMs);
+        dnConf.outliersReportIntervalMs);
     // get the value of maxDataLength.
     this.maxDataLength = dnConf.getMaxDataLength();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index a90fecf..c98793c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -30,8 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
@@ -94,7 +94,8 @@ public class DNConf {
   final long blockReportInterval;
   final long blockReportSplitThreshold;
   final boolean peerStatsEnabled;
-  final long slowPeersReportIntervalMs;
+  final boolean diskStatsEnabled;
+  final long outliersReportIntervalMs;
   final long ibrInterval;
   final long initialBlockReportDelayMs;
   final long cacheReportInterval;
@@ -170,9 +171,12 @@ public class DNConf {
     this.peerStatsEnabled = getConf().getBoolean(
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
         DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
-    this.slowPeersReportIntervalMs = getConf().getTimeDuration(
-        DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY,
-        DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT,
+    this.diskStatsEnabled = getConf().getBoolean(
+        DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY,
+        DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_DEFAULT);
+    this.outliersReportIntervalMs = getConf().getTimeDuration(
+        DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
+        DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
     this.ibrInterval = getConf().getLong(
         DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index d9a3bcc..c783956 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -165,6 +165,7 @@ import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
@@ -336,6 +337,7 @@ public class DataNode extends ReconfigurableBase
   DataNodeMetrics metrics;
   @Nullable
   private DataNodePeerMetrics peerMetrics;
+  private DataNodeDiskMetrics diskMetrics;
   private InetSocketAddress streamingAddr;
 
   // See the note below in incrDatanodeNetworkErrors re: concurrency.
@@ -1356,6 +1358,11 @@ public class DataNode extends ReconfigurableBase
         dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
     startMetricsLogger();
+
+    if (dnConf.diskStatsEnabled) {
+      diskMetrics = new DataNodeDiskMetrics(this,
+          dnConf.outliersReportIntervalMs);
+    }
   }
 
   /**
@@ -2024,6 +2031,9 @@ public class DataNode extends ReconfigurableBase
     if (metrics != null) {
       metrics.shutdown();
     }
+    if (diskMetrics != null) {
+      diskMetrics.shutdownAndWait();
+    }
     if (dataNodeInfoBeanName != null) {
       MBeans.unregister(dataNodeInfoBeanName);
       dataNodeInfoBeanName = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
new file mode 100644
index 0000000..32791b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class detects and maintains DataNode disk outliers and their
+ * latencies for different ops (metadata, read, write).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DataNodeDiskMetrics {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DataNodeDiskMetrics.class);
+
+  private DataNode dn;
+  private final long MIN_OUTLIER_DETECTION_DISKS = 5;
+  private final long SLOW_DISK_LOW_THRESHOLD_MS = 20;
+  private final long detectionInterval;
+  private volatile boolean shouldRun;
+  private OutlierDetector slowDiskDetector;
+  private Daemon slowDiskDetectionDaemon;
+  private volatile Map<String, Map<DiskOutlierDetectionOp, Double>> diskOutliersStats;
+
+  /**
+   * Creates the disk metrics tracker and immediately starts the background
+   * daemon that periodically looks for slow disks.
+   *
+   * @param dn the DataNode whose volumes are inspected
+   * @param diskOutlierDetectionIntervalMs time in milliseconds the daemon
+   *        sleeps between two detection passes
+   */
+  public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
+    this.detectionInterval = diskOutlierDetectionIntervalMs;
+    this.dn = dn;
+    this.slowDiskDetector = new OutlierDetector(
+        MIN_OUTLIER_DETECTION_DISKS, SLOW_DISK_LOW_THRESHOLD_MS);
+    // The flag has to be raised before the daemon is started, because the
+    // daemon's loop condition reads it.
+    this.shouldRun = true;
+    startDiskOutlierDetectionThread();
+  }
+
+  /**
+   * Creates and starts the daemon thread that periodically samples the mean
+   * metadata-op, read and write latencies of every volume and passes them to
+   * {@code detectAndUpdateDiskOutliers}. The loop sleeps for
+   * {@code detectionInterval} milliseconds between passes and runs until
+   * {@code shouldRun} is cleared.
+   */
+  private void startDiskOutlierDetectionThread() {
+    slowDiskDetectionDaemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        while (shouldRun) {
+          Map<String, Double> metadataOpStats = Maps.newHashMap();
+          Map<String, Double> readIoStats = Maps.newHashMap();
+          Map<String, Double> writeIoStats = Maps.newHashMap();
+          FsDatasetSpi.FsVolumeReferences fsVolumeReferences = null;
+          try {
+            fsVolumeReferences = dn.getFSDataset().getFsVolumeReferences();
+            Iterator<FsVolumeSpi> volumeIterator = fsVolumeReferences
+                .iterator();
+            while (volumeIterator.hasNext()) {
+              // Advance the iterator exactly once per pass so the name and
+              // the metrics always belong to the same volume and no volume
+              // is skipped.
+              FsVolumeSpi volume = volumeIterator.next();
+              DataNodeVolumeMetrics metrics = volume.getMetrics();
+              String volumeName = volume.getBasePath();
+
+              metadataOpStats.put(volumeName,
+                  metrics.getMetadataOperationMean());
+              readIoStats.put(volumeName, metrics.getReadIoMean());
+              writeIoStats.put(volumeName, metrics.getWriteIoMean());
+            }
+          } finally {
+            // Always release the volume references, even if sampling failed.
+            if (fsVolumeReferences != null) {
+              try {
+                fsVolumeReferences.close();
+              } catch (IOException e) {
+                LOG.error("Error in releasing FS Volume references", e);
+              }
+            }
+          }
+
+          if (metadataOpStats.isEmpty() && readIoStats.isEmpty() &&
+              writeIoStats.isEmpty()) {
+            // Nothing to analyze in this pass. Keep the daemon alive (and
+            // still sleep below) so detection resumes once stats show up.
+            LOG.debug("No disk stats available for detecting outliers.");
+          } else {
+            detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
+                writeIoStats);
+          }
+
+          try {
+            Thread.sleep(detectionInterval);
+          } catch (InterruptedException e) {
+            LOG.error("Disk Outlier Detection thread interrupted", e);
+            // Restore the interrupt status; the loop condition decides
+            // whether the daemon keeps running.
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    });
+    slowDiskDetectionDaemon.start();
+  }
+
+  /**
+   * Runs outlier detection separately on the metadata-op, read and write
+   * latencies and publishes the combined result in
+   * {@code diskOutliersStats}. A disk flagged for any one of the three
+   * operation types is reported together with its latency for all three.
+   *
+   * @param metadataOpStats volume name to mean metadata operation latency
+   * @param readIoStats volume name to mean read IO latency
+   * @param writeIoStats volume name to mean write IO latency
+   */
+  private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
+      Map<String, Double> readIoStats, Map<String, Double> writeIoStats) {
+    // Union of the disks flagged for at least one operation type.
+    Set<String> diskOutliersSet = Sets.newHashSet();
+
+    // Get MetadataOp Outliers
+    Map<String, Double> metadataOpOutliers = slowDiskDetector
+        .getOutliers(metadataOpStats);
+    if (!metadataOpOutliers.isEmpty()) {
+      diskOutliersSet.addAll(metadataOpOutliers.keySet());
+    }
+
+    // Get ReadIo Outliers
+    Map<String, Double> readIoOutliers = slowDiskDetector
+        .getOutliers(readIoStats);
+    if (!readIoOutliers.isEmpty()) {
+      diskOutliersSet.addAll(readIoOutliers.keySet());
+    }
+
+    // Get WriteIo Outliers. The check must look at the write outliers
+    // themselves so that they are recorded even when no disk is a read
+    // outlier.
+    Map<String, Double> writeIoOutliers = slowDiskDetector
+        .getOutliers(writeIoStats);
+    if (!writeIoOutliers.isEmpty()) {
+      diskOutliersSet.addAll(writeIoOutliers.keySet());
+    }
+
+    Map<String, Map<DiskOutlierDetectionOp, Double>> diskStats =
+        Maps.newHashMap();
+    for (String disk : diskOutliersSet) {
+      Map<DiskOutlierDetectionOp, Double> diskStat = Maps.newHashMap();
+      diskStat.put(DiskOutlierDetectionOp.METADATA, metadataOpStats.get(disk));
+      diskStat.put(DiskOutlierDetectionOp.READ, readIoStats.get(disk));
+      diskStat.put(DiskOutlierDetectionOp.WRITE, writeIoStats.get(disk));
+      diskStats.put(disk, diskStat);
+    }
+
+    // Replace the previous result with a single write to the volatile field.
+    diskOutliersStats = diskStats;
+    LOG.debug("Updated disk outliers.");
+  }
+
+  /**
+   * Lists the types of operations on which disk latencies are measured.
+   * These constants are the keys of the per-disk maps stored in
+   * {@code diskOutliersStats}.
+   */
+  public enum DiskOutlierDetectionOp {
+    // Value taken from DataNodeVolumeMetrics#getMetadataOperationMean().
+    METADATA,
+    // Value taken from DataNodeVolumeMetrics#getReadIoMean().
+    READ,
+    // Value taken from DataNodeVolumeMetrics#getWriteIoMean().
+    WRITE
+  }
+
+  /**
+   * Returns the result of the most recent outlier detection pass: a map from
+   * disk (volume base path) to that disk's latency per operation type. Only
+   * disks flagged as outliers are present.
+   *
+   * NOTE: the backing field is not initialized by the constructor, so this
+   * returns {@code null} until a detection pass has stored a result.
+   *
+   * @return the latest disk outlier stats, possibly {@code null}
+   */
+  public Map<String,
+      Map<DiskOutlierDetectionOp, Double>> getDiskOutliersStats() {
+    return diskOutliersStats;
+  }
+
+  /**
+   * Stops the disk outlier detection daemon and blocks until it has exited.
+   * If the calling thread is interrupted while waiting, the wait is
+   * abandoned and the caller's interrupt status is restored.
+   */
+  public void shutdownAndWait() {
+    // Clear the flag before interrupting so the daemon leaves its loop as
+    // soon as the interrupt wakes it from sleep.
+    shouldRun = false;
+    slowDiskDetectionDaemon.interrupt();
+    try {
+      slowDiskDetectionDaemon.join();
+    } catch (InterruptedException e) {
+      LOG.error("Disk Outlier Detection daemon did not shutdown", e);
+      // Do not swallow the interrupt; callers further up may depend on it.
+      Thread.currentThread().interrupt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
index 5241c78..827bdd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -52,8 +52,9 @@ public class DataNodePeerMetrics {
    * Threshold in milliseconds below which a DataNode is definitely not slow.
    */
   private static final long LOW_THRESHOLD_MS = 5;
+  private static final long MIN_OUTLIER_DETECTION_NODES = 10;
 
-  private final SlowNodeDetector slowNodeDetector;
+  private final OutlierDetector slowNodeDetector;
 
   /**
    * Minimum number of packet send samples which are required to qualify
@@ -68,7 +69,8 @@ public class DataNodePeerMetrics {
       final long windowSizeMs,
       final int numWindows) {
     this.name = name;
-    this.slowNodeDetector = new SlowNodeDetector(LOW_THRESHOLD_MS);
+    this.slowNodeDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_NODES,
+        LOW_THRESHOLD_MS);
     sendPacketDownstreamRollingAvgerages = new RollingAverages(
         windowSizeMs, numWindows);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java
new file mode 100644
index 0000000..771a17b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * A utility class to help detect resources (nodes/ disks) whose aggregate
+ * latency is an outlier within a given set.
+ *
+ * We use the median absolute deviation for outlier detection as
+ * described in the following publication:
+ *
+ * Leys, C., et al., Detecting outliers: Do not use standard deviation
+ * around the mean, use absolute deviation around the median.
+ * http://dx.doi.org/10.1016/j.jesp.2013.03.013
+ *
+ * We augment the above scheme with the following heuristics to be even
+ * more conservative:
+ *
+ *  1. Skip outlier detection if the sample size is too small.
+ *  2. Never flag resources whose aggregate latency is below a low threshold.
+ *  3. Never flag resources whose aggregate latency is less than a small
+ *     multiple of the median.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class OutlierDetector {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OutlierDetector.class);
+
+  /**
+   * Minimum number of resources to run outlier detection.
+   */
+  private final long minNumResources;
+
+  /**
+   * The multiplier is from Leys, C. et al.
+   */
+  private static final double MAD_MULTIPLIER = (double) 1.4826;
+
+  /**
+   * Threshold in milliseconds below which a node/ disk is definitely not slow.
+   */
+  private final long lowThresholdMs;
+
+  /**
+   * Deviation multiplier. A sample is considered to be an outlier if it
+   * exceeds the median by (multiplier * median abs. deviation). 3 is a
+   * conservative choice.
+   */
+  private static final int DEVIATION_MULTIPLIER = 3;
+
+  /**
+   * If most of the samples are clustered together, the MAD can be
+   * low. The median multiplier introduces another safeguard to avoid
+   * overaggressive outlier detection.
+   */
+  @VisibleForTesting
+  static final int MEDIAN_MULTIPLIER = 3;
+
+  /**
+   * Creates a detector with the given sample-size and latency floors.
+   *
+   * @param minNumResources minimum number of entries a stats map must hold
+   *        for outlier detection to run
+   * @param lowThresholdMs latency in milliseconds at or below which a
+   *        resource is never flagged
+   */
+  public OutlierDetector(long minNumResources, long lowThresholdMs) {
+    this.lowThresholdMs = lowThresholdMs;
+    this.minNumResources = minNumResources;
+  }
+
+  /**
+   * Return the nodes/ disks whose latency is much higher than that of
+   * their counterparts.
+   *
+   * The aggregate may be an arithmetic mean or a percentile e.g.
+   * 90th percentile. Percentiles are a better choice than median
+   * since latency is usually not a normal distribution.
+   *
+   * A resource is flagged only if its latency exceeds all of: the low
+   * threshold, MEDIAN_MULTIPLIER times the median, and the median plus
+   * DEVIATION_MULTIPLIER times the median absolute deviation.
+   *
+   * This method allocates temporary memory O(n) and
+   * has run time O(n.log(n)), where n = stats.size().
+   *
+   * @param stats map of (resource -> aggregate latency) entries
+   * @return map of (resource -> aggregate latency) holding only the
+   *         outliers; empty if there are none, or if stats is empty or
+   *         has fewer entries than the configured minimum
+   */
+  public Map<String, Double> getOutliers(Map<String, Double> stats) {
+    if (stats.size() < minNumResources) {
+      LOG.debug("Skipping statistical outlier detection as we don't have " +
+              "latency data for enough resources. Have {}, need at least {}",
+          stats.size(), minNumResources);
+      return ImmutableMap.of();
+    }
+    if (stats.isEmpty()) {
+      // Only reachable when minNumResources <= 0. There is nothing to flag
+      // and computeMedian() rejects an empty list.
+      return ImmutableMap.of();
+    }
+
+    // Compute the median absolute deviation of the aggregates.
+    final List<Double> sorted = new ArrayList<>(stats.values());
+    Collections.sort(sorted);
+    final double median = computeMedian(sorted);
+    final double mad = computeMad(sorted);
+
+    // The limit is the largest of the three safeguards, which keeps the
+    // detection conservative.
+    final double upperLimitLatency = Math.max(
+        Math.max(lowThresholdMs, median * MEDIAN_MULTIPLIER),
+        median + (DEVIATION_MULTIPLIER * mad));
+
+    final Map<String, Double> slowResources = new HashMap<>();
+
+    LOG.trace("getOutliers: List={}, MedianLatency={}, " +
+        "MedianAbsoluteDeviation={}, upperLimitLatency={}",
+        sorted, median, mad, upperLimitLatency);
+
+    // Find resources whose latency exceeds the threshold.
+    for (Map.Entry<String, Double> entry : stats.entrySet()) {
+      if (entry.getValue() > upperLimitLatency) {
+        slowResources.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return slowResources;
+  }
+
+  /**
+   * Compute the scaled Median Absolute Deviation of a sorted list: the
+   * median of every value's absolute distance from the list's median,
+   * multiplied by MAD_MULTIPLIER.
+   *
+   * @param sortedValues values in ascending order; must not be empty
+   * @return the scaled median absolute deviation
+   * @throws IllegalArgumentException if sortedValues is empty
+   */
+  public static Double computeMad(List<Double> sortedValues) {
+    if (sortedValues.size() == 0) {
+      throw new IllegalArgumentException(
+          "Cannot compute the Median Absolute Deviation " +
+              "of an empty list.");
+    }
+
+    final Double center = computeMedian(sortedValues);
+
+    // Collect the absolute distance of every value from the median.
+    final List<Double> distances = new ArrayList<>(sortedValues.size());
+    for (Double value : sortedValues) {
+      distances.add(Math.abs(value - center));
+    }
+
+    // The distances are no longer ordered, so sort them before taking
+    // their median.
+    Collections.sort(distances);
+    return computeMedian(distances) * MAD_MULTIPLIER;
+  }
+
+  /**
+   * Compute the median of a sorted list. For an odd number of values this
+   * is the middle value; for an even number it is the mean of the two
+   * middle values.
+   *
+   * @param sortedValues values in ascending order; must not be empty
+   * @return the median of the values
+   * @throws IllegalArgumentException if sortedValues is empty
+   */
+  public static Double computeMedian(List<Double> sortedValues) {
+    final int count = sortedValues.size();
+    if (count == 0) {
+      throw new IllegalArgumentException(
+          "Cannot compute the median of an empty list.");
+    }
+
+    final int mid = count / 2;
+    final Double upperMiddle = sortedValues.get(mid);
+    if (count % 2 != 0) {
+      return upperMiddle;
+    }
+    // Even count: average the two values around the midpoint.
+    return (upperMiddle + sortedValues.get(mid - 1)) / 2;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
deleted file mode 100644
index b6278ce..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.metrics;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * A utility class to help detect nodes whose aggregate latency
- * is an outlier within a given set.
- *
- * We use the median absolute deviation for outlier detection as
- * described in the following publication:
- *
- * Leys, C., et al., Detecting outliers: Do not use standard deviation
- * around the mean, use absolute deviation around the median.
- * http://dx.doi.org/10.1016/j.jesp.2013.03.013
- *
- * We augment the above scheme with the following heuristics to be even
- * more conservative:
- *
- *  1. Skip outlier detection if the sample size is too small.
- *  2. Never flag nodes whose aggregate latency is below a low threshold.
- *  3. Never flag nodes whose aggregate latency is less than a small
- *     multiple of the median.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class SlowNodeDetector {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(SlowNodeDetector.class);
-
-  /**
-   * Minimum number of peers to run outlier detection.
-   */
-  private static long minOutlierDetectionPeers = 10;
-
-  /**
-   * The multiplier is from Leys, C. et al.
-   */
-  private static final double MAD_MULTIPLIER = (double) 1.4826;
-
-  /**
-   * Threshold in milliseconds below which a DataNode is definitely not slow.
-   */
-  private final long lowThresholdMs;
-
-  /**
-   * Deviation multiplier. A sample is considered to be an outlier if it
-   * exceeds the median by (multiplier * median abs. deviation). 3 is a
-   * conservative choice.
-   */
-  private static final int DEVIATION_MULTIPLIER = 3;
-
-  /**
-   * If most of the samples are clustered together, the MAD can be
-   * low. The median multiplier introduces another safeguard to avoid
-   * overaggressive outlier detection.
-   */
-  @VisibleForTesting
-  static final int MEDIAN_MULTIPLIER = 3;
-
-  public SlowNodeDetector(long lowThresholdMs) {
-    this.lowThresholdMs = lowThresholdMs;
-  }
-
-  /**
-   * Return a set of DataNodes whose latency is much higher than
-   * their peers. The input is a map of (node -> aggregate latency)
-   * entries.
-   *
-   * The aggregate may be an arithmetic mean or a percentile e.g.
-   * 90th percentile. Percentiles are a better choice than median
-   * since latency is usually not a normal distribution.
-   *
-   * This method allocates temporary memory O(n) and
-   * has run time O(n.log(n)), where n = stats.size().
-   *
-   * @return
-   */
-  public Map<String, Double> getOutliers(Map<String, Double> stats) {
-    if (stats.size() < minOutlierDetectionPeers) {
-      LOG.debug("Skipping statistical outlier detection as we don't have " +
-              "latency data for enough peers. Have {}, need at least {}",
-          stats.size(), minOutlierDetectionPeers);
-      return ImmutableMap.of();
-    }
-    // Compute the median absolute deviation of the aggregates.
-    final List<Double> sorted = new ArrayList<>(stats.values());
-    Collections.sort(sorted);
-    final Double median = computeMedian(sorted);
-    final Double mad = computeMad(sorted);
-    Double upperLimitLatency = Math.max(
-        lowThresholdMs, median * MEDIAN_MULTIPLIER);
-    upperLimitLatency = Math.max(
-        upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));
-
-    final Map<String, Double> slowNodes = new HashMap<>();
-
-    LOG.trace("getOutliers: List={}, MedianLatency={}, " +
-        "MedianAbsoluteDeviation={}, upperLimitLatency={}",
-        sorted, median, mad, upperLimitLatency);
-
-    // Find nodes whose latency exceeds the threshold.
-    for (Map.Entry<String, Double> entry : stats.entrySet()) {
-      if (entry.getValue() > upperLimitLatency) {
-        slowNodes.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    return slowNodes;
-  }
-
-  /**
-   * Compute the Median Absolute Deviation of a sorted list.
-   */
-  public static Double computeMad(List<Double> sortedValues) {
-    if (sortedValues.size() == 0) {
-      throw new IllegalArgumentException(
-          "Cannot compute the Median Absolute Deviation " +
-              "of an empty list.");
-    }
-
-    // First get the median of the values.
-    Double median = computeMedian(sortedValues);
-    List<Double> deviations = new ArrayList<>(sortedValues);
-
-    // Then update the list to store deviation from the median.
-    for (int i = 0; i < sortedValues.size(); ++i) {
-      deviations.set(i, Math.abs(sortedValues.get(i) - median));
-    }
-
-    // Finally get the median absolute deviation.
-    Collections.sort(deviations);
-    return computeMedian(deviations) * MAD_MULTIPLIER;
-  }
-
-  /**
-   * Compute the median of a sorted list.
-   */
-  public static Double computeMedian(List<Double> sortedValues) {
-    if (sortedValues.size() == 0) {
-      throw new IllegalArgumentException(
-          "Cannot compute the median of an empty list.");
-    }
-
-    Double median = sortedValues.get(sortedValues.size() / 2);
-    if (sortedValues.size() % 2 == 0) {
-      median += sortedValues.get((sortedValues.size() / 2) - 1);
-      median /= 2;
-    }
-    return median;
-  }
-
-  /**
-   * This method *must not* be used outside of unit tests.
-   */
-  @VisibleForTesting
-  static void setMinOutlierDetectionPeers(long minOutlierDetectionPeers) {
-    SlowNodeDetector.minOutlierDetectionPeers = minOutlierDetectionPeers;
-  }
-
-  @VisibleForTesting
-  static long getMinOutlierDetectionPeers() {
-    return minOutlierDetectionPeers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 4ea0e69..ca08eb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1930,7 +1930,7 @@
 </property>
 
 <property>
-  <name>dfs.datanode.slow.peers.report.interval</name>
+  <name>dfs.datanode.outliers.report.interval</name>
   <value>30m</value>
   <description>
     This setting controls how frequently DataNodes will report their peer

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
index 34e15e5..eb7769e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java
@@ -54,13 +54,14 @@ public class TestDataNodeOutlierDetectionViaMetrics {
   private static final int ROLLING_AVERAGE_WINDOWS = 10;
   private static final int SLOW_NODE_LATENCY_MS = 20_000;
   private static final int FAST_NODE_MAX_LATENCY_MS = 5;
+  private static final long MIN_OUTLIER_DETECTION_PEERS = 10;
 
   private Random random = new Random(System.currentTimeMillis());
 
   @Before
   public void setup() {
     GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL);
-    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(OutlierDetector.LOG, Level.ALL);
   }
 
   /**
@@ -111,8 +112,7 @@ public class TestDataNodeOutlierDetectionViaMetrics {
    */
   public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) {
     for (int nodeIndex = 0;
-         nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers();
-         ++nodeIndex) {
+         nodeIndex < MIN_OUTLIER_DETECTION_PEERS; ++nodeIndex) {
       final String nodeName = "FastNode-" + nodeIndex;
       LOG.info("Generating stats for node {}", nodeName);
       for (int i = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6c47769/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
index 6107d63..9bd0687 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java
@@ -39,7 +39,7 @@ import java.util.Set;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Unit tests for {@link SlowNodeDetector}.
+ * Unit tests for {@link OutlierDetector}.
  */
 public class TestSlowNodeDetector {
   public static final Logger LOG =
@@ -182,7 +182,7 @@ public class TestSlowNodeDetector {
           .put(ImmutableMap.of(
               "n1", LOW_THRESHOLD + 0.1,
               "n2", LOW_THRESHOLD + 0.1,
-              "n3", LOW_THRESHOLD * SlowNodeDetector.MEDIAN_MULTIPLIER - 0.1),
+              "n3", LOW_THRESHOLD * OutlierDetector.MEDIAN_MULTIPLIER - 0.1),
               ImmutableSet.<String>of())
 
           // A statistical outlier must be returned if it is outside a
@@ -191,7 +191,7 @@ public class TestSlowNodeDetector {
               "n1", LOW_THRESHOLD + 0.1,
               "n2", LOW_THRESHOLD + 0.1,
               "n3", (LOW_THRESHOLD + 0.1) *
-                  SlowNodeDetector.MEDIAN_MULTIPLIER + 0.1),
+                  OutlierDetector.MEDIAN_MULTIPLIER + 0.1),
               ImmutableSet.of("n3"))
 
           // Only the statistical outliers n3 and n11 should be returned.
@@ -232,13 +232,13 @@ public class TestSlowNodeDetector {
           .build();
 
 
-  private SlowNodeDetector slowNodeDetector;
+  private OutlierDetector slowNodeDetector;
 
   @Before
   public void setup() {
-    slowNodeDetector = new SlowNodeDetector((long) LOW_THRESHOLD);
-    SlowNodeDetector.setMinOutlierDetectionPeers(MIN_OUTLIER_DETECTION_PEERS);
-    GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL);
+    slowNodeDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_PEERS,
+        (long) LOW_THRESHOLD);
+    GenericTestUtils.setLogLevel(OutlierDetector.LOG, Level.ALL);
   }
 
   @Test
@@ -258,7 +258,7 @@ public class TestSlowNodeDetector {
   }
 
   /**
-   * Unit test for {@link SlowNodeDetector#computeMedian(List)}.
+   * Unit test for {@link OutlierDetector#computeMedian(List)}.
    */
   @Test
   public void testMediansFromTestMatrix() {
@@ -266,7 +266,7 @@ public class TestSlowNodeDetector {
         medianTestMatrix.entrySet()) {
       final List<Double> inputList = new ArrayList<>(entry.getKey());
       Collections.sort(inputList);
-      final Double median = SlowNodeDetector.computeMedian(inputList);
+      final Double median = OutlierDetector.computeMedian(inputList);
       final Double expectedMedian = entry.getValue().getLeft();
 
       // Ensure that the median is within 0.001% of expected.
@@ -282,7 +282,7 @@ public class TestSlowNodeDetector {
   }
 
   /**
-   * Unit test for {@link SlowNodeDetector#computeMad(List)}.
+   * Unit test for {@link OutlierDetector#computeMad(List)}.
    */
   @Test
   public void testMadsFromTestMatrix() {
@@ -290,7 +290,7 @@ public class TestSlowNodeDetector {
         medianTestMatrix.entrySet()) {
       final List<Double> inputList = new ArrayList<>(entry.getKey());
       Collections.sort(inputList);
-      final Double mad = SlowNodeDetector.computeMad(inputList);
+      final Double mad = OutlierDetector.computeMad(inputList);
       final Double expectedMad = entry.getValue().getRight();
 
       // Ensure that the MAD is within 0.001% of expected.
@@ -315,21 +315,21 @@ public class TestSlowNodeDetector {
   }
 
   /**
-   * Verify that {@link SlowNodeDetector#computeMedian(List)} throws when
+   * Verify that {@link OutlierDetector#computeMedian(List)} throws when
    * passed an empty list.
    */
   @Test(expected=IllegalArgumentException.class)
   public void testMedianOfEmptyList() {
-    SlowNodeDetector.computeMedian(Collections.<Double>emptyList());
+    OutlierDetector.computeMedian(Collections.<Double>emptyList());
   }
 
   /**
-   * Verify that {@link SlowNodeDetector#computeMad(List)} throws when
+   * Verify that {@link OutlierDetector#computeMad(List)} throws when
    * passed an empty list.
    */
   @Test(expected=IllegalArgumentException.class)
   public void testMadOfEmptyList() {
-    SlowNodeDetector.computeMedian(Collections.<Double>emptyList());
+    OutlierDetector.computeMedian(Collections.<Double>emptyList());
   }
   
   private static class Pair<L, R> {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message