hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [4/5] hadoop git commit: HDFS-11274. Datanode should only check the failed volume upon IO errors. Contributed by Xiaoyu Yao.
Date Tue, 17 Jan 2017 06:14:28 GMT
HDFS-11274. Datanode should only check the failed volume upon IO errors. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eafaddca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eafaddca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eafaddca

Branch: refs/heads/branch-2
Commit: eafaddca1a69dd02c6e72f0b21b66f58b0c18dab
Parents: ec80de3
Author: Xiaoyu Yao <xyao@apache.org>
Authored: Wed Dec 28 22:08:13 2016 -0800
Committer: Arpit Agarwal <arp@apache.org>
Committed: Mon Jan 16 22:14:04 2017 -0800

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockReceiver.java     |  12 +-
 .../server/datanode/CountingFileIoEvents.java   |   3 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  95 +++++++++-----
 .../server/datanode/DefaultFileIoEvents.java    |   2 +-
 .../hdfs/server/datanode/DirectoryScanner.java  |   2 +-
 .../hdfs/server/datanode/FileIoEvents.java      |  36 ++++--
 .../hdfs/server/datanode/FileIoProvider.java    |  89 +++++++------
 .../server/datanode/ProfilingFileIoEvents.java  |   2 +-
 .../hdfs/server/datanode/ReplicaInfo.java       |   2 +-
 .../server/datanode/checker/AsyncChecker.java   |   5 +-
 .../datanode/checker/DatasetVolumeChecker.java  |  72 +++++++----
 .../checker/StorageLocationChecker.java         |   8 +-
 .../datanode/checker/ThrottledAsyncChecker.java |  19 ++-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   5 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  13 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  39 +++---
 .../server/datanode/SimulatedFSDataset.java     |   2 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    |   2 +-
 .../TestDataNodeVolumeFailureReporting.java     |  26 ++--
 .../checker/TestDatasetVolumeChecker.java       |  52 +++++---
 .../TestDatasetVolumeCheckerFailures.java       |  23 ----
 .../checker/TestThrottledAsyncChecker.java      | 129 +++++++++++--------
 .../fsdataset/impl/TestFsDatasetImpl.java       |   2 +-
 23 files changed, 376 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 7fdffb4..517a709 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -276,10 +276,9 @@ class BlockReceiver implements Closeable {
       IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
       DataNode.LOG.warn("IOException in BlockReceiver constructor"
           + (cause == null ? "" : ". Cause is "), cause);
-      
-      if (cause != null) { // possible disk error
+      if (cause != null) {
         ioe = cause;
-        datanode.checkDiskErrorAsync();
+        // Volume error check moved to FileIoProvider
       }
       
       throw ioe;
@@ -361,9 +360,8 @@ class BlockReceiver implements Closeable {
     if (measuredFlushTime) {
       datanode.metrics.addFlushNanos(flushTotalNanos);
     }
-    // disk check
     if(ioe != null) {
-      datanode.checkDiskErrorAsync();
+      // Volume error check moved to FileIoProvider
       throw ioe;
     }
   }
@@ -786,7 +784,7 @@ class BlockReceiver implements Closeable {
           manageWriterOsCache(offsetInBlock);
         }
       } catch (IOException iex) {
-        datanode.checkDiskErrorAsync();
+        // Volume error check moved to FileIoProvider
         throw iex;
       }
     }
@@ -1395,7 +1393,7 @@ class BlockReceiver implements Closeable {
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
-            datanode.checkDiskErrorAsync();
+            // Volume error check moved to FileIoProvider
             LOG.info(myString, e);
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
index a70c151..7c6bfd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class CountingFileIoEvents implements FileIoEvents {
+public class CountingFileIoEvents extends FileIoEvents {
   private final Map<OPERATION, Counts> counts;
 
   private static class Counts {
@@ -90,7 +90,6 @@ public class CountingFileIoEvents implements FileIoEvents {
   public void onFailure(
       @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
     counts.get(op).failures.incrementAndGet();
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bddde1f..dc7d267 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -371,6 +371,7 @@ public class DataNode extends ReconfigurableBase
   SaslDataTransferServer saslServer;
   private final boolean getHdfsBlockLocationsEnabled;
   private ObjectName dataNodeInfoBeanName;
+  // Test verification only
   private volatile long lastDiskErrorCheck;
   private String supergroup;
   private boolean isPermissionEnabled;
@@ -408,7 +409,7 @@ public class DataNode extends ReconfigurableBase
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
         new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
-    this.fileIoProvider = new FileIoProvider(conf);
+    this.fileIoProvider = new FileIoProvider(conf, this);
     this.fileDescriptorPassingDisabledReason = null;
     this.maxNumberOfBlocksToLog = 0;
     this.confVersion = null;
@@ -433,7 +434,7 @@ public class DataNode extends ReconfigurableBase
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
         new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
-    this.fileIoProvider = new FileIoProvider(conf);
+    this.fileIoProvider = new FileIoProvider(conf, this);
     this.blockScanner = new BlockScanner(this);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -783,7 +784,7 @@ public class DataNode extends ReconfigurableBase
 
   /**
    * Remove volumes from DataNode.
-   * See {@link #removeVolumes(Set, boolean)} for details.
+   * See {@link #removeVolumes(Collection, boolean)} for details.
    *
    * @param locations the StorageLocations of the volumes to be removed.
    * @throws IOException
@@ -810,7 +811,7 @@ public class DataNode extends ReconfigurableBase
    *   <ul>Reset configuration DATA_DIR and {@link #dataDirs} to represent
    *   active volumes.</ul>
    * </li>
-   * @param absoluteVolumePaths the absolute path of volumes.
+   * @param storageLocations the absolute path of volumes.
    * @param clearFailure if true, clears the failure information related to the
    *                     volumes.
    * @throws IOException
@@ -1258,7 +1259,7 @@ public class DataNode extends ReconfigurableBase
    * If conf's CONFIG_PROPERTY_SIMULATED property is set
    * then a simulated storage based data node is created.
    *
-   * @param dataDirs - only for a non-simulated storage data node
+   * @param dataDirectories - only for a non-simulated storage data node
    * @throws IOException
    */
   void startDataNode(List<StorageLocation> dataDirectories,
@@ -2020,16 +2021,39 @@ public class DataNode extends ReconfigurableBase
     tracer.close();
   }
 
-
   /**
-   * Check if there is a disk failure asynchronously and if so, handle the error
+   * Check if there is a disk failure asynchronously
+   * and if so, handle the error.
    */
+  @VisibleForTesting
   public void checkDiskErrorAsync() {
     volumeChecker.checkAllVolumesAsync(
         data, new DatasetVolumeChecker.Callback() {
           @Override
           public void call(Set<FsVolumeSpi> healthyVolumes,
                            Set<FsVolumeSpi> failedVolumes) {
+              if (failedVolumes.size() > 0) {
+               LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
+                   failedVolumes.size(), failedVolumes);
+              } else {
+               LOG.debug("checkDiskErrorAsync: no volume failures detected");
+              }
+              lastDiskErrorCheck = Time.monotonicNow();
+              handleVolumeFailures(failedVolumes);
+          }
+        });
+  }
+
+  /**
+   * Check if there is a disk failure asynchronously
+   * and if so, handle the error.
+   */
+  public void checkDiskErrorAsync(FsVolumeSpi volume) {
+    volumeChecker.checkVolume(
+        volume, new DatasetVolumeChecker.Callback() {
+          @Override
+          public void call(Set<FsVolumeSpi> healthyVolumes,
+                           Set<FsVolumeSpi> failedVolumes) {
             if (failedVolumes.size() > 0) {
               LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
                   failedVolumes.size(), failedVolumes);
@@ -2037,14 +2061,15 @@ public class DataNode extends ReconfigurableBase
               LOG.debug("checkDiskErrorAsync: no volume failures detected");
             }
             lastDiskErrorCheck = Time.monotonicNow();
-            DataNode.this.handleVolumeFailures(failedVolumes);
+            handleVolumeFailures(failedVolumes);
           }
         });
   }
 
-  private void handleDiskError(String errMsgr) {
+  private void handleDiskError(String failedVolumes) {
     final boolean hasEnoughResources = data.hasEnoughResource();
-    LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
+    LOG.warn("DataNode.handleDiskError on : [" + failedVolumes +
+        "] Keep Running: " + hasEnoughResources);
 
     // If we have enough active valid volumes then we do not want to
     // shutdown the DN completely.
@@ -2054,7 +2079,7 @@ public class DataNode extends ReconfigurableBase
 
     //inform NameNodes
     for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
-      bpos.trySendErrorReport(dpError, errMsgr);
+      bpos.trySendErrorReport(dpError, failedVolumes);
     }
 
     if(hasEnoughResources) {
@@ -2062,7 +2087,8 @@ public class DataNode extends ReconfigurableBase
       return; // do not shutdown
     }
 
-    LOG.warn("DataNode is shutting down: " + errMsgr);
+    LOG.warn("DataNode is shutting down due to failed volumes: ["
+        + failedVolumes + "]");
     shouldRun = false;
   }
 
@@ -2412,8 +2438,11 @@ public class DataNode extends ReconfigurableBase
         }
         LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
             targets[0] + " got ", ie);
-        // check if there are any disk problem
-        checkDiskErrorAsync();
+        // disk check moved to FileIoProvider
+        IOException cause = DatanodeUtil.getCauseIfDiskError(ie);
+        if (cause != null) { // possible disk error
+          LOG.warn("IOException in DataTransfer#run(). Cause is ", cause);
+        }
       } finally {
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
@@ -3167,28 +3196,36 @@ public class DataNode extends ReconfigurableBase
   }
 
   private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
+    if (unhealthyVolumes.isEmpty()) {
+      LOG.debug("handleVolumeFailures done with empty " +
+          "unhealthyVolumes");
+      return;
+    }
+
     data.handleVolumeFailures(unhealthyVolumes);
     final Set<File> unhealthyDirs = new HashSet<>(unhealthyVolumes.size());
 
-    if (!unhealthyVolumes.isEmpty()) {
-      StringBuilder sb = new StringBuilder("DataNode failed volumes:");
-      for (FsVolumeSpi vol : unhealthyVolumes) {
-        unhealthyDirs.add(new File(vol.getBasePath()).getAbsoluteFile());
-        sb.append(vol).append(";");
-      }
+    StringBuilder sb = new StringBuilder("DataNode failed volumes:");
+    for (FsVolumeSpi vol : unhealthyVolumes) {
+      unhealthyDirs.add(new File(vol.getBasePath()).getAbsoluteFile());
+      sb.append(vol).append(";");
+    }
 
-      try {
-        // Remove all unhealthy volumes from DataNode.
-        removeVolumes(unhealthyDirs, false);
-      } catch (IOException e) {
-        LOG.warn("Error occurred when removing unhealthy storage dirs: "
-            + e.getMessage(), e);
-      }
-      LOG.info(sb.toString());
-      handleDiskError(sb.toString());
+    try {
+      // Remove all unhealthy volumes from DataNode.
+      removeVolumes(unhealthyDirs, false);
+    } catch (IOException e) {
+      LOG.warn("Error occurred when removing unhealthy storage dirs: "
+          + e.getMessage(), e);
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sb.toString());
+    }
+      // send blockreport regarding volume failure
+    handleDiskError(sb.toString());
   }
 
+  @VisibleForTesting
   public long getLastDiskErrorCheck() {
     return lastDiskErrorCheck;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
index bd4932b..6a12aae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public final class DefaultFileIoEvents implements FileIoEvents {
+public final class DefaultFileIoEvents extends FileIoEvents {
   @Override
   public long beforeMetadataOp(
       @Nullable FsVolumeSpi volume, OPERATION op) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index d9a6749..15208c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -855,7 +855,7 @@ public class DirectoryScanner implements Runnable {
       } catch (IOException ioe) {
         LOG.warn("Exception occured while compiling report: ", ioe);
         // Initiate a check on disk failure.
-        datanode.checkDiskErrorAsync();
+        datanode.checkDiskErrorAsync(volume);
         // Ignore this directory and proceed.
         return report;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
index 48e703f..10f2a0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public interface FileIoEvents {
+public abstract class FileIoEvents {
 
   /**
    * Invoked before a filesystem metadata operation.
@@ -42,7 +42,7 @@ public interface FileIoEvents {
    * @return  timestamp at which the operation was started. 0 if
    *          unavailable.
    */
-  long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
+  abstract long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
 
   /**
    * Invoked after a filesystem metadata operation has completed.
@@ -52,7 +52,8 @@ public interface FileIoEvents {
    * @param begin  timestamp at which the operation was started. 0
    *               if unavailable.
    */
-  void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin);
+  abstract void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
+                                long begin);
 
   /**
    * Invoked before a read/write/flush/channel transfer operation.
@@ -63,7 +64,8 @@ public interface FileIoEvents {
    * @return  timestamp at which the operation was started. 0 if
    *          unavailable.
    */
-  long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len);
+  abstract long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+                             long len);
 
 
   /**
@@ -76,22 +78,38 @@ public interface FileIoEvents {
    * @return  timestamp at which the operation was started. 0 if
    *          unavailable.
    */
-  void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
-                   long begin, long len);
+  abstract void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+                            long begin, long len);
 
   /**
    * Invoked if an operation fails with an exception.
-   *  @param volume  target volume for the operation. Null if unavailable.
+   * @param volume  target volume for the operation. Null if unavailable.
    * @param op  type of operation.
    * @param e  Exception encountered during the operation.
    * @param begin  time at which the operation was started.
    */
-  void onFailure(
+  abstract void onFailure(
       @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
 
   /**
+   * Invoked by FileIoProvider if an operation fails with an exception.
+   * @param datanode datanode that runs volume check upon volume io failure
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @param e  Exception encountered during the operation.
+   * @param begin  time at which the operation was started.
+   */
+  void onFailure(DataNode datanode,
+      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+    onFailure(volume, op, e, begin);
+    if (datanode != null && volume != null) {
+      datanode.checkDiskErrorAsync(volume);
+    }
+  }
+
+  /**
    * Return statistics as a JSON string.
    * @return
    */
-  @Nullable String getStatistics();
+  @Nullable abstract String getStatistics();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
index df7718e..4a71179 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
@@ -79,12 +79,16 @@ public class FileIoProvider {
       FileIoProvider.class);
 
   private final FileIoEvents eventHooks;
+  private final DataNode datanode;
 
   /**
    * @param conf  Configuration object. May be null. When null,
    *              the event handlers are no-ops.
+   * @param datanode datanode that owns this FileIoProvider. Used for
+   *               IO error based volume checker callback
    */
-  public FileIoProvider(@Nullable Configuration conf) {
+  public FileIoProvider(@Nullable Configuration conf,
+                        final DataNode datanode) {
     if (conf != null) {
       final Class<? extends FileIoEvents> clazz = conf.getClass(
           DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
@@ -94,6 +98,7 @@ public class FileIoProvider {
     } else {
       eventHooks = new DefaultFileIoEvents();
     }
+    this.datanode = datanode;
   }
 
   /**
@@ -139,7 +144,7 @@ public class FileIoProvider {
       f.flush();
       eventHooks.afterFileIo(volume, FLUSH, begin, 0);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, FLUSH, e, begin);
+      eventHooks.onFailure(datanode, volume, FLUSH, e, begin);
       throw e;
     }
   }
@@ -157,7 +162,7 @@ public class FileIoProvider {
       fos.getChannel().force(true);
       eventHooks.afterFileIo(volume, SYNC, begin, 0);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, SYNC, e, begin);
+      eventHooks.onFailure(datanode, volume, SYNC, e, begin);
       throw e;
     }
   }
@@ -176,7 +181,7 @@ public class FileIoProvider {
       NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
       eventHooks.afterFileIo(volume, SYNC, begin, 0);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, SYNC, e, begin);
+      eventHooks.onFailure(datanode, volume, SYNC, e, begin);
       throw e;
     }
   }
@@ -196,7 +201,7 @@ public class FileIoProvider {
           identifier, outFd, offset, length, flags);
       eventHooks.afterMetadataOp(volume, FADVISE, begin);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, FADVISE, e, begin);
+      eventHooks.onFailure(datanode, volume, FADVISE, e, begin);
       throw e;
     }
   }
@@ -214,7 +219,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, DELETE, begin);
       return deleted;
     } catch (Exception e) {
-      eventHooks.onFailure(volume, DELETE, e, begin);
+      eventHooks.onFailure(datanode, volume, DELETE, e, begin);
       throw e;
     }
   }
@@ -236,7 +241,7 @@ public class FileIoProvider {
       }
       return deleted;
     } catch (Exception e) {
-      eventHooks.onFailure(volume, DELETE, e, begin);
+      eventHooks.onFailure(datanode, volume, DELETE, e, begin);
       throw e;
     }
   }
@@ -264,7 +269,7 @@ public class FileIoProvider {
           waitTime, transferTime);
       eventHooks.afterFileIo(volume, TRANSFER, begin, count);
     } catch (Exception e) {
-      eventHooks.onFailure(volume, TRANSFER, e, begin);
+      eventHooks.onFailure(datanode, volume, TRANSFER, e, begin);
       throw e;
     }
   }
@@ -285,7 +290,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, OPEN, begin);
       return created;
     } catch (Exception e) {
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -312,7 +317,7 @@ public class FileIoProvider {
       return fis;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fis);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -328,7 +333,7 @@ public class FileIoProvider {
    * @param f  File object.
    * @param append  if true, then bytes will be written to the end of the
    *                file rather than the beginning.
-   * @param  FileOutputStream to the given file object.
+   * @return  FileOutputStream to the given file object.
    * @throws FileNotFoundException
    */
   public FileOutputStream getFileOutputStream(
@@ -342,7 +347,7 @@ public class FileIoProvider {
       return fos;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fos);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -372,7 +377,7 @@ public class FileIoProvider {
    * before delegating to the wrapped stream.
    *
    * @param volume  target volume. null if unavailable.
-   * @param f  File object.
+   * @param fd  File descriptor object.
    * @return  FileOutputStream to the given file object.
    * @throws  FileNotFoundException
    */
@@ -407,7 +412,7 @@ public class FileIoProvider {
       return fis;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fis);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -438,7 +443,7 @@ public class FileIoProvider {
       return fis;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(fis);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -468,7 +473,7 @@ public class FileIoProvider {
       return raf;
     } catch(Exception e) {
       org.apache.commons.io.IOUtils.closeQuietly(raf);
-      eventHooks.onFailure(volume, OPEN, e, begin);
+      eventHooks.onFailure(datanode, volume, OPEN, e, begin);
       throw e;
     }
   }
@@ -487,7 +492,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, DELETE, begin);
       return deleted;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, DELETE, e, begin);
+      eventHooks.onFailure(datanode, volume, DELETE, e, begin);
       throw e;
     }
   }
@@ -508,7 +513,7 @@ public class FileIoProvider {
       FileUtil.replaceFile(src, target);
       eventHooks.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MOVE, e, begin);
+      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
       throw e;
     }
   }
@@ -530,7 +535,7 @@ public class FileIoProvider {
       Storage.rename(src, target);
       eventHooks.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MOVE, e, begin);
+      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
       throw e;
     }
   }
@@ -552,7 +557,7 @@ public class FileIoProvider {
       FileUtils.moveFile(src, target);
       eventHooks.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MOVE, e, begin);
+      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
       throw e;
     }
   }
@@ -576,7 +581,7 @@ public class FileIoProvider {
       Files.move(src, target, options);
       eventHooks.afterMetadataOp(volume, MOVE, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MOVE, e, begin);
+      eventHooks.onFailure(datanode, volume, MOVE, e, begin);
       throw e;
     }
   }
@@ -625,7 +630,7 @@ public class FileIoProvider {
       Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
       eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, NATIVE_COPY, e, begin);
+      eventHooks.onFailure(datanode, volume, NATIVE_COPY, e, begin);
       throw e;
     }
   }
@@ -650,7 +655,7 @@ public class FileIoProvider {
       isDirectory = !created && dir.isDirectory();
       eventHooks.afterMetadataOp(volume, MKDIRS, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MKDIRS, e, begin);
+      eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
       throw e;
     }
 
@@ -676,7 +681,7 @@ public class FileIoProvider {
       succeeded = dir.isDirectory() || dir.mkdirs();
       eventHooks.afterMetadataOp(volume, MKDIRS, begin);
     } catch(Exception e) {
-      eventHooks.onFailure(volume, MKDIRS, e, begin);
+      eventHooks.onFailure(datanode, volume, MKDIRS, e, begin);
       throw e;
     }
 
@@ -702,7 +707,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, LIST, begin);
       return children;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, LIST, e, begin);
+      eventHooks.onFailure(datanode, volume, LIST, e, begin);
       throw e;
     }
   }
@@ -712,7 +717,7 @@ public class FileIoProvider {
    * {@link FileUtil#listFiles(File)}.
    *
    * @param volume  target volume. null if unavailable.
-   * @param   Driectory to be listed.
+   * @param   dir directory to be listed.
    * @return  array of strings representing the directory entries.
    * @throws IOException
    */
@@ -724,7 +729,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, LIST, begin);
       return children;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, LIST, e, begin);
+      eventHooks.onFailure(datanode, volume, LIST, e, begin);
       throw e;
     }
   }
@@ -747,7 +752,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, LIST, begin);
       return children;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, LIST, e, begin);
+      eventHooks.onFailure(datanode, volume, LIST, e, begin);
       throw e;
     }
   }
@@ -769,7 +774,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, LIST, begin);
       return count;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, LIST, e, begin);
+      eventHooks.onFailure(datanode, volume, LIST, e, begin);
       throw e;
     }
   }
@@ -788,7 +793,7 @@ public class FileIoProvider {
       eventHooks.afterMetadataOp(volume, EXISTS, begin);
       return exists;
     } catch(Exception e) {
-      eventHooks.onFailure(volume, EXISTS, e, begin);
+      eventHooks.onFailure(datanode, volume, EXISTS, e, begin);
       throw e;
     }
   }
@@ -829,7 +834,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, 1);
         return b;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -845,7 +850,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -861,7 +866,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -903,7 +908,7 @@ public class FileIoProvider {
         super.write(b);
         eventHooks.afterFileIo(volume, WRITE, begin, 1);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -918,7 +923,7 @@ public class FileIoProvider {
         super.write(b);
         eventHooks.afterFileIo(volume, WRITE, begin, b.length);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -933,7 +938,7 @@ public class FileIoProvider {
         super.write(b, off, len);
         eventHooks.afterFileIo(volume, WRITE, begin, len);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -961,7 +966,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, 1);
         return b;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -974,7 +979,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -987,7 +992,7 @@ public class FileIoProvider {
         eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
         return numBytesRead;
       } catch(Exception e) {
-        eventHooks.onFailure(volume, READ, e, begin);
+        eventHooks.onFailure(datanode, volume, READ, e, begin);
         throw e;
       }
     }
@@ -999,7 +1004,7 @@ public class FileIoProvider {
         super.write(b);
         eventHooks.afterFileIo(volume, WRITE, begin, 1);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -1011,7 +1016,7 @@ public class FileIoProvider {
         super.write(b);
         eventHooks.afterFileIo(volume, WRITE, begin, b.length);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }
@@ -1023,7 +1028,7 @@ public class FileIoProvider {
         super.write(b, off, len);
         eventHooks.afterFileIo(volume, WRITE, begin, len);
       } catch(Exception e) {
-        eventHooks.onFailure(volume, WRITE, e, begin);
+        eventHooks.onFailure(datanode, volume, WRITE, e, begin);
         throw e;
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
index 5835fe8..affd093 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java
@@ -30,7 +30,7 @@ import javax.annotation.Nullable;
  * related operations on datanode volumes.
  */
 @InterfaceAudience.Private
-class ProfilingFileIoEvents implements FileIoEvents {
+class ProfilingFileIoEvents extends FileIoEvents {
 
   @Override
   public long beforeMetadataOp(@Nullable FsVolumeSpi volume,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 47ca1f1..9817f97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -64,7 +64,7 @@ abstract public class ReplicaInfo extends Block
 
   /** This is used by some tests and FsDatasetUtil#computeChecksum. */
   private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
-      new FileIoProvider(null);
+      new FileIoProvider(null, null);
 
   /**
    * Constructor

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
index 1d534a3..997c0cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -43,10 +44,10 @@ public interface AsyncChecker<K, V> {
    * @param context the interpretation of the context depends on the
    *                target.
    *
-   * @return returns a {@link ListenableFuture} that can be used to
+   * @return returns an {@link Optional} of {@link ListenableFuture} that can be used to
    *         retrieve the result of the asynchronous check.
    */
-  ListenableFuture<V> schedule(Checkable<K, V> target, K context);
+  Optional<ListenableFuture<V>> schedule(Checkable<K, V> target, K context);
 
   /**
    * Cancel all executing checks and wait for them to complete.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index 8f346dc..5ef3eec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
@@ -191,18 +192,26 @@ public class DatasetVolumeChecker {
 
     for (int i = 0; i < references.size(); ++i) {
       final FsVolumeReference reference = references.getReference(i);
-      allVolumes.add(reference.getVolume());
-      ListenableFuture<VolumeCheckResult> future =
+      Optional<ListenableFuture<VolumeCheckResult>> olf =
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
       LOG.info("Scheduled health check for volume {}", reference.getVolume());
-      Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, numVolumes, new Callback() {
-        @Override
-        public void call(Set<FsVolumeSpi> ignored1,
-                         Set<FsVolumeSpi> ignored2) {
+      if (olf.isPresent()) {
+        allVolumes.add(reference.getVolume());
+        Futures.addCallback(olf.get(),
+            new ResultHandler(reference, healthyVolumes, failedVolumes,
+                numVolumes, new Callback() {
+              @Override
+              public void call(Set<FsVolumeSpi> ignored1,
+                               Set<FsVolumeSpi> ignored2) {
+                latch.countDown();
+              }
+            }));
+      } else {
+        IOUtils.cleanup(null, reference);
+        if (numVolumes.decrementAndGet() == 0) {
           latch.countDown();
         }
-      }));
+      }
     }
 
     // Wait until our timeout elapses, after which we give up on
@@ -263,18 +272,26 @@ public class DatasetVolumeChecker {
     final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
     final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
     final AtomicLong numVolumes = new AtomicLong(references.size());
+    boolean added = false;
 
     LOG.info("Checking {} volumes", references.size());
     for (int i = 0; i < references.size(); ++i) {
       final FsVolumeReference reference = references.getReference(i);
       // The context parameter is currently ignored.
-      ListenableFuture<VolumeCheckResult> future =
+      Optional<ListenableFuture<VolumeCheckResult>> olf =
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
-      Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, numVolumes, callback));
+      if (olf.isPresent()) {
+        added = true;
+        Futures.addCallback(olf.get(),
+            new ResultHandler(reference, healthyVolumes, failedVolumes,
+                numVolumes, callback));
+      } else {
+        IOUtils.cleanup(null, reference);
+        numVolumes.decrementAndGet();
+      }
     }
     numAsyncDatasetChecks.incrementAndGet();
-    return true;
+    return added;
   }
 
   /**
@@ -291,7 +308,7 @@ public class DatasetVolumeChecker {
   }
 
   /**
-   * Check a single volume, returning a {@link ListenableFuture}
+   * Check a single volume asynchronously, returning a {@link ListenableFuture}
    * that can be used to retrieve the final result.
    *
    * If the volume cannot be referenced then it is already closed and
@@ -305,21 +322,32 @@ public class DatasetVolumeChecker {
   public boolean checkVolume(
       final FsVolumeSpi volume,
       Callback callback) {
+    if (volume == null) {
+      LOG.debug("Cannot schedule check on null volume");
+      return false;
+    }
+
     FsVolumeReference volumeReference;
     try {
       volumeReference = volume.obtainReference();
     } catch (ClosedChannelException e) {
       // The volume has already been closed.
-      callback.call(new HashSet<FsVolumeSpi>(), new HashSet<FsVolumeSpi>());
       return false;
     }
-    ListenableFuture<VolumeCheckResult> future =
+
+    Optional<ListenableFuture<VolumeCheckResult>> olf =
         delegateChecker.schedule(volume, IGNORED_CONTEXT);
-    numVolumeChecks.incrementAndGet();
-    Futures.addCallback(future, new ResultHandler(
-        volumeReference, new HashSet<FsVolumeSpi>(), new HashSet<FsVolumeSpi>(),
-        new AtomicLong(1), callback));
-    return true;
+    if (olf.isPresent()) {
+      numVolumeChecks.incrementAndGet();
+      Futures.addCallback(olf.get(),
+          new ResultHandler(volumeReference, new HashSet<FsVolumeSpi>(),
+          new HashSet<FsVolumeSpi>(),
+          new AtomicLong(1), callback));
+      return true;
+    } else {
+      IOUtils.cleanup(null, volumeReference);
+    }
+    return false;
   }
 
   /**
@@ -343,8 +371,8 @@ public class DatasetVolumeChecker {
      *                       successful, add the volume here.
      * @param failedVolumes set of failed volumes. If the disk check fails,
      *                      add the volume here.
-     * @param semaphore semaphore used to trigger callback invocation.
-     * @param callback invoked when the semaphore can be successfully acquired.
+     * @param volumeCounter counter used to trigger callback invocation.
+     * @param callback invoked when the volumeCounter reaches 0.
      */
     ResultHandler(FsVolumeReference reference,
                   Set<FsVolumeSpi> healthyVolumes,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
index 7337ad0..6e323e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -158,8 +159,11 @@ public class StorageLocationChecker {
     // Start parallel disk check operations on all StorageLocations.
     for (StorageLocation location : dataDirs) {
       goodLocations.put(location, true);
-      futures.put(location,
-          delegateChecker.schedule(location, context));
+      Optional<ListenableFuture<VolumeCheckResult>> olf =
+          delegateChecker.schedule(location, context);
+      if (olf.isPresent()) {
+        futures.put(location, olf.get());
+      }
     }
 
     if (maxVolumeFailuresTolerated >= dataDirs.size()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
index 89db88e..ad56c61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -101,13 +102,11 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
    * will receive the same Future.
    */
   @Override
-  public synchronized ListenableFuture<V> schedule(
-      final Checkable<K, V> target,
-      final K context) {
-    LOG.debug("Scheduling a check of {}", target);
-
+  public Optional<ListenableFuture<V>> schedule(
+      final Checkable<K, V> target, final K context) {
+    LOG.info("Scheduling a check for {}", target);
     if (checksInProgress.containsKey(target)) {
-      return checksInProgress.get(target);
+      return Optional.absent();
     }
 
     if (completedChecks.containsKey(target)) {
@@ -115,11 +114,9 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
       final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
       if (msSinceLastCheck < minMsBetweenChecks) {
         LOG.debug("Skipped checking {}. Time since last check {}ms " +
-            "is less than the min gap {}ms.",
+                "is less than the min gap {}ms.",
             target, msSinceLastCheck, minMsBetweenChecks);
-        return result.result != null ?
-            Futures.immediateFuture(result.result) :
-            Futures.<V>immediateFailedFuture(result.exception);
+        return Optional.absent();
       }
     }
 
@@ -132,7 +129,7 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
         });
     checksInProgress.put(target, lf);
     addResultCachingCallback(target, lf);
-    return lf;
+    return Optional.of(lf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index cd44557..31cf39f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -277,7 +277,10 @@ class BlockPoolSlice {
           fileIoProvider.getFileOutputStream(volume, outFile), "UTF-8")) {
         // mtime is written last, so that truncated writes won't be valid.
         out.write(Long.toString(used) + " " + Long.toString(timer.now()));
-        fileIoProvider.flush(volume, out);
+        // This is only called as part of the volume shutdown.
+        // We explicitly avoid flushing via fileIoProvider, which triggers a
+        // volume check upon an IO exception, to avoid cyclic volume checks.
+        out.flush();
       }
     } catch (IOException ioe) {
       // If write failed, the volume might be bad. Since the cache file is

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e37934e..cc1d0c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1974,17 +1974,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   File validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
-    final File f;
+    File f = null;
+    ReplicaInfo info;
     try(AutoCloseableLock lock = datasetLock.acquire()) {
-      f = getFile(bpid, blockId, false);
+      info = volumeMap.get(bpid, blockId);
+      if (info != null) {
+        f = info.getBlockFile();
+      }
     }
 
     if(f != null ) {
-      if(f.exists())
+      if(f.exists()) {
         return f;
+      }
 
       // if file is not null, but doesn't exist - possibly disk failed
-      datanode.checkDiskErrorAsync();
+      datanode.checkDiskErrorAsync(info.getVolume());
     }
 
     if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index b1ce931..71d93ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -79,7 +79,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * The underlying volume used to store replica.
- * 
+ *
  * It uses the {@link FsDatasetImpl} object for synchronization.
  */
 @InterfaceAudience.Private
@@ -98,7 +98,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private final Map<String, BlockPoolSlice> bpSlices
       = new ConcurrentHashMap<String, BlockPoolSlice>();
   private final File currentDir;    // <StorageDirectory>/current
-  private final DF usage;           
+  private final DF usage;
   private final long reserved;
   private CloseableReferenceCount reference = new CloseableReferenceCount();
 
@@ -120,7 +120,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
    * contention.
    */
   protected ThreadPoolExecutor cacheExecutor;
-  
+
   FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
       Configuration conf, StorageType storageType) throws IOException {
     this.dataset = dataset;
@@ -137,7 +137,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.configuredCapacity = -1;
     // dataset.datanode may be null in some tests.
     this.fileIoProvider = dataset.datanode != null ?
-        dataset.datanode.getFileIoProvider() : new FileIoProvider(conf);
+        dataset.datanode.getFileIoProvider() :
+        new FileIoProvider(conf, dataset.datanode);
     cacheExecutor = initializeCacheExecutor(parent);
     this.metrics = DataNodeVolumeMetrics.create(conf, parent.getAbsolutePath());
   }
@@ -288,7 +289,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   File getCurrentDir() {
     return currentDir;
   }
-  
+
   File getRbwDir(String bpid) throws IOException {
     return getBlockPoolSlice(bpid).getRbwDir();
   }
@@ -358,11 +359,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
   long getBlockPoolUsed(String bpid) throws IOException {
     return getBlockPoolSlice(bpid).getDfsUsed();
   }
-  
+
   /**
    * Return either the configured capacity of the file system if configured; or
    * the capacity of the file system excluding space reserved for non-HDFS.
-   * 
+   *
    * @return the unreserved number of bytes left in this filesystem. May be
    *         zero.
    */
@@ -389,7 +390,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   /*
    * Calculate the available space of the filesystem, excluding space reserved
    * for non-HDFS and space reserved for RBW
-   * 
+   *
    * @return the available number of bytes left in this filesystem. May be zero.
    */
   @Override
@@ -460,7 +461,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   public String getBasePath() {
     return currentDir.getParent();
   }
-  
+
   @Override
   public boolean isTransientStorage() {
     return storageType.isTransient();
@@ -481,9 +482,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
    */
   @Override
   public String[] getBlockPoolList() {
-    return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);   
+    return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
   }
-    
+
   /**
    * Temporary files. They get moved to the finalized block directory when
    * the block is finalized.
@@ -692,7 +693,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
         LOG.trace("getSubdirEntries({}, {}): no entries found in {}",
             storageID, bpid, dir.getAbsolutePath());
       } else {
-        LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}", 
+        LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}",
             storageID, bpid, entries.size(), dir.getAbsolutePath());
       }
       cache = entries;
@@ -910,7 +911,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
     return VolumeCheckResult.HEALTHY;
   }
-    
+
   void getVolumeMap(ReplicaMap volumeMap,
                     final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
@@ -918,7 +919,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
       s.getVolumeMap(volumeMap, ramDiskReplicaMap);
     }
   }
-  
+
   void getVolumeMap(String bpid, ReplicaMap volumeMap,
                     final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
@@ -966,7 +967,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
     bpSlices.put(bpid, bp);
   }
-  
+
   void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
     BlockPoolSlice bp = bpSlices.get(bpid);
     if (bp != null) {
@@ -992,7 +993,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
     return true;
   }
-  
+
   void deleteBPDirectories(String bpid, boolean force) throws IOException {
     File volumeCurrentDir = this.getCurrentDir();
     File bpDir = new File(volumeCurrentDir, bpid);
@@ -1000,7 +1001,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
       // nothing to be deleted
       return;
     }
-    File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); 
+    File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
     File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
     File finalizedDir = new File(bpCurrentDir,
         DataStorage.STORAGE_DIR_FINALIZED);
@@ -1049,12 +1050,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
   public String getStorageID() {
     return storageID;
   }
-  
+
   @Override
   public StorageType getStorageType() {
     return storageType;
   }
-  
+
   DatanodeStorage toDatanodeStorage() {
     return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index db5042b..dd2ca99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -589,7 +589,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     registerMBean(datanodeUuid);
-    this.fileIoProvider = new FileIoProvider(conf);
+    this.fileIoProvider = new FileIoProvider(conf, datanode);
     this.storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 25f4d5a..600769b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -942,7 +942,7 @@ public class TestDataNodeHotSwapVolumes {
     DataNodeTestUtils.injectDataDirFailure(dirToFail);
     // Call and wait DataNode to detect disk failure.
     long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
-    dn.checkDiskErrorAsync();
+    dn.checkDiskErrorAsync(failedVolume);
     while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
       Thread.sleep(100);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
index 23ec8a7..79a52bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertArrayEquals;
@@ -76,7 +76,7 @@ public class TestDataNodeVolumeFailureReporting {
   final int WAIT_FOR_HEARTBEATS = 3000;
 
   // Wait at least (2 * re-check + 10 * heartbeat) seconds for
-  // a datanode to be considered dead by the namenode.  
+  // a datanode to be considered dead by the namenode.
   final int WAIT_FOR_DEATH = 15000;
 
   @Before
@@ -158,7 +158,7 @@ public class TestDataNodeVolumeFailureReporting {
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
 
     // Eventually the NN should report two volume failures
-    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, 
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
     checkAggregateFailuresAtNameNode(true, 2);
     checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
@@ -185,7 +185,7 @@ public class TestDataNodeVolumeFailureReporting {
      * did not grow or shrink the data volume while the test was running).
      */
     dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
-    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3, 
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3,
         origCapacity - (3*dnCapacity), WAIT_FOR_HEARTBEATS);
     checkAggregateFailuresAtNameNode(true, 3);
     checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
@@ -211,7 +211,7 @@ public class TestDataNodeVolumeFailureReporting {
         dn3Vol2.getAbsolutePath());
 
     // The NN considers the DN dead
-    DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2, 
+    DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2,
         origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS);
     checkAggregateFailuresAtNameNode(true, 2);
     checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
@@ -236,7 +236,7 @@ public class TestDataNodeVolumeFailureReporting {
      * and that the volume failure count should be reported as zero by
      * both the metrics and the NN.
      */
-    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity, 
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity,
         WAIT_FOR_HEARTBEATS);
     checkAggregateFailuresAtNameNode(true, 0);
     dns = cluster.getDataNodes();
@@ -259,8 +259,8 @@ public class TestDataNodeVolumeFailureReporting {
     long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
     long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
 
-    // Fail the first volume on both datanodes (we have to keep the 
-    // third healthy so one node in the pipeline will not fail). 
+    // Fail the first volume on both datanodes (we have to keep the
+    // third healthy so one node in the pipeline will not fail).
     File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
     File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
     DataNodeTestUtils.injectDataDirFailure(dn1Vol1, dn2Vol1);
@@ -271,7 +271,7 @@ public class TestDataNodeVolumeFailureReporting {
     ArrayList<DataNode> dns = cluster.getDataNodes();
 
     // The NN reports two volumes failures
-    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2, 
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
     checkAggregateFailuresAtNameNode(true, 2);
     checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
@@ -318,6 +318,12 @@ public class TestDataNodeVolumeFailureReporting {
     DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
     DFSTestUtil.waitReplication(fs, file1, (short)3);
 
+    // Create an additional file to trigger a failure-based volume check on
+    // dn1Vol2 and dn2Vol2.
+    Path file2 = new Path("/test2");
+    DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file2, (short)3);
+
     ArrayList<DataNode> dns = cluster.getDataNodes();
     assertTrue("DN1 should be up", dns.get(0).isDatanodeUp());
     assertTrue("DN2 should be up", dns.get(1).isDatanodeUp());
@@ -536,8 +542,6 @@ public class TestDataNodeVolumeFailureReporting {
   private void checkFailuresAtDataNode(DataNode dn,
       long expectedVolumeFailuresCounter, boolean expectCapacityKnown,
       String... expectedFailedVolumes) throws Exception {
-    assertCounter("VolumeFailures", expectedVolumeFailuresCounter,
-        getMetrics(dn.getMetrics().name()));
     FsDatasetSpi<?> fsd = dn.getFSDataset();
     assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes());
     assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
index 940e73b..d6b4af9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -103,24 +104,28 @@ public class TestDatasetVolumeChecker {
     /**
      * Request a check and ensure it triggered {@link FsVolumeSpi#check}.
      */
-    checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
-      @Override
-      public void call(Set<FsVolumeSpi> healthyVolumes,
-                       Set<FsVolumeSpi> failedVolumes) {
-        numCallbackInvocations.incrementAndGet();
-        if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) {
-          assertThat(healthyVolumes.size(), is(1));
-          assertThat(failedVolumes.size(), is(0));
-        } else {
-          assertThat(healthyVolumes.size(), is(0));
-          assertThat(failedVolumes.size(), is(1));
-        }
-      }
-    });
+    boolean result =
+        checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
+          @Override
+          public void call(Set<FsVolumeSpi> healthyVolumes,
+                           Set<FsVolumeSpi> failedVolumes) {
+            numCallbackInvocations.incrementAndGet();
+            if (expectedVolumeHealth != null &&
+                expectedVolumeHealth != FAILED) {
+              assertThat(healthyVolumes.size(), is(1));
+              assertThat(failedVolumes.size(), is(0));
+            } else {
+              assertThat(healthyVolumes.size(), is(0));
+              assertThat(failedVolumes.size(), is(1));
+            }
+          }
+        });
 
     // Ensure that the check was invoked at least once.
     verify(volume, times(1)).check(any(VolumeCheckContext.class));
-    assertThat(numCallbackInvocations.get(), is(1L));
+    if (result) {
+      assertThat(numCallbackInvocations.get(), is(1L));
+    }
   }
 
   /**
@@ -172,7 +177,7 @@ public class TestDatasetVolumeChecker {
     checker.setDelegateChecker(new DummyChecker());
     final AtomicLong numCallbackInvocations = new AtomicLong(0);
 
-    checker.checkAllVolumesAsync(
+    boolean result = checker.checkAllVolumesAsync(
         dataset, new DatasetVolumeChecker.Callback() {
           @Override
           public void call(
@@ -192,7 +197,9 @@ public class TestDatasetVolumeChecker {
         });
 
     // The callback should be invoked exactly once.
-    assertThat(numCallbackInvocations.get(), is(1L));
+    if (result) {
+      assertThat(numCallbackInvocations.get(), is(1L));
+    }
 
     // Ensure each volume's check() method was called exactly once.
     for (FsVolumeSpi volume : volumes) {
@@ -206,15 +213,18 @@ public class TestDatasetVolumeChecker {
    */
   static class DummyChecker
       implements AsyncChecker<VolumeCheckContext, VolumeCheckResult> {
+
     @Override
-    public ListenableFuture<VolumeCheckResult> schedule(
+    public Optional<ListenableFuture<VolumeCheckResult>> schedule(
         Checkable<VolumeCheckContext, VolumeCheckResult> target,
         VolumeCheckContext context) {
       try {
-        return Futures.immediateFuture(target.check(context));
+        return Optional.of(
+            Futures.immediateFuture(target.check(context)));
       } catch (Exception e) {
         LOG.info("check routine threw exception " + e);
-        return Futures.immediateFailedFuture(e);
+        return Optional.of(
+            Futures.<VolumeCheckResult>immediateFailedFuture(e));
       }
     }
 
@@ -259,4 +269,4 @@ public class TestDatasetVolumeChecker {
     }
     return volumes;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
index 6f3d748..04d8bcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
@@ -131,29 +131,6 @@ public class TestDatasetVolumeCheckerFailures {
     assertThat(checker.getNumSkippedChecks(), is(1L));
   }
 
-  @Test(timeout=60000)
-  public void testMinGapIsEnforcedForASyncChecks() throws Exception {
-    final List<FsVolumeSpi> volumes =
-        TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
-    final FsDatasetSpi<FsVolumeSpi> dataset =
-        TestDatasetVolumeChecker.makeDataset(volumes);
-    final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
-
-    checker.checkAllVolumesAsync(dataset, null);
-    assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
-
-    // Re-check without advancing the timer. Ensure the check is skipped.
-    checker.checkAllVolumesAsync(dataset, null);
-    assertThat(checker.getNumAsyncDatasetChecks(), is(1L));
-    assertThat(checker.getNumSkippedChecks(), is(1L));
-
-    // Re-check after advancing the timer. Ensure the check is performed.
-    timer.advance(MIN_DISK_CHECK_GAP_MS);
-    checker.checkAllVolumesAsync(dataset, null);
-    assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
-    assertThat(checker.getNumSkippedChecks(), is(1L));
-  }
-
   /**
    * Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine
    * hangs forever.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
index 30bdc08..1109772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
@@ -18,15 +18,14 @@
 
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.FakeTimer;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,10 +37,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.core.Is.isA;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -53,9 +49,6 @@ public class TestThrottledAsyncChecker {
       LoggerFactory.getLogger(TestThrottledAsyncChecker.class);
   private static final long MIN_ERROR_CHECK_GAP = 1000;
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
   /**
    * Test various scheduling combinations to ensure scheduling and
    * throttling behave as expected.
@@ -70,34 +63,34 @@ public class TestThrottledAsyncChecker {
                                     getExecutorService());
 
     // check target1 and ensure we get back the expected result.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
     // Check target1 again without advancing the timer. target1 should not
-    // be checked again and the cached result should be returned.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    // be checked again.
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
     // Schedule target2 scheduled without advancing the timer.
     // target2 should be checked as it has never been checked before.
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 1L);
 
     // Advance the timer but just short of the min gap.
     // Neither target1 nor target2 should be checked again.
     timer.advance(MIN_ERROR_CHECK_GAP - 1);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
+    assertFalse(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 1L);
 
     // Advance the timer again.
     // Both targets should be checked now.
     timer.advance(MIN_ERROR_CHECK_GAP);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 2L);
+    assertTrue(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 2L);
   }
 
   @Test (timeout=60000)
@@ -109,13 +102,16 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
                                     getExecutorService());
 
-    ListenableFuture<Boolean> lf = checker.schedule(target, true);
-    Futures.addCallback(lf, callback);
+    Optional<ListenableFuture<Boolean>> olf =
+        checker.schedule(target, true);
+    if (olf.isPresent()) {
+      Futures.addCallback(olf.get(), callback);
+    }
 
     // Request immediate cancellation.
     checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
     try {
-      assertFalse(lf.get());
+      assertFalse(olf.get().get());
       fail("Failed to get expected InterruptedException");
     } catch (ExecutionException ee) {
       assertTrue(ee.getCause() instanceof InterruptedException);
@@ -130,27 +126,33 @@ public class TestThrottledAsyncChecker {
     final ThrottledAsyncChecker<Boolean, Boolean> checker =
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
                                     getExecutorService());
-    final ListenableFuture<Boolean> lf1 = checker.schedule(target, true);
-    final ListenableFuture<Boolean> lf2 = checker.schedule(target, true);
+    final Optional<ListenableFuture<Boolean>> olf1 =
+        checker.schedule(target, true);
 
-    // Ensure that concurrent requests return the same future object.
-    assertTrue(lf1 == lf2);
+    final Optional<ListenableFuture<Boolean>> olf2 =
+        checker.schedule(target, true);
+
+    // Ensure that only the first of the concurrent requests gets a future;
+    // the second request gets back an absent Optional.
+    assertTrue(olf1.isPresent());
+    assertFalse(olf2.isPresent());
 
     // Unblock the latch and wait for it to finish execution.
     target.latch.countDown();
-    lf1.get();
+    olf1.get().get();
 
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        // We should not get back the same future as before.
+        // We should get an absent Optional.
         // This can take a short while until the internal callback in
         // ThrottledAsyncChecker is scheduled for execution.
         // Also this should not trigger a new check operation as the timer
         // was not advanced. If it does trigger a new check then the test
         // will fail with a timeout.
-        final ListenableFuture<Boolean> lf3 = checker.schedule(target, true);
-        return lf3 != lf2;
+        final Optional<ListenableFuture<Boolean>> olf3 =
+            checker.schedule(target, true);
+        return !olf3.isPresent();
       }
     }, 100, 10000);
   }
@@ -168,15 +170,30 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
     timer.advance(MIN_ERROR_CHECK_GAP + 1);
-    assertFalse(checker.schedule(target1, false).get());
-    assertThat(target1.numChecks.get(), is(2L));
+    assertTrue(checker.schedule(target1, false).isPresent());
+    waitTestCheckableCheckCount(target1, 2L);
+
   }
 
+  private void waitTestCheckableCheckCount(
+      final TestCheckableBase target,
+      final long expectedChecks) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        // The check is executed asynchronously by ThrottledAsyncChecker,
+        // so the count can take a short while to reach the expected value.
+        // If it never matches (e.g. an unexpected extra check ran), the
+        // test will fail with a timeout.
+        return target.getTotalChecks() == expectedChecks;
+      }
+    }, 100, 10000);
+  }
   /**
-   * Ensure that the exeption from a failed check is cached
+   * Ensure that the exception from a failed check is cached
    * and returned without re-running the check when the minimum
    * gap has not elapsed.
    *
@@ -190,13 +207,11 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(2L));
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
   }
 
   /**
@@ -206,28 +221,38 @@ public class TestThrottledAsyncChecker {
     return new ScheduledThreadPoolExecutor(1);
   }
 
+  private abstract static class TestCheckableBase
+      implements Checkable<Boolean, Boolean> {
+    protected final AtomicLong numChecks = new AtomicLong(0);
+
+    public long getTotalChecks() {
+      return numChecks.get();
+    }
+
+    public void incrTotalChecks() {
+      numChecks.incrementAndGet();
+    }
+  }
+
   /**
    * A Checkable that just returns its input.
    */
   private static class NoOpCheckable
-      implements Checkable<Boolean, Boolean> {
-    private final AtomicLong numChecks = new AtomicLong(0);
+      extends TestCheckableBase {
     @Override
     public Boolean check(Boolean context) {
-      numChecks.incrementAndGet();
+      incrTotalChecks();
       return context;
     }
   }
 
   private static class ThrowingCheckable
-      implements Checkable<Boolean, Boolean> {
-    private final AtomicLong numChecks = new AtomicLong(0);
+      extends TestCheckableBase {
     @Override
     public Boolean check(Boolean context) throws DummyException {
-      numChecks.incrementAndGet();
+      incrTotalChecks();
       throw new DummyException();
     }
-
   }
 
   private static class DummyException extends Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eafaddca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 10b2f56..77b5258 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -159,7 +159,7 @@ public class TestFsDatasetImpl {
     this.conf = new Configuration();
     this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
 
-    final FileIoProvider fileIoProvider = new FileIoProvider(conf);
+    final FileIoProvider fileIoProvider = new FileIoProvider(conf, null);
     when(datanode.getFileIoProvider()).thenReturn(fileIoProvider);
     when(datanode.getConf()).thenReturn(conf);
     final DNConf dnConf = new DNConf(datanode);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message