hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [2/2] hadoop git commit: HDFS-10958. Add instrumentation hooks around Datanode disk IO.
Date Wed, 14 Dec 2016 19:19:29 GMT
HDFS-10958. Add instrumentation hooks around Datanode disk IO.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6ba9587d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6ba9587d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6ba9587d

Branch: refs/heads/trunk
Commit: 6ba9587d370fbf39c129c08c00ebbb894ccc1389
Parents: 72bff19
Author: Arpit Agarwal <arp@apache.org>
Authored: Wed Dec 14 11:18:58 2016 -0800
Committer: Arpit Agarwal <arp@apache.org>
Committed: Wed Dec 14 11:18:58 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/io/nativeio/NativeIO.java |   40 +-
 .../server/datanode/BlockMetadataHeader.java    |   29 +-
 .../dev-support/findbugsExcludeFile.xml         |   27 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |    2 +
 .../hdfs/server/datanode/BlockReceiver.java     |   14 +-
 .../hdfs/server/datanode/BlockSender.java       |   10 +-
 .../server/datanode/CountingFileIoEvents.java   |  107 ++
 .../hadoop/hdfs/server/datanode/DataNode.java   |   12 +
 .../hdfs/server/datanode/DataNodeMXBean.java    |    5 +
 .../hdfs/server/datanode/DataStorage.java       |    6 +
 .../hdfs/server/datanode/DatanodeUtil.java      |   19 +-
 .../server/datanode/DefaultFileIoEvents.java    |   67 ++
 .../hdfs/server/datanode/FileIoEvents.java      |   97 ++
 .../hdfs/server/datanode/FileIoProvider.java    | 1006 ++++++++++++++++++
 .../hdfs/server/datanode/LocalReplica.java      |  133 +--
 .../server/datanode/LocalReplicaInPipeline.java |   43 +-
 .../hdfs/server/datanode/ReplicaInPipeline.java |    4 +-
 .../hdfs/server/datanode/ReplicaInfo.java       |   17 +-
 .../server/datanode/fsdataset/FsVolumeSpi.java  |    3 +
 .../datanode/fsdataset/ReplicaInputStreams.java |   11 +-
 .../fsdataset/ReplicaOutputStreams.java         |   72 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  122 +--
 .../impl/FsDatasetAsyncDiskService.java         |    6 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   46 +-
 .../datanode/fsdataset/impl/FsDatasetUtil.java  |    5 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   89 +-
 .../fsdataset/impl/FsVolumeImplBuilder.java     |   12 +-
 .../org/apache/hadoop/hdfs/TestFileAppend.java  |    2 +-
 .../server/datanode/SimulatedFSDataset.java     |   18 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |    2 +-
 .../server/datanode/TestDirectoryScanner.java   |    5 +
 .../server/datanode/TestSimulatedFSDataset.java |    2 +-
 .../extdataset/ExternalDatasetImpl.java         |    2 +-
 .../extdataset/ExternalReplicaInPipeline.java   |    6 +-
 .../datanode/extdataset/ExternalVolumeImpl.java |    6 +
 .../hadoop/tools/TestHdfsConfigFields.java      |    2 +
 36 files changed, 1684 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index a123f18..f3ff1c7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -742,47 +742,19 @@ public class NativeIO {
   }
 
   /**
-   * Create a FileInputStream that shares delete permission on the
-   * file opened, i.e. other process can delete the file the
-   * FileInputStream is reading. Only Windows implementation uses
-   * the native interface.
-   */
-  public static FileInputStream getShareDeleteFileInputStream(File f)
-      throws IOException {
-    if (!Shell.WINDOWS) {
-      // On Linux the default FileInputStream shares delete permission
-      // on the file opened.
-      //
-      return new FileInputStream(f);
-    } else {
-      // Use Windows native interface to create a FileInputStream that
-      // shares delete permission on the file opened.
-      //
-      FileDescriptor fd = Windows.createFile(
-          f.getAbsolutePath(),
-          Windows.GENERIC_READ,
-          Windows.FILE_SHARE_READ |
-              Windows.FILE_SHARE_WRITE |
-              Windows.FILE_SHARE_DELETE,
-          Windows.OPEN_EXISTING);
-      return new FileInputStream(fd);
-    }
-  }
-
-  /**
-   * Create a FileInputStream that shares delete permission on the
+   * Create a FileDescriptor that shares delete permission on the
    * file opened at a given offset, i.e. other process can delete
-   * the file the FileInputStream is reading. Only Windows implementation
+   * the file the FileDescriptor is reading. Only Windows implementation
    * uses the native interface.
    */
-  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
-      throws IOException {
+  public static FileDescriptor getShareDeleteFileDescriptor(
+      File f, long seekOffset) throws IOException {
     if (!Shell.WINDOWS) {
       RandomAccessFile rf = new RandomAccessFile(f, "r");
       if (seekOffset > 0) {
         rf.seek(seekOffset);
       }
-      return new FileInputStream(rf.getFD());
+      return rf.getFD();
     } else {
       // Use Windows native interface to create a FileInputStream that
       // shares delete permission on the file opened, and set it to the
@@ -797,7 +769,7 @@ public class NativeIO {
           NativeIO.Windows.OPEN_EXISTING);
       if (seekOffset > 0)
         NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
-      return new FileInputStream(fd);
+      return fd;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index eb19492..738f496 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -31,7 +31,6 @@ import java.nio.channels.FileChannel;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -79,18 +78,15 @@ public class BlockMetadataHeader {
 
   /**
    * Read the checksum header from the meta file.
+   * inputStream must be closed by the caller.
    * @return the data checksum obtained from the header.
    */
-  public static DataChecksum readDataChecksum(File metaFile, int bufSize)
+  public static DataChecksum readDataChecksum(
+      FileInputStream inputStream, int bufSize, File metaFile)
       throws IOException {
-    DataInputStream in = null;
-    try {
-      in = new DataInputStream(new BufferedInputStream(
-        new FileInputStream(metaFile), bufSize));
-      return readDataChecksum(in, metaFile);
-    } finally {
-      IOUtils.closeStream(in);
-    }
+    DataInputStream in = new DataInputStream(new BufferedInputStream(
+        inputStream, bufSize));
+    return readDataChecksum(in, metaFile);
   }
 
   /**
@@ -111,6 +107,7 @@ public class BlockMetadataHeader {
 
   /**
    * Read the header without changing the position of the FileChannel.
+   * This is used by the client for short-circuit reads.
    *
    * @param fc The FileChannel to read.
    * @return the Metadata Header.
@@ -144,18 +141,16 @@ public class BlockMetadataHeader {
 
   /**
    * Reads header at the top of metadata file and returns the header.
+   * Closes the input stream after reading the header.
    *
    * @return metadata header for the block
    * @throws IOException
    */
-  public static BlockMetadataHeader readHeader(File file) throws IOException {
-    DataInputStream in = null;
-    try {
-      in = new DataInputStream(new BufferedInputStream(
-                               new FileInputStream(file)));
+  public static BlockMetadataHeader readHeader(
+      FileInputStream fis) throws IOException {
+    try (DataInputStream in = new DataInputStream(
+        new BufferedInputStream(fis))) {
       return readHeader(in);
-    } finally {
-      IOUtils.closeStream(in);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index e6e4057..3fa4e8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -74,6 +74,33 @@
      </Match>
 
      <!--
+       This class exposes stream constructors. The newly created streams are not
+       supposed to be closed in the constructor. Ignore the OBL warning.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileOutputStream" />
+       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+     </Match>
+
+     <!--
+       This class exposes stream constructors. The newly created streams are not
+       supposed to be closed in the constructor. Ignore the OBL warning.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileInputStream" />
+       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+     </Match>
+
+     <!--
+       This class exposes stream constructors. The newly created streams are not
+       supposed to be closed in the constructor. Ignore the OBL warning.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedRandomAccessFile" />
+       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+     </Match>
+
+     <!--
       lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots.
       See the comments in BackupImage for justification.
      -->

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index df21857..cffc4bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -687,6 +687,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
   public static final String  DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
+  public static final String DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY =
+      "dfs.datanode.fileio.events.class";
   public static final String  DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold";
   public static final long    DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
   public static final String  DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index f372072..441bd91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -244,8 +244,7 @@ class BlockReceiver implements Closeable {
       
       final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
-      streams = replicaInfo.createStreams(isCreate, requestedChecksum,
-          datanodeSlowLogThresholdMs);
+      streams = replicaInfo.createStreams(isCreate, requestedChecksum);
       assert streams != null : "null streams!";
 
       // read checksum meta information
@@ -400,9 +399,8 @@ class BlockReceiver implements Closeable {
       checksumOut.flush();
       long flushEndNanos = System.nanoTime();
       if (isSync) {
-        long fsyncStartNanos = flushEndNanos;
         streams.syncChecksumOut();
-        datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
+        datanode.metrics.addFsyncNanos(System.nanoTime() - flushEndNanos);
       }
       flushTotalNanos += flushEndNanos - flushStartNanos;
     }
@@ -703,8 +701,10 @@ class BlockReceiver implements Closeable {
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
           
           // Write data to disk.
-          long duration = streams.writeToDisk(dataBuf.array(),
+          long begin = Time.monotonicNow();
+          streams.writeDataToDisk(dataBuf.array(),
               startByteToDisk, numBytesToDisk);
+          long duration = Time.monotonicNow() - begin;
 
           if (duration > maxWriteToDiskMs) {
             maxWriteToDiskMs = duration;
@@ -1029,9 +1029,7 @@ class BlockReceiver implements Closeable {
    * will be overwritten.
    */
   private void adjustCrcFilePosition() throws IOException {
-    if (streams.getDataOut() != null) {
-      streams.flushDataOut();
-    }
+    streams.flushDataOut();
     if (checksumOut != null) {
       checksumOut.flush();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 9182c88..d7aebd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -166,6 +166,7 @@ class BlockSender implements java.io.Closeable {
   private final boolean dropCacheBehindAllReads;
   
   private long lastCacheDropOffset;
+  private final FileIoProvider fileIoProvider;
   
   @VisibleForTesting
   static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
@@ -197,6 +198,7 @@ class BlockSender implements java.io.Closeable {
     InputStream blockIn = null;
     DataInputStream checksumIn = null;
     FsVolumeReference volumeRef = null;
+    this.fileIoProvider = datanode.getFileIoProvider();
     try {
       this.block = block;
       this.corruptChecksumOk = corruptChecksumOk;
@@ -401,7 +403,8 @@ class BlockSender implements java.io.Closeable {
         DataNode.LOG.debug("replica=" + replica);
       }
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
-      ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
+      ris = new ReplicaInputStreams(
+          blockIn, checksumIn, volumeRef, fileIoProvider);
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
       throw ioe;
@@ -568,8 +571,9 @@ class BlockSender implements java.io.Closeable {
         FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
         LongWritable waitTime = new LongWritable();
         LongWritable transferTime = new LongWritable();
-        sockOut.transferToFully(fileCh, blockInPosition, dataLen,
-            waitTime, transferTime);
+        fileIoProvider.transferToSocketFully(
+            ris.getVolumeRef().getVolume(), sockOut, fileCh, blockInPosition,
+            dataLen, waitTime, transferTime);
         datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
         datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
         blockInPosition += dataLen;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
new file mode 100644
index 0000000..a70c151
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link FileIoEvents} that simply counts the number of operations.
+ * Not meant to be used outside of testing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CountingFileIoEvents implements FileIoEvents {
+  private final Map<OPERATION, Counts> counts;
+
+  private static class Counts {
+    private final AtomicLong successes = new AtomicLong(0);
+    private final AtomicLong failures = new AtomicLong(0);
+
+    @JsonProperty("Successes")
+    public long getSuccesses() {
+      return successes.get();
+    }
+
+    @JsonProperty("Failures")
+    public long getFailures() {
+      return failures.get();
+    }
+  }
+
+  public CountingFileIoEvents() {
+    counts = new HashMap<>();
+    for (OPERATION op : OPERATION.values()) {
+      counts.put(op, new Counts());
+    }
+  }
+
+  @Override
+  public long beforeMetadataOp(
+      @Nullable FsVolumeSpi volume, OPERATION op) {
+    return 0;
+  }
+
+  @Override
+  public void afterMetadataOp(
+      @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
+    counts.get(op).successes.incrementAndGet();
+  }
+
+  @Override
+  public long beforeFileIo(
+      @Nullable FsVolumeSpi volume, OPERATION op, long len) {
+    return 0;
+  }
+
+  @Override
+  public void afterFileIo(
+      @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
+    counts.get(op).successes.incrementAndGet();
+  }
+
+  @Override
+  public void onFailure(
+      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+    counts.get(op).failures.incrementAndGet();
+
+  }
+
+  @Override
+  public String getStatistics() {
+    ObjectMapper objectMapper = new ObjectMapper();
+    try {
+      return objectMapper.writeValueAsString(counts);
+    } catch (JsonProcessingException e) {
+      // Failed to serialize. Don't log the exception call stack.
+      FileIoProvider.LOG.error("Failed to serialize statistics" + e);
+      return null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b845da0..794b1ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -299,6 +299,7 @@ public class DataNode extends ReconfigurableBase
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
   private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace.";
+  private final FileIoProvider fileIoProvider;
 
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -411,6 +412,7 @@ public class DataNode extends ReconfigurableBase
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
         new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
+    this.fileIoProvider = new FileIoProvider(conf);
     this.fileDescriptorPassingDisabledReason = null;
     this.maxNumberOfBlocksToLog = 0;
     this.confVersion = null;
@@ -437,6 +439,7 @@ public class DataNode extends ReconfigurableBase
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
         new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
+    this.fileIoProvider = new FileIoProvider(conf);
     this.blockScanner = new BlockScanner(this);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -617,6 +620,10 @@ public class DataNode extends ReconfigurableBase
         PipelineAck.ECN.SUPPORTED;
   }
 
+  public FileIoProvider getFileIoProvider() {
+    return fileIoProvider;
+  }
+
   /**
    * Contains the StorageLocations for changed data volumes.
    */
@@ -3008,6 +3015,11 @@ public class DataNode extends ReconfigurableBase
     }
   }
   
+  @Override // DataNodeMXBean
+  public String getFileIoProviderStatistics() {
+    return fileIoProvider.getStatistics();
+  }
+
   public void refreshNamenodes(Configuration conf) throws IOException {
     blockPoolManager.refreshNamenodes(conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 90c38d7..37f9635 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -120,4 +120,9 @@ public interface DataNodeMXBean {
    * @return  DiskBalancer Status
    */
   String getDiskBalancerStatus();
+
+  /**
+   * Gets the {@link FileIoProvider} statistics.
+   */
+  String getFileIoProviderStatistics();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index f4deb6d..5163e6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -1356,6 +1356,12 @@ public class DataStorage extends Storage {
     bpStorageMap.remove(bpId);
   }
 
+  /**
+   * Prefer FileIoProvider#fullyDelete.
+   * @param dir the directory to delete.
+   * @return the result of FileUtil#fullyDelete for the given directory.
+   */
+  @Deprecated
   public static boolean fullyDelete(final File dir) {
     boolean result = FileUtil.fullyDelete(dir);
     return result;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
index ad054a8..c98ff54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 
 /** Provide utility methods for Datanode. */
@@ -55,15 +56,17 @@ public class DatanodeUtil {
    * @throws IOException 
    * if the file already exists or if the file cannot be created.
    */
-  public static File createTmpFile(Block b, File f) throws IOException {
-    if (f.exists()) {
+  public static File createFileWithExistsCheck(
+      FsVolumeSpi volume, Block b, File f,
+      FileIoProvider fileIoProvider) throws IOException {
+    if (fileIoProvider.exists(volume, f)) {
       throw new IOException("Failed to create temporary file for " + b
           + ".  File " + f + " should not be present, but is.");
     }
     // Create the zero-length temp file
     final boolean fileCreated;
     try {
-      fileCreated = f.createNewFile();
+      fileCreated = fileIoProvider.createFile(volume, f);
     } catch (IOException ioe) {
       throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
     }
@@ -92,13 +95,17 @@ public class DatanodeUtil {
    * @return true if there are no files
    * @throws IOException if unable to list subdirectories
    */
-  public static boolean dirNoFilesRecursive(File dir) throws IOException {
-    File[] contents = dir.listFiles();
+  public static boolean dirNoFilesRecursive(
+      FsVolumeSpi volume, File dir,
+      FileIoProvider fileIoProvider) throws IOException {
+    File[] contents = fileIoProvider.listFiles(volume, dir);
     if (contents == null) {
       throw new IOException("Cannot list contents of " + dir);
     }
     for (File f : contents) {
-      if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
+      if (!f.isDirectory() ||
+          (f.isDirectory() && !dirNoFilesRecursive(
+              volume, f, fileIoProvider))) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
new file mode 100644
index 0000000..bd4932b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+
+/**
+ * The default implementation of {@link FileIoEvents} that does nothing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class DefaultFileIoEvents implements FileIoEvents {
+  @Override
+  public long beforeMetadataOp(
+      @Nullable FsVolumeSpi volume, OPERATION op) {
+    return 0;
+  }
+
+  @Override
+  public void afterMetadataOp(
+      @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
+  }
+
+  @Override
+  public long beforeFileIo(
+      @Nullable FsVolumeSpi volume, OPERATION op, long len) {
+    return 0;
+  }
+
+  @Override
+  public void afterFileIo(
+      @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
+  }
+
+  @Override
+  public void onFailure(
+      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+  }
+
+  @Override
+  public @Nullable String getStatistics() {
+    // null is valid JSON.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
new file mode 100644
index 0000000..48e703f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+
+/**
+ * Hooks that can be implemented for instrumentation or fault
+ * injection around DataNode file IO.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface FileIoEvents {
+
+  /**
+   * Invoked before a filesystem metadata operation.
+   *
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @return  timestamp at which the operation was started. 0 if
+   *          unavailable.
+   */
+  long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
+
+  /**
+   * Invoked after a filesystem metadata operation has completed.
+   *
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @param begin  timestamp at which the operation was started, as
+   *               returned by {@link #beforeMetadataOp}. 0 if unavailable.
+   */
+  void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin);
+
+  /**
+   * Invoked before a read/write/flush/channel transfer operation.
+   *
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @param len  length of the file IO. 0 for flush.
+   * @return  timestamp at which the operation was started. 0 if
+   *          unavailable.
+   */
+  long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len);
+
+  /**
+   * Invoked after a read/write/flush/channel transfer operation
+   * has completed.
+   *
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @param begin  timestamp at which the operation was started, as
+   *               returned by {@link #beforeFileIo}. 0 if unavailable.
+   * @param len  length of the file IO. 0 for flush.
+   */
+  void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+                   long begin, long len);
+
+  /**
+   * Invoked if an operation fails with an exception.
+   *
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @param e  Exception encountered during the operation.
+   * @param begin  timestamp at which the operation was started, as
+   *               returned by the corresponding {@code before*} hook.
+   */
+  void onFailure(
+      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
+
+  /**
+   * Return statistics as a JSON string.
+   *
+   * @return  statistics as a JSON string. May be null.
+   */
+  @Nullable String getStatistics();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
new file mode 100644
index 0000000..2344114
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
@@ -0,0 +1,1006 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.file.CopyOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.*;
+
+/**
+ * This class abstracts out various file IO operations performed by the
+ * DataNode and invokes event hooks before and after each file IO.
+ *
+ * Behavior can be injected into these events by implementing
+ * {@link FileIoEvents} and replacing the default implementation
+ * with {@link DFSConfigKeys#DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY}.
+ *
+ * Most functions accept an optional {@link FsVolumeSpi} parameter for
+ * instrumentation/logging.
+ *
+ * Some methods may look redundant, especially the multiple variations of
+ * move/rename/list. They exist to retain behavior compatibility for existing
+ * code.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FileIoProvider {
+  /** Logger for this class. */
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FileIoProvider.class);
+
+  /** Hooks invoked around each instrumented file IO operation. */
+  private final FileIoEvents eventHooks;
+
+  /**
+   * Creates a provider whose event hooks are instantiated from the class
+   * configured under
+   * {@link DFSConfigKeys#DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY}
+   * (defaulting to {@link DefaultFileIoEvents}).
+   *
+   * @param conf  Configuration object. May be null, in which case a
+   *              {@link DefaultFileIoEvents} (no-op hooks) is used.
+   */
+  public FileIoProvider(@Nullable Configuration conf) {
+    if (conf == null) {
+      eventHooks = new DefaultFileIoEvents();
+    } else {
+      final Class<? extends FileIoEvents> hooksClass = conf.getClass(
+          DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
+          DefaultFileIoEvents.class,
+          FileIoEvents.class);
+      eventHooks = ReflectionUtils.newInstance(hooksClass, conf);
+    }
+  }
+
+  /**
+   * Kinds of file system operation. Each instrumented call passes one of
+   * these to the {@link FileIoEvents} hooks so implementations can vary
+   * their behavior per operation.
+   */
+  public enum OPERATION {
+    OPEN, EXISTS, LIST, DELETE, MOVE, MKDIRS,
+    TRANSFER, SYNC, FADVISE, READ, WRITE, FLUSH,
+    NATIVE_COPY
+  }
+
+  /**
+   * Returns the statistics, as a JSON string, kept by the configured
+   * {@link FileIoEvents} implementation, if it keeps any.
+   *
+   * @return statistics as a JSON string. May be null.
+   */
+  @Nullable
+  public String getStatistics() {
+    final String stats = eventHooks.getStatistics();
+    return stats;
+  }
+
+  /**
+   * Flushes {@code f} via {@link Flushable#flush()}. The call is reported
+   * to the hooks as a {@link OPERATION#FLUSH} file IO of length 0.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  object to flush.
+   * @throws IOException if the flush fails.
+   */
+  public void flush(
+      @Nullable FsVolumeSpi volume, Flushable f) throws IOException {
+    final long startTime = eventHooks.beforeFileIo(volume, FLUSH, 0);
+    try {
+      f.flush();
+      eventHooks.afterFileIo(volume, FLUSH, startTime, 0);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, FLUSH, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Forces the channel backing {@code fos} to storage using
+   * {@link FileChannel#force(boolean)} with {@code true}. The call is
+   * reported to the hooks as a {@link OPERATION#SYNC} file IO of length 0.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param fos  stream whose channel is synced.
+   * @throws IOException if the sync fails.
+   */
+  public void sync(
+      @Nullable FsVolumeSpi volume, FileOutputStream fos) throws IOException {
+    final long startTime = eventHooks.beforeFileIo(volume, SYNC, 0);
+    try {
+      final FileChannel channel = fos.getChannel();
+      channel.force(true);
+      eventHooks.afterFileIo(volume, SYNC, startTime, 0);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, SYNC, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Call sync_file_range on the given file descriptor via
+   * {@link NativeIO.POSIX#syncFileRangeIfPossible}. The call is reported
+   * to the hooks as a {@link OPERATION#SYNC} file IO of length 0.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param outFd  file descriptor whose range is synced.
+   * @param offset  starting offset of the range.
+   * @param numBytes  number of bytes in the range.
+   * @param flags  sync_file_range flags, passed through unchanged.
+   * @throws NativeIOException if the native call fails.
+   */
+  public void syncFileRange(
+      @Nullable FsVolumeSpi volume, FileDescriptor outFd,
+      long offset, long numBytes, int flags) throws NativeIOException {
+    final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
+    try {
+      NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
+      eventHooks.afterFileIo(volume, SYNC, begin, 0);
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, SYNC, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Call posix_fadvise on the given file descriptor through
+   * {@link NativeIO.POSIX#getCacheManipulator()}. The call is reported to
+   * the hooks as a {@link OPERATION#FADVISE} metadata operation.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param identifier  identifier passed through to the cache manipulator.
+   * @param outFd  file descriptor to advise on.
+   * @param offset  starting offset of the advised range.
+   * @param length  length of the advised range.
+   * @param flags  posix_fadvise advice flags, passed through unchanged.
+   * @throws NativeIOException if the native call fails.
+   */
+  public void posixFadvise(
+      @Nullable FsVolumeSpi volume, String identifier, FileDescriptor outFd,
+      long offset, long length, int flags) throws NativeIOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, FADVISE);
+    try {
+      NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+          identifier, outFd, offset, length, flags);
+      eventHooks.afterMetadataOp(volume, FADVISE, begin);
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, FADVISE, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Deletes a file with {@link File#delete()}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File to delete.
+   * @return  true if the file was deleted, false otherwise.
+   */
+  public boolean delete(@Nullable FsVolumeSpi volume, File f) {
+    final long startTime = eventHooks.beforeMetadataOp(volume, DELETE);
+    try {
+      final boolean removed = f.delete();
+      eventHooks.afterMetadataOp(volume, DELETE, startTime);
+      return removed;
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, DELETE, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Deletes a file only if it exists. A file that does not exist counts as
+   * success. A warning is logged when an existing file could not be deleted.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File to delete.
+   * @return  true if the file was deleted or did not exist.
+   */
+  public boolean deleteWithExistsCheck(@Nullable FsVolumeSpi volume, File f) {
+    final long startTime = eventHooks.beforeMetadataOp(volume, DELETE);
+    try {
+      final boolean ok = f.exists() ? f.delete() : true;
+      eventHooks.afterMetadataOp(volume, DELETE, startTime);
+      if (!ok) {
+        LOG.warn("Failed to delete file {}", f);
+      }
+      return ok;
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, DELETE, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Transfers {@code count} bytes from {@code fileCh}, starting at
+   * {@code position}, to {@code sockOut} using
+   * {@link SocketOutputStream#transferToFully}. The transfer is reported to
+   * the hooks as a {@link OPERATION#TRANSFER} file IO of length
+   * {@code count}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param sockOut  SocketOutputStream to write the data.
+   * @param fileCh  FileChannel from which to read data.
+   * @param position  position within the channel where the transfer begins.
+   * @param count  number of bytes to transfer.
+   * @param waitTime  receives the nanoseconds spent waiting for the socket
+   *                  to become writable.
+   * @param transferTime  receives the nanoseconds spent transferring data.
+   * @throws IOException if the transfer fails.
+   */
+  public void transferToSocketFully(
+      @Nullable FsVolumeSpi volume, SocketOutputStream sockOut,
+      FileChannel fileCh, long position, int count,
+      LongWritable waitTime, LongWritable transferTime) throws IOException {
+    final long startTime = eventHooks.beforeFileIo(volume, TRANSFER, count);
+    try {
+      sockOut.transferToFully(
+          fileCh, position, count, waitTime, transferTime);
+      eventHooks.afterFileIo(volume, TRANSFER, startTime, count);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, TRANSFER, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Creates a new, empty file with {@link File#createNewFile()}. The call
+   * is reported to the hooks as an {@link OPERATION#OPEN} metadata operation.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File to be created.
+   * @return  true if the file did not exist and was created;
+   *          false if it already existed.
+   * @throws IOException if creation fails.
+   */
+  public boolean createFile(
+      @Nullable FsVolumeSpi volume, File f) throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, OPEN);
+    try {
+      final boolean isNew = f.createNewFile();
+      eventHooks.afterMetadataOp(volume, OPEN, startTime);
+      return isNew;
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, OPEN, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Opens {@code f} for reading, as
+   * {@link FileInputStream#FileInputStream(File)} would. The returned
+   * stream reports each read to the event hooks.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @return  an instrumented FileInputStream for the given file.
+   * @throws  FileNotFoundException if the file cannot be opened.
+   */
+  public FileInputStream getFileInputStream(
+      @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, OPEN);
+    FileInputStream in = null;
+    try {
+      in = new WrappedFileInputStream(volume, f);
+      eventHooks.afterMetadataOp(volume, OPEN, startTime);
+      return in;
+    } catch (Exception ex) {
+      // Release the descriptor if it was opened before the failure.
+      org.apache.commons.io.IOUtils.closeQuietly(in);
+      eventHooks.onFailure(volume, OPEN, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Create a FileOutputStream using
+   * {@link FileOutputStream#FileOutputStream(File, boolean)}.
+   *
+   * Wraps the created output stream to intercept write calls
+   * before delegating to the wrapped stream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @param append  if true, then bytes will be written to the end of the
+   *                file rather than the beginning.
+   * @return  FileOutputStream to the given file object.
+   * @throws FileNotFoundException if the file cannot be opened.
+   */
+  public FileOutputStream getFileOutputStream(
+      @Nullable FsVolumeSpi volume, File f,
+      boolean append) throws FileNotFoundException {
+    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    FileOutputStream fos = null;
+    try {
+      fos = new WrappedFileOutputStream(volume, f, append);
+      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      return fos;
+    } catch(Exception e) {
+      // Release the descriptor if it was opened before the failure.
+      org.apache.commons.io.IOUtils.closeQuietly(fos);
+      eventHooks.onFailure(volume, OPEN, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Opens {@code f} for writing from the beginning of the file. This is
+   * equivalent to {@code getFileOutputStream(volume, f, false)}. The
+   * returned stream reports each write to the event hooks.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @return  an instrumented FileOutputStream for the given file.
+   * @throws  FileNotFoundException if the file cannot be opened.
+   */
+  public FileOutputStream getFileOutputStream(
+      @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
+    final boolean append = false;
+    return getFileOutputStream(volume, f, append);
+  }
+
+  /**
+   * Create a FileOutputStream using
+   * {@link FileOutputStream#FileOutputStream(FileDescriptor)}.
+   *
+   * Wraps the created output stream to intercept write calls
+   * before delegating to the wrapped stream. The descriptor is already
+   * open, so no {@link OPERATION#OPEN} event is reported here.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param fd  an already-open file descriptor to write to.
+   * @return  FileOutputStream writing to the given file descriptor.
+   */
+  public FileOutputStream getFileOutputStream(
+      @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+    return new WrappedFileOutputStream(volume, fd);
+  }
+
+  /**
+   * Create a FileInputStream using
+   * {@link NativeIO#getShareDeleteFileDescriptor}.
+   * Wraps the created input stream to intercept read calls
+   * before delegating to the wrapped stream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @param offset  the offset position, measured in bytes from the
+   *                beginning of the file, at which to set the file
+   *                pointer.
+   * @return FileInputStream reading the given file from {@code offset}.
+   * @throws IOException if the file cannot be opened or positioned.
+   */
+  public FileInputStream getShareDeleteFileInputStream(
+      @Nullable FsVolumeSpi volume, File f,
+      long offset) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    FileInputStream fis = null;
+    try {
+      fis = new WrappedFileInputStream(volume,
+          NativeIO.getShareDeleteFileDescriptor(f, offset));
+      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      return fis;
+    } catch(Exception e) {
+      // Release the stream if it was created before the failure.
+      org.apache.commons.io.IOUtils.closeQuietly(fis);
+      eventHooks.onFailure(volume, OPEN, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Create a FileInputStream for the descriptor returned by
+   * {@code FsDatasetUtil.openAndSeek(f, offset)}, i.e. the given file
+   * opened and positioned at {@code offset}.
+   *
+   * Wraps the created input stream to intercept read calls
+   * before delegating to the wrapped stream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @param offset  the offset position, measured in bytes from the
+   *                beginning of the file, at which to set the file
+   *                pointer.
+   * @return FileInputStream reading the given file from {@code offset}.
+   * @throws IOException if the file cannot be opened or positioned.
+   */
+  public FileInputStream openAndSeek(
+      @Nullable FsVolumeSpi volume, File f, long offset) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    FileInputStream fis = null;
+    try {
+      fis = new WrappedFileInputStream(volume,
+          FsDatasetUtil.openAndSeek(f, offset));
+      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      return fis;
+    } catch(Exception e) {
+      // Release the stream if it was created before the failure.
+      org.apache.commons.io.IOUtils.closeQuietly(fis);
+      eventHooks.onFailure(volume, OPEN, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Opens {@code f} as a RandomAccessFile in the given {@code mode}, as
+   * {@link RandomAccessFile#RandomAccessFile(File, String)} would. The
+   * returned object is a wrapper that intercepts IO calls before
+   * delegating to the underlying RandomAccessFile.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @param mode  access mode; see {@link RandomAccessFile} for the
+   *              accepted strings.
+   * @return RandomAccessFile representing the given file.
+   * @throws FileNotFoundException if the file cannot be opened.
+   */
+  public RandomAccessFile getRandomAccessFile(
+      @Nullable FsVolumeSpi volume, File f,
+      String mode) throws FileNotFoundException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, OPEN);
+    RandomAccessFile file = null;
+    try {
+      file = new WrappedRandomAccessFile(volume, f, mode);
+      eventHooks.afterMetadataOp(volume, OPEN, startTime);
+      return file;
+    } catch (Exception ex) {
+      // Release the file if it was opened before the failure.
+      org.apache.commons.io.IOUtils.closeQuietly(file);
+      eventHooks.onFailure(volume, OPEN, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Recursively deletes {@code dir} with {@link FileUtil#fullyDelete(File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param dir  directory to be deleted.
+   * @return true on success, false on failure.
+   */
+  public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) {
+    final long startTime = eventHooks.beforeMetadataOp(volume, DELETE);
+    try {
+      final boolean success = FileUtil.fullyDelete(dir);
+      eventHooks.afterMetadataOp(volume, DELETE, startTime);
+      return success;
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, DELETE, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Moves {@code src} onto {@code target} with
+   * {@link FileUtil#replaceFile(File, File)}. The move is reported as an
+   * {@link OPERATION#MOVE} metadata operation.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  source path.
+   * @param target  target path.
+   * @throws IOException if the replacement fails.
+   */
+  public void replaceFile(
+      @Nullable FsVolumeSpi volume, File src, File target) throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, MOVE);
+    try {
+      FileUtil.replaceFile(src, target);
+      eventHooks.afterMetadataOp(volume, MOVE, startTime);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, MOVE, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Moves {@code src} to {@code target} with
+   * {@link Storage#rename(File, File)}. The move is reported as an
+   * {@link OPERATION#MOVE} metadata operation.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  source path.
+   * @param target  target path.
+   * @throws IOException if the rename fails.
+   */
+  public void rename(
+      @Nullable FsVolumeSpi volume, File src, File target)
+      throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, MOVE);
+    try {
+      Storage.rename(src, target);
+      eventHooks.afterMetadataOp(volume, MOVE, startTime);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, MOVE, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Moves {@code src} to {@code target} with
+   * {@link FileUtils#moveFile(File, File)}. The move is reported as an
+   * {@link OPERATION#MOVE} metadata operation.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  source path.
+   * @param target  target path.
+   * @throws IOException if the move fails.
+   */
+  public void moveFile(
+      @Nullable FsVolumeSpi volume, File src, File target)
+      throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, MOVE);
+    try {
+      FileUtils.moveFile(src, target);
+      eventHooks.afterMetadataOp(volume, MOVE, startTime);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, MOVE, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Moves {@code src} to {@code target} with
+   * {@link Files#move(Path, Path, CopyOption...)}. The move is reported as
+   * an {@link OPERATION#MOVE} metadata operation.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  source path.
+   * @param target  target path.
+   * @param options  options forwarded to {@link Files#move}.
+   * @throws IOException if the move fails.
+   */
+  public void move(
+      @Nullable FsVolumeSpi volume, Path src, Path target,
+      CopyOption... options) throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, MOVE);
+    try {
+      Files.move(src, target, options);
+      eventHooks.afterMetadataOp(volume, MOVE, startTime);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, MOVE, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Copies {@code src} to {@code target} with
+   * {@link Storage#nativeCopyFileUnbuffered(File, File, boolean)}. The copy
+   * is reported as a {@link OPERATION#NATIVE_COPY} file IO whose length is
+   * {@code src.length()}, sampled before the copy starts.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  an existing file to copy, must not be {@code null}.
+   * @param target  the new file, must not be {@code null}.
+   * @param preserveFileDate  true if the file date of the copy
+   *                          should be the same as the original.
+   * @throws IOException if the copy fails.
+   */
+  public void nativeCopyFileUnbuffered(
+      @Nullable FsVolumeSpi volume, File src, File target,
+      boolean preserveFileDate) throws IOException {
+    final long numBytes = src.length();
+    final long startTime =
+        eventHooks.beforeFileIo(volume, NATIVE_COPY, numBytes);
+    try {
+      Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
+      eventHooks.afterFileIo(volume, NATIVE_COPY, startTime, numBytes);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, NATIVE_COPY, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Creates {@code dir} and any missing parents with {@link File#mkdirs()}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param dir  directory to be created.
+   * @return  true if this call created the directory; false if it
+   *          already existed as a directory.
+   * @throws IOException if the directory was not created and no
+   *                     directory exists at that path.
+   */
+  public boolean mkdirs(
+      @Nullable FsVolumeSpi volume, File dir) throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, MKDIRS);
+    final boolean made;
+    final boolean alreadyDir;
+    try {
+      made = dir.mkdirs();
+      // Only probe for an existing directory when mkdirs() did nothing.
+      alreadyDir = !made && dir.isDirectory();
+      eventHooks.afterMetadataOp(volume, MKDIRS, startTime);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, MKDIRS, ex, startTime);
+      throw ex;
+    }
+
+    if (made || alreadyDir) {
+      return made;
+    }
+    throw new IOException("Mkdirs failed to create " + dir);
+  }
+
+  /**
+   * Ensures {@code dir} exists as a directory. {@link File#mkdirs()} is
+   * called only when no directory is already present.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param dir  directory to be created.
+   * @throws IOException  if the directory could not be created.
+   */
+  public void mkdirsWithExistsCheck(
+      @Nullable FsVolumeSpi volume, File dir) throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, MKDIRS);
+    final boolean ok;
+    try {
+      ok = dir.isDirectory() || dir.mkdirs();
+      eventHooks.afterMetadataOp(volume, MKDIRS, startTime);
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, MKDIRS, ex, startTime);
+      throw ex;
+    }
+
+    if (ok) {
+      return;
+    }
+    throw new IOException("Mkdirs failed to create " + dir);
+  }
+
+  /**
+   * Lists the entries of {@code dir} as File objects using
+   * {@link FileUtil#listFiles(File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param dir  Directory to be listed.
+   * @return  the directory entries as File objects.
+   * @throws IOException if the listing fails.
+   */
+  public File[] listFiles(
+      @Nullable FsVolumeSpi volume, File dir) throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, LIST);
+    try {
+      final File[] entries = FileUtil.listFiles(dir);
+      eventHooks.afterMetadataOp(volume, LIST, startTime);
+      return entries;
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, LIST, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Get a listing of the given directory using
+   * {@link FileUtil#list(File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param dir  Directory to be listed.
+   * @return  array of strings representing the directory entries.
+   * @throws IOException if the listing fails.
+   */
+  public String[] list(
+      @Nullable FsVolumeSpi volume, File dir) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+    try {
+      String[] children = FileUtil.list(dir);
+      eventHooks.afterMetadataOp(volume, LIST, begin);
+      return children;
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, LIST, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Lists the entry names of {@code dir} that pass {@code filter}, using
+   * {@link IOUtils#listDirectory(File, FilenameFilter)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param dir  Directory to list.
+   * @param filter  {@link FilenameFilter} applied to the directory entries.
+   * @return  the names of the accepted entries.
+   * @throws IOException if the listing fails.
+   */
+  public List<String> listDirectory(
+      @Nullable FsVolumeSpi volume, File dir,
+      FilenameFilter filter) throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, LIST);
+    try {
+      final List<String> names = IOUtils.listDirectory(dir, filter);
+      eventHooks.afterMetadataOp(volume, LIST, startTime);
+      return names;
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, LIST, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Returns the hard-link count of {@code f} using
+   * {@link HardLink#getLinkCount(File)}. The query is reported to the hooks
+   * as an {@link OPERATION#LIST} metadata operation.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  file whose link count is being queried.
+   * @return  number of hard links to the file, including the given path.
+   * @throws IOException if the count cannot be determined.
+   */
+  public int getHardLinkCount(
+      @Nullable FsVolumeSpi volume, File f) throws IOException {
+    final long startTime = eventHooks.beforeMetadataOp(volume, LIST);
+    try {
+      final int links = HardLink.getLinkCount(f);
+      eventHooks.afterMetadataOp(volume, LIST, startTime);
+      return links;
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, LIST, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * Checks whether {@code f} exists, using {@link File#exists()}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  file object.
+   * @return true if the file exists.
+   */
+  public boolean exists(@Nullable FsVolumeSpi volume, File f) {
+    final long startTime = eventHooks.beforeMetadataOp(volume, EXISTS);
+    try {
+      final boolean present = f.exists();
+      eventHooks.afterMetadataOp(volume, EXISTS, startTime);
+      return present;
+    } catch (Exception ex) {
+      eventHooks.onFailure(volume, EXISTS, ex, startTime);
+      throw ex;
+    }
+  }
+
+  /**
+   * A thin wrapper over {@link FileInputStream} that reports every read
+   * to the event hooks so disk IO can be instrumented.
+   *
+   * The length passed to {@link FileIoEvents#afterFileIo} is the number of
+   * bytes actually read. It is never the -1 end-of-stream marker returned
+   * by the {@code read} methods.
+   */
+  private final class WrappedFileInputStream extends FileInputStream {
+    private @Nullable final FsVolumeSpi volume;
+
+    /**
+     * Opens {@code f} for reading; see
+     * {@link FileInputStream#FileInputStream(File)}.
+     */
+    private WrappedFileInputStream(@Nullable FsVolumeSpi volume, File f)
+        throws FileNotFoundException {
+      super(f);
+      this.volume = volume;
+    }
+
+    /**
+     * Wraps an already-open descriptor; see
+     * {@link FileInputStream#FileInputStream(FileDescriptor)}.
+     */
+    private WrappedFileInputStream(
+        @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+      super(fd);
+      this.volume = volume;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int read() throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+      try {
+        int b = super.read();
+        // -1 signals end of stream: no byte was actually transferred.
+        eventHooks.afterFileIo(volume, READ, begin, b == -1 ? 0 : 1);
+        return b;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int read(@Nonnull byte[] b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+      try {
+        int numBytesRead = super.read(b);
+        // Clamp the -1 end-of-stream marker so hooks never see a negative
+        // length.
+        eventHooks.afterFileIo(
+            volume, READ, begin, Math.max(numBytesRead, 0));
+        return numBytesRead;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int read(@Nonnull byte[] b, int off, int len) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, len);
+      try {
+        int numBytesRead = super.read(b, off, len);
+        // Clamp the -1 end-of-stream marker so hooks never see a negative
+        // length.
+        eventHooks.afterFileIo(
+            volume, READ, begin, Math.max(numBytesRead, 0));
+        return numBytesRead;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Instrumented {@link FileOutputStream}: every write is reported to
+   * {@code eventHooks} (before, after, or on failure) against the volume
+   * supplied at construction time.
+   */
+  private final class WrappedFileOutputStream extends FileOutputStream {
+    /** Volume this stream's IO is attributed to; may be null. */
+    private @Nullable final FsVolumeSpi volume;
+
+    /**
+     * Opens a file for writing.
+     *
+     * @param volume target volume. null if unavailable.
+     * @param f file to open.
+     * @param append true to append to the file instead of truncating it.
+     * @throws FileNotFoundException if the file cannot be opened for writing.
+     */
+    private WrappedFileOutputStream(
+        @Nullable FsVolumeSpi volume, File f,
+        boolean append) throws FileNotFoundException {
+      super(f, append);
+      this.volume = volume;
+    }
+
+    /**
+     * Wraps an already-open file descriptor.
+     *
+     * @param volume target volume. null if unavailable.
+     * @param fd file descriptor to write to.
+     */
+    private WrappedFileOutputStream(
+        @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+      super(fd);
+      this.volume = volume;
+    }
+
+    /** Writes a single byte, reported to the hooks as a 1-byte WRITE. */
+    @Override
+    public void write(int b) throws IOException {
+      final long opStart = eventHooks.beforeFileIo(volume, WRITE, 1);
+      try {
+        super.write(b);
+        eventHooks.afterFileIo(volume, WRITE, opStart, 1);
+      } catch (Exception ex) {
+        eventHooks.onFailure(volume, WRITE, ex, opStart);
+        throw ex;
+      }
+    }
+
+    /** Writes the whole array, reported as a WRITE of {@code b.length}. */
+    @Override
+    public void write(@Nonnull byte[] b) throws IOException {
+      final int byteCount = b.length;
+      final long opStart = eventHooks.beforeFileIo(volume, WRITE, byteCount);
+      try {
+        super.write(b);
+        eventHooks.afterFileIo(volume, WRITE, opStart, byteCount);
+      } catch (Exception ex) {
+        eventHooks.onFailure(volume, WRITE, ex, opStart);
+        throw ex;
+      }
+    }
+
+    /** Writes {@code len} bytes from {@code b} starting at {@code off}. */
+    @Override
+    public void write(@Nonnull byte[] b, int off, int len) throws IOException {
+      final long opStart = eventHooks.beforeFileIo(volume, WRITE, len);
+      try {
+        super.write(b, off, len);
+        eventHooks.afterFileIo(volume, WRITE, opStart, len);
+      } catch (Exception ex) {
+        eventHooks.onFailure(volume, WRITE, ex, opStart);
+        throw ex;
+      }
+    }
+  }
+
+  /**
+   * A thin wrapper over {@link RandomAccessFile} that allows
+   * instrumenting disk IO.
+   *
+   * The read and write overloads below are bracketed by
+   * {@code eventHooks.beforeFileIo} / {@code eventHooks.afterFileIo}, and
+   * any exception is passed to {@code eventHooks.onFailure} before being
+   * rethrown. Operations that are not overridden here (for example
+   * {@code seek} or {@code setLength}) are not reported. For reads, the
+   * length given to {@code afterFileIo} is the number of bytes actually
+   * read, so end-of-file is reported as 0 rather than the -1 sentinel.
+   */
+  private final class WrappedRandomAccessFile extends RandomAccessFile {
+    private @Nullable final FsVolumeSpi volume;
+
+    /**
+     * Opens a file in the given {@link RandomAccessFile} access mode.
+     *
+     * @param volume target volume. null if unavailable.
+     * @param f file to open.
+     * @param mode access mode, as accepted by
+     *             {@link RandomAccessFile#RandomAccessFile(File, String)}.
+     * @throws FileNotFoundException if the file cannot be opened in the
+     *                               requested mode.
+     */
+    private WrappedRandomAccessFile(
+        @Nullable FsVolumeSpi volume, File f, String mode)
+        throws FileNotFoundException {
+      super(f, mode);
+      this.volume = volume;
+    }
+
+    @Override
+    public int read() throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+      try {
+        int b = super.read();
+        // -1 means end-of-file: no byte was actually read.
+        eventHooks.afterFileIo(volume, READ, begin, b < 0 ? 0 : 1);
+        return b;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public int read(@Nonnull byte[] b, int off, int len) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, len);
+      try {
+        int numBytesRead = super.read(b, off, len);
+        // Do not report the -1 end-of-file sentinel as a byte count.
+        eventHooks.afterFileIo(
+            volume, READ, begin, Math.max(numBytesRead, 0));
+        return numBytesRead;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public int read(@Nonnull byte[] b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+      try {
+        int numBytesRead = super.read(b);
+        // Do not report the -1 end-of-file sentinel as a byte count.
+        eventHooks.afterFileIo(
+            volume, READ, begin, Math.max(numBytesRead, 0));
+        return numBytesRead;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
+      try {
+        super.write(b);
+        eventHooks.afterFileIo(volume, WRITE, begin, 1);
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, WRITE, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public void write(@Nonnull byte[] b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
+      try {
+        super.write(b);
+        eventHooks.afterFileIo(volume, WRITE, begin, b.length);
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, WRITE, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public void write(@Nonnull byte[] b, int off, int len) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
+      try {
+        super.write(b, off, len);
+        eventHooks.afterFileIo(volume, WRITE, begin, len);
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, WRITE, e, begin);
+        throw e;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
index e6f7e12..1d46ddd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -29,17 +29,13 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
@@ -187,20 +183,23 @@ abstract public class LocalReplica extends ReplicaInfo {
    * be recovered (especially on Windows) on datanode restart.
    */
   private void breakHardlinks(File file, Block b) throws IOException {
-    File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
-    try (FileInputStream in = new FileInputStream(file)) {
-      try (FileOutputStream out = new FileOutputStream(tmpFile)){
-        copyBytes(in, out, 16 * 1024);
+    final FileIoProvider fileIoProvider = getFileIoProvider();
+    final File tmpFile = DatanodeUtil.createFileWithExistsCheck(
+        getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider);
+    try (FileInputStream in = fileIoProvider.getFileInputStream(
+        getVolume(), file)) {
+      try (FileOutputStream out = fileIoProvider.getFileOutputStream(
+          getVolume(), tmpFile)) {
+        IOUtils.copyBytes(in, out, 16 * 1024);
       }
       if (file.length() != tmpFile.length()) {
         throw new IOException("Copy of file " + file + " size " + file.length()+
                               " into file " + tmpFile +
                               " resulted in a size of " + tmpFile.length());
       }
-      replaceFile(tmpFile, file);
+      fileIoProvider.replaceFile(getVolume(), tmpFile, file);
     } catch (IOException e) {
-      boolean done = tmpFile.delete();
-      if (!done) {
+      if (!fileIoProvider.delete(getVolume(), tmpFile)) {
         DataNode.LOG.info("detachFile failed to delete temporary file " +
                           tmpFile);
       }
@@ -226,19 +225,20 @@ abstract public class LocalReplica extends ReplicaInfo {
    * @throws IOException
    */
   public boolean breakHardLinksIfNeeded() throws IOException {
-    File file = getBlockFile();
+    final File file = getBlockFile();
+    final FileIoProvider fileIoProvider = getFileIoProvider();
     if (file == null || getVolume() == null) {
       throw new IOException("detachBlock:Block not found. " + this);
     }
     File meta = getMetaFile();
 
-    int linkCount = getHardLinkCount(file);
+    int linkCount = fileIoProvider.getHardLinkCount(getVolume(), file);
     if (linkCount > 1) {
       DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
           "block " + this);
       breakHardlinks(file, this);
     }
-    if (getHardLinkCount(meta) > 1) {
+    if (fileIoProvider.getHardLinkCount(getVolume(), meta) > 1) {
       breakHardlinks(meta, this);
     }
     return true;
@@ -256,17 +256,18 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   @Override
   public OutputStream getDataOutputStream(boolean append) throws IOException {
-    return new FileOutputStream(getBlockFile(), append);
+    final FileIoProvider provider = getFileIoProvider();
+    return provider.getFileOutputStream(getVolume(), getBlockFile(), append);
   }
 
   @Override
   public boolean blockDataExists() {
-    return getBlockFile().exists();
+    final FileIoProvider provider = getFileIoProvider();
+    return provider.exists(getVolume(), getBlockFile());
   }
 
   @Override
   public boolean deleteBlockData() {
-    return fullyDelete(getBlockFile());
+    final FileIoProvider provider = getFileIoProvider();
+    return provider.fullyDelete(getVolume(), getBlockFile());
   }
 
   @Override
@@ -282,9 +283,10 @@ abstract public class LocalReplica extends ReplicaInfo {
   @Override
   public LengthInputStream getMetadataInputStream(long offset)
       throws IOException {
-    File meta = getMetaFile();
+    final File metaFile = getMetaFile();
     return new LengthInputStream(
-        FsDatasetUtil.openAndSeek(meta, offset), meta.length());
+        getFileIoProvider().openAndSeek(getVolume(), metaFile, offset),
+        metaFile.length());
   }
 
   @Override
@@ -295,12 +297,12 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   @Override
   public boolean metadataExists() {
-    return getMetaFile().exists();
+    final FileIoProvider provider = getFileIoProvider();
+    return provider.exists(getVolume(), getMetaFile());
   }
 
   @Override
   public boolean deleteMetadata() {
-    return fullyDelete(getMetaFile());
+    final FileIoProvider provider = getFileIoProvider();
+    return provider.fullyDelete(getVolume(), getMetaFile());
   }
 
   @Override
@@ -320,7 +322,7 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   private boolean renameFile(File srcfile, File destfile) throws IOException {
     try {
-      rename(srcfile, destfile);
+      getFileIoProvider().rename(getVolume(), srcfile, destfile);
       return true;
     } catch (IOException e) {
       throw new IOException("Failed to move block file for " + this
@@ -360,9 +362,9 @@ abstract public class LocalReplica extends ReplicaInfo {
   @Override
   public void bumpReplicaGS(long newGS) throws IOException {
     long oldGS = getGenerationStamp();
-    File oldmeta = getMetaFile();
+    final File oldmeta = getMetaFile();
     setGenerationStamp(newGS);
-    File newmeta = getMetaFile();
+    final File newmeta = getMetaFile();
 
     // rename meta file to new GS
     if (LOG.isDebugEnabled()) {
@@ -370,7 +372,7 @@ abstract public class LocalReplica extends ReplicaInfo {
     }
     try {
       // calling renameMeta on the ReplicaInfo doesn't work here
-      rename(oldmeta, newmeta);
+      getFileIoProvider().rename(getVolume(), oldmeta, newmeta);
     } catch (IOException e) {
       setGenerationStamp(oldGS); // restore old GS
       throw new IOException("Block " + this + " reopen failed. " +
@@ -381,7 +383,8 @@ abstract public class LocalReplica extends ReplicaInfo {
 
   @Override
   public void truncateBlock(long newLength) throws IOException {
-    truncateBlock(getBlockFile(), getMetaFile(), getNumBytes(), newLength);
+    truncateBlock(getVolume(), getBlockFile(), getMetaFile(),
+        getNumBytes(), newLength, getFileIoProvider());
   }
 
   @Override
@@ -392,32 +395,15 @@ abstract public class LocalReplica extends ReplicaInfo {
   @Override
   public void copyMetadata(URI destination) throws IOException {
     //for local replicas, we assume the destination URI is file
-    nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true);
+    getFileIoProvider().nativeCopyFileUnbuffered(
+        getVolume(), getMetaFile(), new File(destination), true);
   }
 
   @Override
   public void copyBlockdata(URI destination) throws IOException {
     //for local replicas, we assume the destination URI is file
-    nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true);
-  }
-
-  public void renameMeta(File newMetaFile) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile);
-    }
-    renameFile(getMetaFile(), newMetaFile);
-  }
-
-  public void renameBlock(File newBlockFile) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile
-          + ", file length=" + getBlockFile().length());
-    }
-    renameFile(getBlockFile(), newBlockFile);
-  }
-
-  public static void rename(File from, File to) throws IOException {
-    Storage.rename(from, to);
+    final FileIoProvider provider = getFileIoProvider();
+    final boolean preserveFileDate = true;
+    provider.nativeCopyFileUnbuffered(
+        getVolume(), getBlockFile(), new File(destination), preserveFileDate);
   }
 
   /**
@@ -430,11 +416,13 @@ abstract public class LocalReplica extends ReplicaInfo {
   private FileInputStream getDataInputStream(File f, long seekOffset)
       throws IOException {
     FileInputStream fis;
+    final FileIoProvider fileIoProvider = getFileIoProvider();
     if (NativeIO.isAvailable()) {
-      fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset);
+      fis = fileIoProvider.getShareDeleteFileInputStream(
+          getVolume(), f, seekOffset);
     } else {
       try {
-        fis = FsDatasetUtil.openAndSeek(f, seekOffset);
+        fis = fileIoProvider.openAndSeek(getVolume(), f, seekOffset);
       } catch (FileNotFoundException fnfe) {
         throw new IOException("Expected block file at " + f +
             " does not exist.");
@@ -443,30 +431,6 @@ abstract public class LocalReplica extends ReplicaInfo {
     return fis;
   }
 
-  private void nativeCopyFileUnbuffered(File srcFile, File destFile,
-      boolean preserveFileDate) throws IOException {
-    Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate);
-  }
-
-  private void copyBytes(InputStream in, OutputStream out, int
-      buffSize) throws IOException{
-    IOUtils.copyBytes(in, out, buffSize);
-  }
-
-  private void replaceFile(File src, File target) throws IOException {
-    FileUtil.replaceFile(src, target);
-  }
-
-  public static boolean fullyDelete(final File dir) {
-    boolean result = DataStorage.fullyDelete(dir);
-    return result;
-  }
-
-  public static int getHardLinkCount(File fileName) throws IOException {
-    int linkCount = HardLink.getLinkCount(fileName);
-    return linkCount;
-  }
-
   /**
    *  Get pin status of a file by checking the sticky bit.
    * @param localFS local file system
@@ -495,8 +459,10 @@ abstract public class LocalReplica extends ReplicaInfo {
     localFS.setPermission(path, permission);
   }
 
-  public static void truncateBlock(File blockFile, File metaFile,
-      long oldlen, long newlen) throws IOException {
+  public static void truncateBlock(
+      FsVolumeSpi volume, File blockFile, File metaFile,
+      long oldlen, long newlen, FileIoProvider fileIoProvider)
+      throws IOException {
     LOG.info("truncateBlock: blockFile=" + blockFile
         + ", metaFile=" + metaFile
         + ", oldlen=" + oldlen
@@ -510,7 +476,10 @@ abstract public class LocalReplica extends ReplicaInfo {
           + ") to newlen (=" + newlen + ")");
     }
 
-    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    // fis is closed by BlockMetadataHeader.readHeader.
+    final FileInputStream fis = fileIoProvider.getFileInputStream(
+        volume, metaFile);
+    DataChecksum dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
     int checksumsize = dcs.getChecksumSize();
     int bpc = dcs.getBytesPerChecksum();
     long n = (newlen - 1)/bpc + 1;
@@ -519,16 +488,14 @@ abstract public class LocalReplica extends ReplicaInfo {
     int lastchunksize = (int)(newlen - lastchunkoffset);
     byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
 
-    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
-    try {
+    try (RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
+        volume, blockFile, "rw")) {
       //truncate blockFile
       blockRAF.setLength(newlen);
 
       //read last chunk
       blockRAF.seek(lastchunkoffset);
       blockRAF.readFully(b, 0, lastchunksize);
-    } finally {
-      blockRAF.close();
     }
 
     //compute checksum
@@ -536,13 +503,11 @@ abstract public class LocalReplica extends ReplicaInfo {
     dcs.writeValue(b, 0, false);
 
     //update metaFile
-    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
-    try {
+    try (RandomAccessFile metaRAF = fileIoProvider.getRandomAccessFile(
+        volume, metaFile, "rw")) {
       metaRAF.setLength(newmetalen);
       metaRAF.seek(newmetalen - checksumsize);
       metaRAF.write(b, 0, checksumsize);
-    } finally {
-      metaRAF.close();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
index 1387155..003f96f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
@@ -245,10 +245,9 @@ public class LocalReplicaInPipeline extends LocalReplica
 
   @Override // ReplicaInPipeline
   public ReplicaOutputStreams createStreams(boolean isCreate,
-      DataChecksum requestedChecksum, long slowLogThresholdMs)
-      throws IOException {
-    File blockFile = getBlockFile();
-    File metaFile = getMetaFile();
+      DataChecksum requestedChecksum) throws IOException {
+    final File blockFile = getBlockFile();
+    final File metaFile = getMetaFile();
     if (DataNode.LOG.isDebugEnabled()) {
       DataNode.LOG.debug("writeTo blockfile is " + blockFile +
                          " of size " + blockFile.length());
@@ -262,14 +261,16 @@ public class LocalReplicaInPipeline extends LocalReplica
     // may differ from requestedChecksum for appends.
     final DataChecksum checksum;
 
-    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+    final RandomAccessFile metaRAF =
+        getFileIoProvider().getRandomAccessFile(getVolume(), metaFile, "rw");
 
     if (!isCreate) {
       // For append or recovery, we must enforce the existing checksum.
       // Also, verify that the file has correct lengths, etc.
       boolean checkedMeta = false;
       try {
-        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
+        BlockMetadataHeader header =
+            BlockMetadataHeader.readHeader(metaRAF);
         checksum = header.getChecksum();
 
         if (checksum.getBytesPerChecksum() !=
@@ -302,20 +303,24 @@ public class LocalReplicaInPipeline extends LocalReplica
       checksum = requestedChecksum;
     }
 
+    final FileIoProvider fileIoProvider = getFileIoProvider();
     FileOutputStream blockOut = null;
     FileOutputStream crcOut = null;
     try {
-      blockOut = new FileOutputStream(
-          new RandomAccessFile(blockFile, "rw").getFD());
-      crcOut = new FileOutputStream(metaRAF.getFD());
+      blockOut = fileIoProvider.getFileOutputStream(
+          getVolume(),
+          fileIoProvider.getRandomAccessFile(getVolume(), blockFile, "rw")
+              .getFD());
+      crcOut = fileIoProvider.getFileOutputStream(getVolume(), metaRAF.getFD());
       if (!isCreate) {
         blockOut.getChannel().position(blockDiskSize);
         crcOut.getChannel().position(crcDiskSize);
       }
       return new ReplicaOutputStreams(blockOut, crcOut, checksum,
-          getVolume().isTransientStorage(), slowLogThresholdMs);
+          getVolume(), fileIoProvider);
     } catch (IOException e) {
       IOUtils.closeStream(blockOut);
+      IOUtils.closeStream(crcOut);
       IOUtils.closeStream(metaRAF);
       throw e;
     }
@@ -326,11 +331,11 @@ public class LocalReplicaInPipeline extends LocalReplica
     File blockFile = getBlockFile();
     File restartMeta = new File(blockFile.getParent()  +
         File.pathSeparator + "." + blockFile.getName() + ".restart");
-    if (restartMeta.exists() && !restartMeta.delete()) {
+    if (!getFileIoProvider().deleteWithExistsCheck(getVolume(), restartMeta)) {
       DataNode.LOG.warn("Failed to delete restart meta file: " +
           restartMeta.getPath());
     }
-    return new FileOutputStream(restartMeta);
+    return getFileIoProvider().getFileOutputStream(getVolume(), restartMeta);
   }
 
   @Override
@@ -373,12 +378,14 @@ public class LocalReplicaInPipeline extends LocalReplica
           + " should be derived from LocalReplica");
     }
 
-    LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
-    File oldmeta = oldReplica.getMetaFile();
-    File newmeta = getMetaFile();
+    final LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
+    final File oldBlockFile = oldReplica.getBlockFile();
+    final File oldmeta = oldReplica.getMetaFile();
+    final File newmeta = getMetaFile();
+    final FileIoProvider fileIoProvider = getFileIoProvider();
 
     try {
-      oldReplica.renameMeta(newmeta);
+      fileIoProvider.rename(getVolume(), oldmeta, newmeta);
     } catch (IOException e) {
       throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
                             " Unable to move meta file  " + oldmeta +
@@ -386,10 +393,10 @@ public class LocalReplicaInPipeline extends LocalReplica
     }
 
     try {
-      oldReplica.renameBlock(newBlkFile);
+      fileIoProvider.rename(getVolume(), oldBlockFile, newBlkFile);
     } catch (IOException e) {
       try {
-        renameMeta(oldmeta);
+        fileIoProvider.rename(getVolume(), newmeta, oldmeta);
       } catch (IOException ex) {
         LOG.warn("Cannot move meta file " + newmeta +
             "back to the finalized directory " + oldmeta, ex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 5fdbec0..efa6ea6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -69,13 +69,11 @@ public interface ReplicaInPipeline extends Replica {
    *
    * @param isCreate if it is for creation
    * @param requestedChecksum the checksum the writer would prefer to use
-   * @param slowLogThresholdMs slow io threshold for logging
    * @return output streams for writing
    * @throws IOException if any error occurs
    */
   public ReplicaOutputStreams createStreams(boolean isCreate,
-      DataChecksum requestedChecksum, long slowLogThresholdMs)
-      throws IOException;
+      DataChecksum requestedChecksum) throws IOException;
 
   /**
    * Create an output stream to write restart metadata in case of datanode


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message