From xg...@apache.org
Subject [12/17] hadoop git commit: HDFS-11182. Update DataNode to use DatasetVolumeChecker. Contributed by Arpit Agarwal.
Date Wed, 21 Dec 2016 22:42:36 GMT
HDFS-11182. Update DataNode to use DatasetVolumeChecker. Contributed by Arpit Agarwal.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f678080d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f678080d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f678080d

Branch: refs/heads/YARN-5734
Commit: f678080dbd25a218e0406463a3c3a1fc03680702
Parents: 5daa8d8
Author: Xiaoyu Yao <xyao@apache.org>
Authored: Tue Dec 20 13:53:07 2016 -0800
Committer: Xiaoyu Yao <xyao@apache.org>
Committed: Tue Dec 20 13:53:32 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/server/datanode/DataNode.java   | 130 ++++++++-----------
 .../datanode/checker/DatasetVolumeChecker.java  | 116 +++++++++++------
 .../server/datanode/fsdataset/FsDatasetSpi.java |   3 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   5 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   7 -
 .../datanode/fsdataset/impl/FsVolumeList.java   |  25 +---
 .../blockmanagement/TestBlockStatsMXBean.java   |   4 +
 .../server/datanode/SimulatedFSDataset.java     |  18 ++-
 .../datanode/TestDataNodeHotSwapVolumes.java    |   3 +
 .../datanode/TestDataNodeVolumeFailure.java     |   3 +
 .../TestDataNodeVolumeFailureReporting.java     |   3 +
 .../TestDataNodeVolumeFailureToleration.java    |   3 +
 .../hdfs/server/datanode/TestDiskError.java     |  24 ++--
 .../checker/TestDatasetVolumeChecker.java       |  17 ++-
 .../TestDatasetVolumeCheckerFailures.java       |  45 ++++---
 .../extdataset/ExternalDatasetImpl.java         |   3 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |  84 ++----------
 .../fsdataset/impl/TestFsVolumeList.java        |  37 ------
 18 files changed, 233 insertions(+), 297 deletions(-)
----------------------------------------------------------------------
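
For readers skimming the diff: the patch removes DataNode's hand-rolled disk-checker thread and delegates volume health checking to the existing DatasetVolumeChecker. Below is a minimal sketch of the synchronous entry point as the hunks that follow use it; the conf, dataset, and LOG names are assumed, and error handling is trimmed:

    // Illustrative sketch only -- not part of the commit.
    DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, new Timer());
    try {
      // Returns the volumes that failed their health checks, or an empty
      // set if the minimum gap since the last check has not yet elapsed.
      Set<FsVolumeSpi> failed = checker.checkAllVolumes(dataset);
      if (!failed.isEmpty()) {
        LOG.warn("Found {} failed volumes: {}", failed.size(), failed);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      // Mirrors the shutdown call added to DataNode#shutdown() below.
      checker.shutdownAndWait(1, TimeUnit.SECONDS);
    }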


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 794b1ad..a94c4b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -74,6 +74,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -85,7 +86,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -108,6 +108,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -369,11 +370,7 @@ public class DataNode extends ReconfigurableBase
   SaslDataTransferClient saslClient;
   SaslDataTransferServer saslServer;
   private ObjectName dataNodeInfoBeanName;
-  private Thread checkDiskErrorThread = null;
-  protected final int checkDiskErrorInterval;
-  private boolean checkDiskErrorFlag = false;
-  private Object checkDiskErrorMutex = new Object();
-  private long lastDiskErrorCheck;
+  private volatile long lastDiskErrorCheck;
   private String supergroup;
   private boolean isPermissionEnabled;
   private String dnUserName = null;
@@ -389,6 +386,7 @@ public class DataNode extends ReconfigurableBase
   @Nullable
   private final StorageLocationChecker storageLocationChecker;
 
+  private final DatasetVolumeChecker volumeChecker;
 
   private final SocketFactory socketFactory;
 
@@ -407,7 +405,7 @@ public class DataNode extends ReconfigurableBase
    */
   @VisibleForTesting
   @InterfaceAudience.LimitedPrivate("HDFS")
-  DataNode(final Configuration conf) {
+  DataNode(final Configuration conf) throws DiskErrorException {
     super(conf);
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
@@ -420,11 +418,10 @@ public class DataNode extends ReconfigurableBase
     this.connectToDnViaHostname = false;
     this.blockScanner = new BlockScanner(this, this.getConf());
     this.pipelineSupportECN = false;
-    this.checkDiskErrorInterval =
-        ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     initOOBTimeout();
     storageLocationChecker = null;
+    volumeChecker = new DatasetVolumeChecker(conf, new Timer());
   }
 
   /**
@@ -464,8 +461,7 @@ public class DataNode extends ReconfigurableBase
         ",hdfs-" +
         conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
 
-    this.checkDiskErrorInterval =
-        ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
+    this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
 
     // Determine whether we should try to pass file descriptors to clients.
     if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
@@ -1918,11 +1914,6 @@ public class DataNode extends ReconfigurableBase
       }
     }
 
-    // Interrupt the checkDiskErrorThread and terminate it.
-    if(this.checkDiskErrorThread != null) {
-      this.checkDiskErrorThread.interrupt();
-    }
-    
     // Record the time of initial notification
     long timeNotified = Time.monotonicNow();
 
@@ -1944,6 +1935,8 @@ public class DataNode extends ReconfigurableBase
       }
     }
 
+    volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
+
     if (storageLocationChecker != null) {
       storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
     }
@@ -2051,16 +2044,19 @@ public class DataNode extends ReconfigurableBase
    * Check if there is a disk failure asynchronously and if so, handle the error
    */
   public void checkDiskErrorAsync() {
-    synchronized(checkDiskErrorMutex) {
-      checkDiskErrorFlag = true;
-      if(checkDiskErrorThread == null) {
-        startCheckDiskErrorThread();
-        checkDiskErrorThread.start();
-        LOG.info("Starting CheckDiskError Thread");
-      }
-    }
+    volumeChecker.checkAllVolumesAsync(
+        data, (healthyVolumes, failedVolumes) -> {
+          if (failedVolumes.size() > 0) {
+            LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
+                failedVolumes.size(), failedVolumes);
+          } else {
+            LOG.debug("checkDiskErrorAsync: no volume failures detected");
+          }
+          lastDiskErrorCheck = Time.monotonicNow();
+          handleVolumeFailures(failedVolumes);
+        });
   }
-  
+
   private void handleDiskError(String errMsgr) {
     final boolean hasEnoughResources = data.hasEnoughResource();
     LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
@@ -3208,11 +3204,40 @@ public class DataNode extends ReconfigurableBase
   }
 
   /**
-   * Check the disk error
+   * Check the disk error synchronously.
    */
-  private void checkDiskError() {
-    Set<StorageLocation> unhealthyLocations = data.checkDataDir();
-    if (unhealthyLocations != null && !unhealthyLocations.isEmpty()) {
+  @VisibleForTesting
+  public void checkDiskError() throws IOException {
+    Set<FsVolumeSpi> unhealthyVolumes;
+    try {
+      unhealthyVolumes = volumeChecker.checkAllVolumes(data);
+      lastDiskErrorCheck = Time.monotonicNow();
+    } catch (InterruptedException e) {
+      LOG.error("Interruped while running disk check", e);
+      throw new IOException("Interrupted while running disk check", e);
+    }
+
+    if (unhealthyVolumes.size() > 0) {
+      LOG.warn("checkDiskError got {} failed volumes - {}",
+          unhealthyVolumes.size(), unhealthyVolumes);
+      handleVolumeFailures(unhealthyVolumes);
+    } else {
+      LOG.debug("checkDiskError encountered no failures");
+    }
+  }
+
+  private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
+    data.handleVolumeFailures(unhealthyVolumes);
+    Set<StorageLocation> unhealthyLocations = new HashSet<>(
+        unhealthyVolumes.size());
+
+    if (!unhealthyVolumes.isEmpty()) {
+      StringBuilder sb = new StringBuilder("DataNode failed volumes:");
+      for (FsVolumeSpi vol : unhealthyVolumes) {
+        unhealthyLocations.add(vol.getStorageLocation());
+        sb.append(vol.getStorageLocation()).append(";");
+      }
+
       try {
         // Remove all unhealthy volumes from DataNode.
         removeVolumes(unhealthyLocations, false);
@@ -3220,56 +3245,13 @@ public class DataNode extends ReconfigurableBase
         LOG.warn("Error occurred when removing unhealthy storage dirs: "
             + e.getMessage(), e);
       }
-      StringBuilder sb = new StringBuilder("DataNode failed volumes:");
-      for (StorageLocation location : unhealthyLocations) {
-        sb.append(location + ";");
-      }
+      LOG.info(sb.toString());
       handleDiskError(sb.toString());
     }
   }
 
-  /**
-   * Starts a new thread which will check for disk error check request 
-   * every 5 sec
-   */
-  private void startCheckDiskErrorThread() {
-    checkDiskErrorThread = new Thread(new Runnable() {
-          @Override
-          public void run() {
-            while(shouldRun) {
-              boolean tempFlag ;
-              synchronized(checkDiskErrorMutex) {
-                tempFlag = checkDiskErrorFlag;
-                checkDiskErrorFlag = false;
-              }
-              if(tempFlag) {
-                try {
-                  checkDiskError();
-                } catch (Exception e) {
-                  LOG.warn("Unexpected exception occurred while checking disk error  " + e);
-                  checkDiskErrorThread = null;
-                  return;
-                }
-                synchronized(checkDiskErrorMutex) {
-                  lastDiskErrorCheck = Time.monotonicNow();
-                }
-              }
-              try {
-                Thread.sleep(checkDiskErrorInterval);
-              } catch (InterruptedException e) {
-                LOG.debug("InterruptedException in check disk error thread", e);
-                checkDiskErrorThread = null;
-                return;
-              }
-            }
-          }
-    });
-  }
-  
   public long getLastDiskErrorCheck() {
-    synchronized(checkDiskErrorMutex) {
-      return lastDiskErrorCheck;
-    }
+    return lastDiskErrorCheck;
   }
 
   @Override
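
In short, the polling checkDiskErrorThread, its flag, and its mutex are gone: both disk-check paths now delegate to volumeChecker and funnel failures into the new handleVolumeFailures() helper. A condensed sketch of the two call sites above (field names follow the hunks; this is not additional code in the commit):

    // Synchronous path, as in checkDiskError(): block until all volumes
    // have been checked, then push any failures to handleVolumeFailures().
    try {
      Set<FsVolumeSpi> unhealthy = volumeChecker.checkAllVolumes(data);
      handleVolumeFailures(unhealthy);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    // Asynchronous path, as in checkDiskErrorAsync(): the callback fires
    // once every scheduled volume check has resolved. The call is skipped
    // entirely (returning false) if the minimum check gap has not elapsed.
    volumeChecker.checkAllVolumesAsync(data,
        (healthyVolumes, failedVolumes) -> handleVolumeFailures(failedVolumes));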

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index 8a57812..ba09d23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -27,7 +27,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -161,37 +160,54 @@ public class DatasetVolumeChecker {
    * @param dataset - FsDatasetSpi to be checked.
    * @return set of failed volumes.
    */
-  public Set<StorageLocation> checkAllVolumes(
+  public Set<FsVolumeSpi> checkAllVolumes(
       final FsDatasetSpi<? extends FsVolumeSpi> dataset)
       throws InterruptedException {
-
-    if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+    final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+    if (gap < minDiskCheckGapMs) {
       numSkippedChecks.incrementAndGet();
+      LOG.trace(
+          "Skipped checking all volumes, time since last check {} is less " +
+          "than the minimum gap between checks ({} ms).",
+          gap, minDiskCheckGapMs);
       return Collections.emptySet();
     }
 
-    lastAllVolumesCheck = timer.monotonicNow();
-    final Set<StorageLocation> healthyVolumes = new HashSet<>();
-    final Set<StorageLocation> failedVolumes = new HashSet<>();
-    final Set<StorageLocation> allVolumes = new HashSet<>();
-
     final FsDatasetSpi.FsVolumeReferences references =
         dataset.getFsVolumeReferences();
-    final CountDownLatch resultsLatch = new CountDownLatch(references.size());
+
+    if (references.size() == 0) {
+      LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
+      return Collections.emptySet();
+    }
+
+    lastAllVolumesCheck = timer.monotonicNow();
+    final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
+    final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
+    final Set<FsVolumeSpi> allVolumes = new HashSet<>();
+
+    final AtomicLong numVolumes = new AtomicLong(references.size());
+    final CountDownLatch latch = new CountDownLatch(1);
 
     for (int i = 0; i < references.size(); ++i) {
       final FsVolumeReference reference = references.getReference(i);
-      allVolumes.add(reference.getVolume().getStorageLocation());
+      allVolumes.add(reference.getVolume());
       ListenableFuture<VolumeCheckResult> future =
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
       LOG.info("Scheduled health check for volume {}", reference.getVolume());
       Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, resultsLatch, null));
+          reference, healthyVolumes, failedVolumes, numVolumes, new Callback() {
+        @Override
+        public void call(Set<FsVolumeSpi> ignored1,
+                         Set<FsVolumeSpi> ignored2) {
+          latch.countDown();
+        }
+      }));
     }
 
     // Wait until our timeout elapses, after which we give up on
     // the remaining volumes.
-    if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
+    if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
       LOG.warn("checkAllVolumes timed out after {} ms" +
           maxAllowedTimeForCheckMs);
     }
@@ -225,18 +241,28 @@ public class DatasetVolumeChecker {
   public boolean checkAllVolumesAsync(
       final FsDatasetSpi<? extends FsVolumeSpi> dataset,
       Callback callback) {
-
-    if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+    final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+    if (gap < minDiskCheckGapMs) {
       numSkippedChecks.incrementAndGet();
+      LOG.trace(
+          "Skipped checking all volumes, time since last check {} is less " +
+              "than the minimum gap between checks ({} ms).",
+          gap, minDiskCheckGapMs);
       return false;
     }
 
-    lastAllVolumesCheck = timer.monotonicNow();
-    final Set<StorageLocation> healthyVolumes = new HashSet<>();
-    final Set<StorageLocation> failedVolumes = new HashSet<>();
     final FsDatasetSpi.FsVolumeReferences references =
         dataset.getFsVolumeReferences();
-    final CountDownLatch latch = new CountDownLatch(references.size());
+
+    if (references.size() == 0) {
+      LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
+      return false;
+    }
+
+    lastAllVolumesCheck = timer.monotonicNow();
+    final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
+    final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
+    final AtomicLong numVolumes = new AtomicLong(references.size());
 
     LOG.info("Checking {} volumes", references.size());
     for (int i = 0; i < references.size(); ++i) {
@@ -245,7 +271,7 @@ public class DatasetVolumeChecker {
       ListenableFuture<VolumeCheckResult> future =
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
       Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, latch, callback));
+          reference, healthyVolumes, failedVolumes, numVolumes, callback));
     }
     numAsyncDatasetChecks.incrementAndGet();
     return true;
@@ -260,8 +286,8 @@ public class DatasetVolumeChecker {
      * @param healthyVolumes set of volumes that passed disk checks.
      * @param failedVolumes set of volumes that failed disk checks.
      */
-    void call(Set<StorageLocation> healthyVolumes,
-              Set<StorageLocation> failedVolumes);
+    void call(Set<FsVolumeSpi> healthyVolumes,
+              Set<FsVolumeSpi> failedVolumes);
   }
 
   /**
@@ -273,8 +299,10 @@ public class DatasetVolumeChecker {
    *
    * @param volume the volume that is to be checked.
    * @param callback callback to be invoked when the volume check completes.
+   * @return true if the check was scheduled and the callback will be invoked.
+   *         false otherwise.
    */
-  public void checkVolume(
+  public boolean checkVolume(
       final FsVolumeSpi volume,
       Callback callback) {
     FsVolumeReference volumeReference;
@@ -283,14 +311,15 @@ public class DatasetVolumeChecker {
     } catch (ClosedChannelException e) {
       // The volume has already been closed.
       callback.call(new HashSet<>(), new HashSet<>());
-      return;
+      return false;
     }
     ListenableFuture<VolumeCheckResult> future =
         delegateChecker.schedule(volume, IGNORED_CONTEXT);
     numVolumeChecks.incrementAndGet();
     Futures.addCallback(future, new ResultHandler(
         volumeReference, new HashSet<>(), new HashSet<>(),
-        new CountDownLatch(1), callback));
+        new AtomicLong(1), callback));
+    return true;
   }
 
   /**
@@ -299,26 +328,35 @@ public class DatasetVolumeChecker {
   private class ResultHandler
       implements FutureCallback<VolumeCheckResult> {
     private final FsVolumeReference reference;
-    private final Set<StorageLocation> failedVolumes;
-    private final Set<StorageLocation> healthyVolumes;
-    private final CountDownLatch latch;
-    private final AtomicLong numVolumes;
+    private final Set<FsVolumeSpi> failedVolumes;
+    private final Set<FsVolumeSpi> healthyVolumes;
+    private final AtomicLong volumeCounter;
 
     @Nullable
     private final Callback callback;
 
+    /**
+     *
+     * @param reference FsVolumeReference to be released when the check is
+     *                  complete.
+     * @param healthyVolumes set of healthy volumes. If the disk check is
+     *                       successful, add the volume here.
+     * @param failedVolumes set of failed volumes. If the disk check fails,
+     *                      add the volume here.
+     * @param semaphore semaphore used to trigger callback invocation.
+     * @param callback invoked when the semaphore can be successfully acquired.
+     */
     ResultHandler(FsVolumeReference reference,
-                  Set<StorageLocation> healthyVolumes,
-                  Set<StorageLocation> failedVolumes,
-                  CountDownLatch latch,
+                  Set<FsVolumeSpi> healthyVolumes,
+                  Set<FsVolumeSpi> failedVolumes,
+                  AtomicLong volumeCounter,
                   @Nullable Callback callback) {
       Preconditions.checkState(reference != null);
       this.reference = reference;
       this.healthyVolumes = healthyVolumes;
       this.failedVolumes = failedVolumes;
-      this.latch = latch;
+      this.volumeCounter = volumeCounter;
       this.callback = callback;
-      numVolumes = new AtomicLong(latch.getCount());
     }
 
     @Override
@@ -355,13 +393,13 @@ public class DatasetVolumeChecker {
 
     private void markHealthy() {
       synchronized (DatasetVolumeChecker.this) {
-        healthyVolumes.add(reference.getVolume().getStorageLocation());
+        healthyVolumes.add(reference.getVolume());
       }
     }
 
     private void markFailed() {
       synchronized (DatasetVolumeChecker.this) {
-        failedVolumes.add(reference.getVolume().getStorageLocation());
+        failedVolumes.add(reference.getVolume());
       }
     }
 
@@ -372,10 +410,8 @@ public class DatasetVolumeChecker {
 
     private void invokeCallback() {
       try {
-        latch.countDown();
-
-        if (numVolumes.decrementAndGet() == 0 &&
-            callback != null) {
+        final long remaining = volumeCounter.decrementAndGet();
+        if (callback != null && remaining == 0) {
           callback.call(healthyVolumes, failedVolumes);
         }
       } catch(Exception e) {
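
Two API notes from the hunks above: checkVolume() now returns whether a check was actually scheduled, and callback invocation is driven by a shared AtomicLong counter rather than a CountDownLatch, so the callback fires exactly once, after the last volume's result arrives. A small usage sketch for the per-volume entry point (the checker, volume, and LOG names are assumed; not part of the commit):

    // Callback declares a single call(Set<FsVolumeSpi>, Set<FsVolumeSpi>)
    // method, so a lambda is sufficient here.
    boolean scheduled = checker.checkVolume(volume,
        (healthyVolumes, failedVolumes) -> {
          if (!failedVolumes.isEmpty()) {
            LOG.warn("Health check failed for volume {}", volume);
          }
        });
    if (!scheduled) {
      // The volume could not be referenced (already closed); the callback
      // was invoked immediately with two empty sets.
      LOG.info("Skipped health check for closed volume {}", volume);
    }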

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 30f045f..9e979f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -494,8 +494,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
     /**
      * Check if all the data directories are healthy
      * @return A set of unhealthy data directories.
+     * @param failedVolumes
      */
-  Set<StorageLocation> checkDataDir();
+  void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes);
 
   /**
    * Shutdown the FSDataset

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 35561cd..0d5a12c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2067,10 +2067,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   * if some volumes failed - the caller must remove all the blocks that belong
    * to these failed volumes.
    * @return the failed volumes. Returns null if no volume failed.
+   * @param failedVolumes
    */
   @Override // FsDatasetSpi
-  public Set<StorageLocation> checkDataDir() {
-   return volumes.checkDirs();
+  public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
+    volumes.handleVolumeFailures(failedVolumes);
   }
     
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e28ee27..753c083 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -959,13 +959,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return cacheExecutor;
   }
 
-  void checkDirs() throws DiskErrorException {
-    // TODO:FEDERATION valid synchronization
-    for(BlockPoolSlice s : bpSlices.values()) {
-      s.checkDirs();
-    }
-  }
-
   @Override
   public VolumeCheckResult check(VolumeCheckContext ignored)
       throws DiskErrorException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index cf9c319..64921d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
 class FsVolumeList {
@@ -235,23 +233,14 @@ class FsVolumeList {
    * Use {@link checkDirsLock} to allow only one instance of checkDirs() call.
    *
    * @return list of all the failed volumes.
+   * @param failedVolumes
    */
-  Set<StorageLocation> checkDirs() {
+  void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
     try (AutoCloseableLock lock = checkDirsLock.acquire()) {
-      Set<StorageLocation> failedLocations = null;
-      // Make a copy of volumes for performing modification 
-      final List<FsVolumeImpl> volumeList = getVolumes();
 
-      for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
-        final FsVolumeImpl fsv = i.next();
+      for(FsVolumeSpi vol : failedVolumes) {
+        FsVolumeImpl fsv = (FsVolumeImpl) vol;
         try (FsVolumeReference ref = fsv.obtainReference()) {
-          fsv.checkDirs();
-        } catch (DiskErrorException e) {
-          FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
-          if (failedLocations == null) {
-            failedLocations = new HashSet<>(1);
-          }
-          failedLocations.add(fsv.getStorageLocation());
           addVolumeFailureInfo(fsv);
           removeVolume(fsv);
         } catch (ClosedChannelException e) {
@@ -262,13 +251,7 @@ class FsVolumeList {
         }
       }
       
-      if (failedLocations != null && failedLocations.size() > 0) {
-        FsDatasetImpl.LOG.warn("Completed checkDirs. Found " +
-            failedLocations.size() + " failure volumes.");
-      }
-
       waitVolumeRemoved(5000, checkDirsLockCondition);
-      return failedLocations;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
index 476565dc..b7583c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
@@ -30,9 +30,11 @@ import java.net.URL;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -52,6 +54,8 @@ public class TestBlockStatsMXBean {
   @Before
   public void setup() throws IOException {
     HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster = null;
     StorageType[][] types = new StorageType[6][];
     for (int i=0; i<3; i++) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 484fbe4..8472eca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -489,7 +489,17 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     public FsVolumeReference obtainReference() throws ClosedChannelException {
-      return null;
+      return new FsVolumeReference() {
+        @Override
+        public void close() throws IOException {
+          // no-op.
+        }
+
+        @Override
+        public FsVolumeSpi getVolume() {
+          return SimulatedVolume.this;
+        }
+      };
     }
 
     @Override
@@ -1078,9 +1088,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public Set<StorageLocation> checkDataDir() {
-    // nothing to check for simulated data set
-    return null;
+  public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
   }
 
   @Override // FsDatasetSpi
@@ -1349,7 +1357,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public FsVolumeReferences getFsVolumeReferences() {
-    throw new UnsupportedOperationException();
+    return new FsVolumeReferences(Collections.singletonList(volume));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 5607ccc..e31e783 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -62,6 +62,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -113,6 +114,8 @@ public class TestDataNodeHotSwapVolumes {
         1000);
     /* Allow 1 volume failure */
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
 
     MiniDFSNNTopology nnTopology =
         MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 8db7658..06e2871 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -33,6 +33,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -118,6 +119,8 @@ public class TestDataNodeVolumeFailure {
     // Allow a single volume failure (there are two volumes)
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
index aa9b7aa..3d37b10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
@@ -30,6 +30,7 @@ import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -659,6 +660,8 @@ public class TestDataNodeVolumeFailureReporting {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
         failedVolumesTolerated);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
         .storagesPerDatanode(storagesPerDatanode).build();
     cluster.waitActive();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
index 5ff7d9b..de50ccb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -144,6 +145,8 @@ public class TestDataNodeVolumeFailureToleration {
     // Bring up two additional datanodes that need both of their volumes
     // functioning in order to stay up.
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster.startDataNodes(conf, 2, true, null, null);
     cluster.waitActive();
     final DatanodeManager dm = cluster.getNamesystem().getBlockManager(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 56dee43..cd86720 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -26,7 +26,9 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,8 +51,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -69,6 +71,9 @@ public class TestDiskError {
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
@@ -213,19 +218,22 @@ public class TestDiskError {
    * Before refactoring the code the above function was not getting called 
    * @throws IOException, InterruptedException
    */
-  @Test
-  public void testcheckDiskError() throws IOException, InterruptedException {
+  @Test(timeout=60000)
+  public void testcheckDiskError() throws Exception {
     if(cluster.getDataNodes().size() <= 0) {
       cluster.startDataNodes(conf, 1, true, null, null);
       cluster.waitActive();
     }
     DataNode dataNode = cluster.getDataNodes().get(0);
-    long slackTime = dataNode.checkDiskErrorInterval/2;
     //checking for disk error
-    dataNode.checkDiskErrorAsync();
-    Thread.sleep(dataNode.checkDiskErrorInterval);
-    long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck();
-    assertTrue("Disk Error check is not performed within  " + dataNode.checkDiskErrorInterval +  "  ms", ((Time.monotonicNow()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime)));
+    final long lastCheckTimestamp = dataNode.getLastDiskErrorCheck();
+    dataNode.checkDiskError();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return dataNode.getLastDiskErrorCheck() > lastCheckTimestamp;
+      }
+    }, 100, 60000);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
index fa809d1..50096ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
@@ -35,7 +35,10 @@ import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -103,8 +106,8 @@ public class TestDatasetVolumeChecker {
      */
     checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
       @Override
-      public void call(Set<StorageLocation> healthyVolumes,
-                       Set<StorageLocation> failedVolumes) {
+      public void call(Set<FsVolumeSpi> healthyVolumes,
+                       Set<FsVolumeSpi> failedVolumes) {
         numCallbackInvocations.incrementAndGet();
         if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) {
           assertThat(healthyVolumes.size(), is(1));
@@ -138,7 +141,7 @@ public class TestDatasetVolumeChecker {
         new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
     checker.setDelegateChecker(new DummyChecker());
 
-    Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+    Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
     LOG.info("Got back {} failed volumes", failedVolumes.size());
 
     if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
@@ -174,8 +177,8 @@ public class TestDatasetVolumeChecker {
         dataset, new DatasetVolumeChecker.Callback() {
           @Override
           public void call(
-              Set<StorageLocation> healthyVolumes,
-              Set<StorageLocation> failedVolumes) {
+              Set<FsVolumeSpi> healthyVolumes,
+              Set<FsVolumeSpi> failedVolumes) {
             LOG.info("Got back {} failed volumes", failedVolumes.size());
             if (expectedVolumeHealth == null ||
                 expectedVolumeHealth == FAILED) {
@@ -236,7 +239,7 @@ public class TestDatasetVolumeChecker {
     return dataset;
   }
 
-  private static List<FsVolumeSpi> makeVolumes(
+  static List<FsVolumeSpi> makeVolumes(
       int numVolumes, VolumeCheckResult health) throws Exception {
     final List<FsVolumeSpi> volumes = new ArrayList<>(numVolumes);
     for (int i = 0; i < numVolumes; ++i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
index b57d84f..16c333b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
 import org.apache.hadoop.util.FakeTimer;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -47,6 +48,19 @@ public class TestDatasetVolumeCheckerFailures {
   public static final Logger LOG =LoggerFactory.getLogger(
       TestDatasetVolumeCheckerFailures.class);
 
+  private FakeTimer timer;
+  private Configuration conf;
+
+  private static final long MIN_DISK_CHECK_GAP_MS = 1000; // 1 second.
+
+  @Before
+  public void commonInit() {
+    timer = new FakeTimer();
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        MIN_DISK_CHECK_GAP_MS, TimeUnit.MILLISECONDS);
+  }
+
   /**
    * Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}.
    * @throws Exception
@@ -61,14 +75,13 @@ public class TestDatasetVolumeCheckerFailures {
         TestDatasetVolumeChecker.makeDataset(volumes);
 
     // Create a disk checker with a very low timeout.
-    final HdfsConfiguration conf = new HdfsConfiguration();
     conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
         1, TimeUnit.SECONDS);
     final DatasetVolumeChecker checker =
         new DatasetVolumeChecker(conf, new FakeTimer());
 
     // Ensure that the hung volume is detected as failed.
-    Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+    Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
     assertThat(failedVolumes.size(), is(1));
   }
 
@@ -86,10 +99,10 @@ public class TestDatasetVolumeCheckerFailures {
     final FsDatasetSpi<FsVolumeSpi> dataset =
         TestDatasetVolumeChecker.makeDataset(volumes);
 
-    DatasetVolumeChecker checker =
-        new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
-    Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
+    DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
+    Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
     assertThat(failedVolumes.size(), is(0));
+    assertThat(checker.getNumSyncDatasetChecks(), is(0L));
 
     // The closed volume should not have been checked as it cannot
     // be referenced.
@@ -98,13 +111,10 @@ public class TestDatasetVolumeCheckerFailures {
 
   @Test(timeout=60000)
   public void testMinGapIsEnforcedForSyncChecks() throws Exception {
+    final List<FsVolumeSpi> volumes =
+        TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
     final FsDatasetSpi<FsVolumeSpi> dataset =
-        TestDatasetVolumeChecker.makeDataset(Collections.emptyList());
-    final FakeTimer timer = new FakeTimer();
-    final Configuration conf = new HdfsConfiguration();
-    final long minGapMs = 100;
-    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
-        minGapMs, TimeUnit.MILLISECONDS);
+        TestDatasetVolumeChecker.makeDataset(volumes);
     final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
 
     checker.checkAllVolumes(dataset);
@@ -116,7 +126,7 @@ public class TestDatasetVolumeCheckerFailures {
     assertThat(checker.getNumSkippedChecks(), is(1L));
 
     // Re-check after advancing the timer. Ensure the check is performed.
-    timer.advance(minGapMs);
+    timer.advance(MIN_DISK_CHECK_GAP_MS);
     checker.checkAllVolumes(dataset);
     assertThat(checker.getNumSyncDatasetChecks(), is(2L));
     assertThat(checker.getNumSkippedChecks(), is(1L));
@@ -124,13 +134,10 @@ public class TestDatasetVolumeCheckerFailures {
 
   @Test(timeout=60000)
   public void testMinGapIsEnforcedForASyncChecks() throws Exception {
+    final List<FsVolumeSpi> volumes =
+        TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
     final FsDatasetSpi<FsVolumeSpi> dataset =
-        TestDatasetVolumeChecker.makeDataset(Collections.emptyList());
-    final FakeTimer timer = new FakeTimer();
-    final Configuration conf = new HdfsConfiguration();
-    final long minGapMs = 100;
-    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
-        minGapMs, TimeUnit.MILLISECONDS);
+        TestDatasetVolumeChecker.makeDataset(volumes);
     final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
 
     checker.checkAllVolumesAsync(dataset, null);
@@ -142,7 +149,7 @@ public class TestDatasetVolumeCheckerFailures {
     assertThat(checker.getNumSkippedChecks(), is(1L));
 
     // Re-check after advancing the timer. Ensure the check is performed.
-    timer.advance(minGapMs);
+    timer.advance(MIN_DISK_CHECK_GAP_MS);
     checker.checkAllVolumesAsync(dataset, null);
     assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
     assertThat(checker.getNumSkippedChecks(), is(1L));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 5cd86e2..62ef731 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -239,8 +239,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public Set<StorageLocation> checkDataDir() {
-    return null;
+  public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index e48aae0..905c3f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -52,13 +52,10 @@ import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -66,8 +63,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -76,16 +71,18 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
@@ -94,13 +91,10 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.slf4j.Logger;
@@ -339,68 +333,6 @@ public class TestFsDatasetImpl {
     assertEquals(numExistingVolumes, getNumVolumes());
   }
 
-  @Test(timeout = 5000)
-  public void testChangeVolumeWithRunningCheckDirs() throws IOException {
-    RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
-        new RoundRobinVolumeChoosingPolicy<>();
-    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
-    final BlockScanner blockScanner = new BlockScanner(datanode);
-    final FsVolumeList volumeList = new FsVolumeList(
-        Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
-    final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
-
-    // Initialize FsVolumeList with 5 mock volumes.
-    final int NUM_VOLUMES = 5;
-    for (int i = 0; i < NUM_VOLUMES; i++) {
-      FsVolumeImpl volume = mock(FsVolumeImpl.class);
-      oldVolumes.add(volume);
-      when(volume.getStorageLocation()).thenReturn(
-          StorageLocation.parse(new File("data" + i).toURI().toString()));
-      when(volume.checkClosed()).thenReturn(true);
-      FsVolumeReference ref = mock(FsVolumeReference.class);
-      when(ref.getVolume()).thenReturn(volume);
-      volumeList.addVolume(ref);
-    }
-
-    // When call checkDirs() on the 2nd volume, anther "thread" removes the 5th
-    // volume and add another volume. It does not affect checkDirs() running.
-    final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
-    final FsVolumeReference newRef = mock(FsVolumeReference.class);
-    when(newRef.getVolume()).thenReturn(newVolume);
-    when(newVolume.getStorageLocation()).thenReturn(
-        StorageLocation.parse(new File("data4").toURI().toString()));
-    FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
-    doAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        volumeList.removeVolume(
-            StorageLocation.parse((new File("data4")).toURI().toString()),
-            false);
-        volumeList.addVolume(newRef);
-        return null;
-      }
-    }).when(blockedVolume).checkDirs();
-
-    FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2);
-    doThrow(new DiskChecker.DiskErrorException("broken"))
-        .when(brokenVolume).checkDirs();
-
-    volumeList.checkDirs();
-
-    // Since FsVolumeImpl#checkDirs() get a snapshot of the list of volumes
-    // before running removeVolume(), it is supposed to run checkDirs() on all
-    // the old volumes.
-    for (FsVolumeImpl volume : oldVolumes) {
-      verify(volume).checkDirs();
-    }
-    // New volume is not visible to checkDirs() process.
-    verify(newVolume, never()).checkDirs();
-    assertTrue(volumeList.getVolumes().contains(newVolume));
-    assertFalse(volumeList.getVolumes().contains(brokenVolume));
-    assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
-  }
-
   @Test
   public void testAddVolumeFailureReleasesInUseLock() throws IOException {
     FsDatasetImpl spyDataset = spy(dataset);
@@ -717,6 +649,9 @@ public class TestFsDatasetImpl {
       Configuration config = new HdfsConfiguration();
       config.setLong(
           DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+      config.setTimeDuration(
+          DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 0,
+          TimeUnit.MILLISECONDS);
       config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
 
       cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
@@ -734,6 +669,8 @@ public class TestFsDatasetImpl {
           getVolume(block);
       File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
           .getBlockPoolId());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
 
       if (finalizedDir.exists()) {
         // Remove write and execute access so that checkDiskErrorThread detects
@@ -744,15 +681,14 @@ public class TestFsDatasetImpl {
       Assert.assertTrue("Reference count for the volume should be greater "
           + "than 0", volume.getReferenceCount() > 0);
       // Invoke the synchronous checkDiskError method
-      dataNode.getFSDataset().checkDataDir();
+      dataNode.checkDiskError();
       // Sleep for 1 second so that datanode can interrupt and cluster clean up
       GenericTestUtils.waitFor(new Supplier<Boolean>() {
           @Override public Boolean get() {
               return volume.getReferenceCount() == 0;
             }
           }, 100, 10);
-      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
-      DatanodeInfo info = lb.getLocations()[0];
+      assertThat(dataNode.getFSDataset().getNumFailedVolumes(), is(1));
 
       try {
         out.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f678080d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 6eff300..83c15ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -109,43 +109,6 @@ public class TestFsVolumeList {
   }
 
   @Test(timeout=30000)
-  public void testCheckDirsWithClosedVolume() throws IOException {
-    FsVolumeList volumeList = new FsVolumeList(
-        Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
-    final List<FsVolumeImpl> volumes = new ArrayList<>();
-    for (int i = 0; i < 3; i++) {
-      File curDir = new File(baseDir, "volume-" + i);
-      curDir.mkdirs();
-      FsVolumeImpl volume = new FsVolumeImplBuilder()
-          .setConf(conf)
-          .setDataset(dataset)
-          .setStorageID("storage-id")
-          .setStorageDirectory(
-              new StorageDirectory(StorageLocation.parse(curDir.getPath())))
-          .build();
-      volumes.add(volume);
-      volumeList.addVolume(volume.obtainReference());
-    }
-
-    // Close the 2nd volume.
-    volumes.get(1).setClosed();
-    try {
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          return volumes.get(1).checkClosed();
-        }
-      }, 100, 3000);
-    } catch (TimeoutException e) {
-      fail("timed out while waiting for volume to be removed.");
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-    }
-    // checkDirs() should ignore the 2nd volume since it is closed.
-    volumeList.checkDirs();
-  }
-
-  @Test(timeout=30000)
   public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
         Collections.<VolumeFailureInfo>emptyList(), null, blockChooser);

