hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject [28/50] [abbrv] hadoop git commit: HDFS-10636. Modify ReplicaInfo to remove the assumption that replica metadata and data are stored in java.io.File. (Virajith Jalaparti via lei)
Date Wed, 14 Sep 2016 02:44:06 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e9f1dc1..54b2ce8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -22,12 +22,10 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileDescriptor;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.RandomAccessFile;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
@@ -53,10 +51,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -77,17 +73,13 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -192,13 +184,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
-      File blockfile = getFile(bpid, blkid, false);
-      if (blockfile == null) {
+      ReplicaInfo r = volumeMap.get(bpid, blkid);
+      if (r == null) {
         return null;
       }
-      final File metafile = FsDatasetUtil.findMetaFile(blockfile);
-      final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
-      return new Block(blkid, blockfile.length(), gs);
+      return new Block(blkid, r.getBytesOnDisk(), r.getGenerationStamp());
     }
   }
 
@@ -209,19 +199,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
     ReplicaInfo r = volumeMap.get(bpid, blockId);
-    if(r == null)
+    if (r == null) {
       return null;
+    }
     switch(r.getState()) {
     case FINALIZED:
-      return new FinalizedReplica((FinalizedReplica)r);
     case RBW:
-      return new ReplicaBeingWritten((ReplicaBeingWritten)r);
     case RWR:
-      return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r);
     case RUR:
-      return new ReplicaUnderRecovery((ReplicaUnderRecovery)r);
     case TEMPORARY:
-      return new ReplicaInPipeline((ReplicaInPipeline)r);
+      return new ReplicaBuilder(r.getState()).from(r).build();
     }
     return null;
   }
@@ -229,16 +216,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
       throws IOException {
-    File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
-    if (meta == null || !meta.exists()) {
+    ReplicaInfo info = getBlockReplica(b);
+    if (info == null || !info.metadataExists()) {
       return null;
     }
-    if (isNativeIOAvailable) {
-      return new LengthInputStream(
-          NativeIO.getShareDeleteFileInputStream(meta),
-          meta.length());
-    }
-    return new LengthInputStream(new FileInputStream(meta), meta.length());
+    return info.getMetadataInputStream(0);
   }
     
   final DataNode datanode;
@@ -738,62 +720,45 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override // FsDatasetSpi
   public long getLength(ExtendedBlock b) throws IOException {
-    return getBlockFile(b).length();
+    return getBlockReplica(b).getBlockDataLength();
   }
 
   /**
    * Get File name for a given block.
    */
-  private File getBlockFile(ExtendedBlock b) throws IOException {
-    return getBlockFile(b.getBlockPoolId(), b.getBlockId());
+  private ReplicaInfo getBlockReplica(ExtendedBlock b) throws IOException {
+    return getBlockReplica(b.getBlockPoolId(), b.getBlockId());
   }
   
   /**
    * Get File name for a given block.
    */
-  File getBlockFile(String bpid, long blockId) throws IOException {
-    File f = validateBlockFile(bpid, blockId);
-    if(f == null) {
+  ReplicaInfo getBlockReplica(String bpid, long blockId) throws IOException {
+    ReplicaInfo r = validateBlockFile(bpid, blockId);
+    if (r == null) {
       throw new IOException("BlockId " + blockId + " is not valid.");
     }
-    return f;
-  }
-  
-  /**
-   * Return the File associated with a block, without first
-   * checking that it exists. This should be used when the
-   * next operation is going to open the file for read anyway,
-   * and thus the exists check is redundant.
-   *
-   * @param touch if true then update the last access timestamp of the
-   *              block. Currently used for blocks on transient storage.
-   */
-  private File getBlockFileNoExistsCheck(ExtendedBlock b,
-                                         boolean touch)
-      throws IOException {
-    final File f;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
-      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
-    }
-    if (f == null) {
-      throw new IOException("Block " + b + " is not valid");
-    }
-    return f;
+    return r;
   }
 
   @Override // FsDatasetSpi
   public InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
-    File blockFile = getBlockFileNoExistsCheck(b, true);
-    if (isNativeIOAvailable) {
-      return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+
+    ReplicaInfo info;
+    synchronized(this) {
+      info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+    }
+
+    if (info != null && info.getVolume().isTransientStorage()) {
+      ramDiskReplicaTracker.touch(b.getBlockPoolId(), b.getBlockId());
+      datanode.getMetrics().incrRamDiskBlocksReadHits();
+    }
+
+    if(info != null && info.blockDataExists()) {
+      return info.getDataInputStream(seekOffset);
     } else {
-      try {
-        return openAndSeek(blockFile, seekOffset);
-      } catch (FileNotFoundException fnfe) {
-        throw new IOException("Block " + b + " is not valid. " +
-            "Expected block file at " + blockFile + " does not exist.");
-      }
+      throw new IOException("No data exists for block " + b);
     }
   }
 
@@ -814,7 +779,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     return info;
   }
-  
+
   /**
    * Get the meta info of a block stored in volumeMap. Block is looked up
    * without matching the generation stamp.
@@ -824,7 +789,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @throws ReplicaNotFoundException if no entry is in the map or 
    *                        there is a generation stamp mismatch
    */
-  private ReplicaInfo getReplicaInfo(String bpid, long blkid)
+  @VisibleForTesting
+  ReplicaInfo getReplicaInfo(String bpid, long blkid)
       throws ReplicaNotFoundException {
     ReplicaInfo info = volumeMap.get(bpid, blkid);
     if (info == null) {
@@ -833,7 +799,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     return info;
   }
-  
+
   /**
    * Returns handles to the block file and its metadata file
    */
@@ -844,10 +810,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
-        InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+        InputStream blockInStream = info.getDataInputStream(blkOffset);
         try {
-          InputStream metaInStream =
-              openAndSeek(info.getMetaFile(), metaOffset);
+          InputStream metaInStream = info.getMetadataInputStream(metaOffset);
           return new ReplicaInputStreams(blockInStream, metaInStream, ref);
         } catch (IOException e) {
           IOUtils.cleanup(null, blockInStream);
@@ -860,41 +825,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  private static FileInputStream openAndSeek(File file, long offset)
-      throws IOException {
-    RandomAccessFile raf = null;
-    try {
-      raf = new RandomAccessFile(file, "r");
-      if (offset > 0) {
-        raf.seek(offset);
-      }
-      return new FileInputStream(raf.getFD());
-    } catch(IOException ioe) {
-      IOUtils.cleanup(null, raf);
-      throw ioe;
-    }
-  }
-
-  static File moveBlockFiles(Block b, File srcfile, File destdir)
+  static File moveBlockFiles(Block b, ReplicaInfo replicaInfo, File destdir)
       throws IOException {
     final File dstfile = new File(destdir, b.getBlockName());
-    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
     final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
     try {
-      NativeIO.renameTo(srcmeta, dstmeta);
+      replicaInfo.renameMeta(dstmeta.toURI());
     } catch (IOException e) {
       throw new IOException("Failed to move meta file for " + b
-          + " from " + srcmeta + " to " + dstmeta, e);
+          + " from " + replicaInfo.getMetadataURI() + " to " + dstmeta, e);
     }
     try {
-      NativeIO.renameTo(srcfile, dstfile);
+      replicaInfo.renameData(dstfile.toURI());
     } catch (IOException e) {
       throw new IOException("Failed to move block file for " + b
-          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
+          + " from " + replicaInfo.getBlockURI() + " to "
+          + dstfile.getAbsolutePath(), e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addFinalizedBlock: Moved " + srcmeta + " to " + dstmeta
-          + " and " + srcfile + " to " + dstfile);
+      LOG.debug("addFinalizedBlock: Moved " + replicaInfo.getMetadataURI()
+          + " to " + dstmeta + " and " + replicaInfo.getBlockURI()
+          + " to " + dstfile);
     }
     return dstfile;
   }
@@ -904,41 +855,44 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @return the new meta and block files.
    * @throws IOException
    */
-  static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
-      File srcFile, File destRoot, boolean calculateChecksum,
+  static File[] copyBlockFiles(long blockId, long genStamp,
+      ReplicaInfo srcReplica, File destRoot, boolean calculateChecksum,
       int smallBufferSize, final Configuration conf) throws IOException {
     final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
-    final File dstFile = new File(destDir, srcFile.getName());
+    // blockName is same as the filename for the block
+    final File dstFile = new File(destDir, srcReplica.getBlockName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
-    return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum,
+    return copyBlockFiles(srcReplica, dstMeta, dstFile, calculateChecksum,
         smallBufferSize, conf);
   }
 
-  static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
+  static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
                                File dstFile, boolean calculateChecksum,
                                int smallBufferSize, final Configuration conf)
       throws IOException {
+
     if (calculateChecksum) {
-      computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize, conf);
+      computeChecksum(srcReplica, dstMeta, smallBufferSize, conf);
     } else {
       try {
-        Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
+        srcReplica.copyMetadata(dstMeta.toURI());
       } catch (IOException e) {
-        throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
+        throw new IOException("Failed to copy " + srcReplica + " metadata to "
+            + dstMeta, e);
       }
     }
-
     try {
-      Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
+      srcReplica.copyBlockdata(dstFile.toURI());
     } catch (IOException e) {
-      throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
+      throw new IOException("Failed to copy " + srcReplica + " block file to "
+          + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
       if (calculateChecksum) {
-        LOG.debug("Copied " + srcMeta + " to " + dstMeta
-            + " and calculated checksum");
+        LOG.debug("Copied " + srcReplica.getMetadataURI() + " meta to "
+            + dstMeta + " and calculated checksum");
       } else {
-        LOG.debug("Copied " + srcFile + " to " + dstFile);
+        LOG.debug("Copied " + srcReplica.getBlockURI() + " to " + dstFile);
       }
     }
     return new File[] {dstMeta, dstFile};
@@ -1002,18 +956,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
                                 FsVolumeReference volumeRef) throws
       IOException {
-    File oldBlockFile = replicaInfo.getBlockFile();
-    File oldMetaFile = replicaInfo.getMetaFile();
+
     FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
     // Copy files to temp dir first
     File[] blockFiles = copyBlockFiles(block.getBlockId(),
-        block.getGenerationStamp(), oldMetaFile, oldBlockFile,
+        block.getGenerationStamp(), replicaInfo,
         targetVolume.getTmpDir(block.getBlockPoolId()),
         replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
 
-    ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
-        replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
-        targetVolume, blockFiles[0].getParentFile(), 0);
+    ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
+        .setBlockId(replicaInfo.getBlockId())
+        .setGenerationStamp(replicaInfo.getGenerationStamp())
+        .setFsVolume(targetVolume)
+        .setDirectoryToUse(blockFiles[0].getParentFile())
+        .setBytesToReserve(0)
+        .build();
     newReplicaInfo.setNumBytes(blockFiles[1].length());
     // Finalize the copied files
     newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
@@ -1023,8 +980,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
     }
 
-    removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
-        oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+    removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
     return newReplicaInfo;
   }
 
@@ -1065,16 +1021,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Compute and store the checksum for a block file that does not already have
    * its checksum computed.
    *
-   * @param srcMeta source meta file, containing only the checksum header, not a
-   *     calculated checksum
+   * @param srcReplica source {@link ReplicaInfo}, containing only the checksum
+   *     header, not a calculated checksum
    * @param dstMeta destination meta file, into which this method will write a
    *     full computed checksum
-   * @param blockFile block file for which the checksum will be computed
+   * @param smallBufferSize buffer size to use
+   * @param conf the {@link Configuration}
    * @throws IOException
    */
-  private static void computeChecksum(File srcMeta, File dstMeta,
-      File blockFile, int smallBufferSize, final Configuration conf)
+  private static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
+      int smallBufferSize, final Configuration conf)
       throws IOException {
+    File srcMeta = new File(srcReplica.getMetadataURI());
     final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
         DFSUtilClient.getIoFileBufferSize(conf));
     final byte[] data = new byte[1 << 16];
@@ -1094,9 +1052,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       BlockMetadataHeader.writeHeader(metaOut, checksum);
 
       int offset = 0;
-      try (InputStream dataIn = isNativeIOAvailable ?
-          NativeIO.getShareDeleteFileInputStream(blockFile) :
-          new FileInputStream(blockFile)) {
+      try (InputStream dataIn = srcReplica.getDataInputStream(0)) {
 
         for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
           if (n > 0) {
@@ -1118,58 +1074,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
       metaOut.write(crcs, 0, 4);
     } finally {
-      IOUtils.cleanup(LOG, metaOut);
-    }
-  }
-
-  static private void truncateBlock(File blockFile, File metaFile,
-      long oldlen, long newlen) throws IOException {
-    LOG.info("truncateBlock: blockFile=" + blockFile
-        + ", metaFile=" + metaFile
-        + ", oldlen=" + oldlen
-        + ", newlen=" + newlen);
-
-    if (newlen == oldlen) {
-      return;
-    }
-    if (newlen > oldlen) {
-      throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
-          + ") to newlen (=" + newlen + ")");
-    }
-
-    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); 
-    int checksumsize = dcs.getChecksumSize();
-    int bpc = dcs.getBytesPerChecksum();
-    long n = (newlen - 1)/bpc + 1;
-    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
-    long lastchunkoffset = (n - 1)*bpc;
-    int lastchunksize = (int)(newlen - lastchunkoffset); 
-    byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; 
-
-    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
-    try {
-      //truncate blockFile 
-      blockRAF.setLength(newlen);
- 
-      //read last chunk
-      blockRAF.seek(lastchunkoffset);
-      blockRAF.readFully(b, 0, lastchunksize);
-    } finally {
-      blockRAF.close();
-    }
-
-    //compute checksum
-    dcs.update(b, 0, lastchunksize);
-    dcs.writeValue(b, 0, false);
-
-    //update metaFile 
-    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
-    try {
-      metaRAF.setLength(newmetalen);
-      metaRAF.seek(newmetalen - checksumsize);
-      metaRAF.write(b, 0, checksumsize);
-    } finally {
-      metaRAF.close();
+      IOUtils.cleanup(null, metaOut);
     }
   }
 
@@ -1202,10 +1107,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
-      ReplicaBeingWritten replica = null;
+      ReplicaInPipeline replica = null;
       try {
-        replica = append(b.getBlockPoolId(),
-            (FinalizedReplica) replicaInfo, newGS,
+        replica = append(b.getBlockPoolId(), replicaInfo, newGS,
             b.getNumBytes());
       } catch (IOException e) {
         IOUtils.cleanup(null, ref);
@@ -1227,70 +1131,38 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @throws IOException if moving the replica from finalized directory 
    *         to rbw directory fails
    */
-  private ReplicaBeingWritten append(String bpid,
-      FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
+  private ReplicaInPipeline append(String bpid,
+      ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       // If the block is cached, start uncaching it.
+      if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+        throw new IOException("Only a Finalized replica can be appended to; "
+            + "Replica with blk id " + replicaInfo.getBlockId() + " has state "
+            + replicaInfo.getState());
+      }
+      // If the block is cached, start uncaching it.
       cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
 
-      // If there are any hardlinks to the block, break them.  This ensures we
-      // are not appending to a file that is part of a previous/ directory.
+      // If there are any hardlinks to the block, break them.  This ensures
+      // we are not appending to a file that is part of a previous/ directory.
       replicaInfo.breakHardLinksIfNeeded();
 
-      // construct a RBW replica with the new GS
-      File blkfile = replicaInfo.getBlockFile();
-      FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
-      long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
-      if (v.getAvailable() < bytesReserved) {
-        throw new DiskOutOfSpaceException("Insufficient space for appending to "
-            + replicaInfo);
-      }
-      File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
-      File oldmeta = replicaInfo.getMetaFile();
-      ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
-          replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
-          v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
-      File newmeta = newReplicaInfo.getMetaFile();
-
-      // rename meta file to rbw directory
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-      }
-      try {
-        NativeIO.renameTo(oldmeta, newmeta);
-      } catch (IOException e) {
-        throw new IOException("Block " + replicaInfo + " reopen failed. " +
-            " Unable to move meta file  " + oldmeta +
-            " to rbw dir " + newmeta, e);
-      }
-
-      // rename block file to rbw directory
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Renaming " + blkfile + " to " + newBlkFile
-            + ", file length=" + blkfile.length());
-      }
-      try {
-        NativeIO.renameTo(blkfile, newBlkFile);
-      } catch (IOException e) {
-        try {
-          NativeIO.renameTo(newmeta, oldmeta);
-        } catch (IOException ex) {
-          LOG.warn("Cannot move meta file " + newmeta +
-              "back to the finalized directory " + oldmeta, ex);
-        }
-        throw new IOException("Block " + replicaInfo + " reopen failed. " +
-            " Unable to move block file " + blkfile +
-            " to rbw dir " + newBlkFile, e);
+      FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
+      ReplicaInPipeline rip = v.append(bpid, replicaInfo,
+          newGS, estimateBlockLen);
+      if (rip.getReplicaInfo().getState() != ReplicaState.RBW) {
+        throw new IOException("Append on block " + replicaInfo.getBlockId() +
+            " returned a replica of state " + rip.getReplicaInfo().getState()
+            + "; expected RBW");
       }
-
       // Replace finalized replica by a RBW replica in replicas map
-      volumeMap.add(bpid, newReplicaInfo);
-      v.reserveSpaceForReplica(bytesReserved);
-      return newReplicaInfo;
+      volumeMap.add(bpid, rip.getReplicaInfo());
+      return rip;
     }
   }
 
+  @SuppressWarnings("serial")
   private static class MustStopExistingWriter extends Exception {
     private final ReplicaInPipeline rip;
 
@@ -1298,7 +1170,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       this.rip = rip;
     }
 
-    ReplicaInPipeline getReplica() {
+    ReplicaInPipeline getReplicaInPipeline() {
       return rip;
     }
   }
@@ -1327,7 +1199,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // stop the previous writer before check a replica's length
     long replicaLen = replicaInfo.getNumBytes();
     if (replicaInfo.getState() == ReplicaState.RBW) {
-      ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+      ReplicaInPipeline rbw = (ReplicaInPipeline) replicaInfo;
       if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
         throw new MustStopExistingWriter(rbw);
       }
@@ -1360,17 +1232,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         try (AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
-
           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
-          ReplicaBeingWritten replica;
+          ReplicaInPipeline replica;
           try {
             // change the replica's state/gs etc.
             if (replicaInfo.getState() == ReplicaState.FINALIZED) {
-              replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
+              replica = append(b.getBlockPoolId(), replicaInfo,
                                newGS, b.getNumBytes());
             } else { //RBW
-              bumpReplicaGS(replicaInfo, newGS);
-              replica = (ReplicaBeingWritten) replicaInfo;
+              replicaInfo.bumpReplicaGS(newGS);
+              replica = (ReplicaInPipeline) replicaInfo;
             }
           } catch (IOException e) {
             IOUtils.cleanup(null, ref);
@@ -1379,7 +1250,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           return new ReplicaHandler(replica, ref);
         }
       } catch (MustStopExistingWriter e) {
-        e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout());
+        e.getReplicaInPipeline()
+            .stopWriter(datanode.getDnConf().getXceiverStopTimeout());
       }
     }
   }
@@ -1394,7 +1266,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
-          bumpReplicaGS(replicaInfo, newGS);
+          replicaInfo.bumpReplicaGS(newGS);
           // finalize the replica if RBW
           if (replicaInfo.getState() == ReplicaState.RBW) {
             finalizeReplica(b.getBlockPoolId(), replicaInfo);
@@ -1402,40 +1274,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           return replicaInfo;
         }
       } catch (MustStopExistingWriter e) {
-        e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout());
+        e.getReplicaInPipeline()
+            .stopWriter(datanode.getDnConf().getXceiverStopTimeout());
       }
     }
   }
   
-  /**
-   * Bump a replica's generation stamp to a new one.
-   * Its on-disk meta file name is renamed to be the new one too.
-   * 
-   * @param replicaInfo a replica
-   * @param newGS new generation stamp
-   * @throws IOException if rename fails
-   */
-  private void bumpReplicaGS(ReplicaInfo replicaInfo, 
-      long newGS) throws IOException { 
-    long oldGS = replicaInfo.getGenerationStamp();
-    File oldmeta = replicaInfo.getMetaFile();
-    replicaInfo.setGenerationStamp(newGS);
-    File newmeta = replicaInfo.getMetaFile();
-
-    // rename meta file to new GS
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-    }
-    try {
-      NativeIO.renameTo(oldmeta, newmeta);
-    } catch (IOException e) {
-      replicaInfo.setGenerationStamp(oldGS); // restore old GS
-      throw new IOException("Block " + replicaInfo + " reopen failed. " +
-                            " Unable to move meta file  " + oldmeta +
-                            " to " + newmeta, e);
-    }
-  }
-
   @Override // FsDatasetSpi
   public ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
@@ -1482,18 +1326,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         datanode.getMetrics().incrRamDiskBlocksWriteFallback();
       }
 
-      File f;
+      ReplicaInPipeline newReplicaInfo;
       try {
-        f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+        newReplicaInfo = v.createRbw(b);
+        if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
+          throw new IOException("CreateRBW returned a replica of state "
+              + newReplicaInfo.getReplicaInfo().getState()
+              + " for block " + b.getBlockId());
+        }
       } catch (IOException e) {
         IOUtils.cleanup(null, ref);
         throw e;
       }
 
-      ReplicaBeingWritten newReplicaInfo =
-          new ReplicaBeingWritten(b.getBlockId(),
-          b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
-      volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+      volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
       return new ReplicaHandler(newReplicaInfo, ref);
     }
   }
@@ -1507,14 +1353,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     while (true) {
       try {
         try (AutoCloseableLock lock = datasetLock.acquire()) {
-          ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
-          
+          ReplicaInfo replicaInfo =
+              getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
           // check the replica's state
           if (replicaInfo.getState() != ReplicaState.RBW) {
             throw new ReplicaNotFoundException(
                 ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
           }
-          ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+          ReplicaInPipeline rbw = (ReplicaInPipeline)replicaInfo;
           if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
             throw new MustStopExistingWriter(rbw);
           }
@@ -1522,12 +1368,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
         }
       } catch (MustStopExistingWriter e) {
-        e.getReplica().stopWriter(datanode.getDnConf().getXceiverStopTimeout());
+        e.getReplicaInPipeline().stopWriter(
+            datanode.getDnConf().getXceiverStopTimeout());
       }
     }
   }
 
-  private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
+  private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
@@ -1551,20 +1398,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             minBytesRcvd + ", " + maxBytesRcvd + "].");
       }
 
-      FsVolumeReference ref = rbw.getVolume().obtainReference();
+      FsVolumeReference ref = rbw.getReplicaInfo()
+          .getVolume().obtainReference();
       try {
         // Truncate the potentially corrupt portion.
         // If the source was client and the last node in the pipeline was lost,
         // any corrupt data written after the acked length can go unnoticed.
         if (numBytes > bytesAcked) {
-          final File replicafile = rbw.getBlockFile();
-          truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+          rbw.getReplicaInfo().truncateBlock(bytesAcked);
           rbw.setNumBytes(bytesAcked);
           rbw.setLastChecksumAndDataLen(bytesAcked, null);
         }
 
         // bump the replica's generation stamp to newGS
-        bumpReplicaGS(rbw, newGS);
+        rbw.getReplicaInfo().bumpReplicaGS(newGS);
       } catch (IOException e) {
         IOUtils.cleanup(null, ref);
         throw e;
@@ -1576,6 +1423,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
+
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       final long blockId = b.getBlockId();
       final long expectedGs = b.getGenerationStamp();
@@ -1583,21 +1431,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.info("Convert " + b + " from Temporary to RBW, visible length="
           + visible);
 
-      final ReplicaInPipeline temp;
-
-      // get replica
-      final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
-      if (r == null) {
-        throw new ReplicaNotFoundException(
-            ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
-      }
-      // check the replica's state
-      if (r.getState() != ReplicaState.TEMPORARY) {
-        throw new ReplicaAlreadyExistsException(
-            "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+      final ReplicaInfo temp;
+      {
+        // get replica
+        final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
+        if (r == null) {
+          throw new ReplicaNotFoundException(
+              ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+        }
+        // check the replica's state
+        if (r.getState() != ReplicaState.TEMPORARY) {
+          throw new ReplicaAlreadyExistsException(
+              "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+        }
+        temp = r;
       }
-      temp = (ReplicaInPipeline) r;
-
       // check generation stamp
       if (temp.getGenerationStamp() != expectedGs) {
         throw new ReplicaAlreadyExistsException(
@@ -1621,17 +1469,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         throw new IOException("r.getVolume() = null, temp=" + temp);
       }
 
-      // move block files to the rbw directory
-      BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
-      final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
-          bpslice.getRbwDir());
-      // create RBW
-      final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
-          blockId, numBytes, expectedGs,
-          v, dest.getParentFile(), Thread.currentThread(), 0);
-      rbw.setBytesAcked(visible);
+      final ReplicaInPipeline rbw = v.convertTemporaryToRbw(b, temp);
+
+      if(rbw.getState() != ReplicaState.RBW) {
+        throw new IOException("Expected replica state: " + ReplicaState.RBW
+            + " obtained " + rbw.getState() + " for converting block "
+            + b.getBlockId());
+      }
       // overwrite the RBW in the volume map
-      volumeMap.add(b.getBlockPoolId(), rbw);
+      volumeMap.add(b.getBlockPoolId(), rbw.getReplicaInfo());
       return rbw;
     }
   }
@@ -1653,22 +1499,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           FsVolumeReference ref =
               volumes.getNextVolume(storageType, b.getNumBytes());
           FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
-          // create a temporary file to hold block in the designated volume
-          File f;
+          ReplicaInPipeline newReplicaInfo;
           try {
-            f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+            newReplicaInfo = v.createTemporary(b);
           } catch (IOException e) {
             IOUtils.cleanup(null, ref);
             throw e;
           }
-          ReplicaInPipeline newReplicaInfo =
-              new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
-                  f.getParentFile(), b.getLocalBlock().getNumBytes());
-          volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+
+          volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
           return new ReplicaHandler(newReplicaInfo, ref);
         } else {
-          if (!(currentReplicaInfo.getGenerationStamp() < b
-              .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) {
+          if (!(currentReplicaInfo.getGenerationStamp() < b.getGenerationStamp()
+                && (currentReplicaInfo.getState() == ReplicaState.TEMPORARY
+                    || currentReplicaInfo.getState() == ReplicaState.RBW))) {
             throw new ReplicaAlreadyExistsException("Block " + b
                 + " already exists in state " + currentReplicaInfo.getState()
                 + " and thus cannot be created.");
@@ -1687,8 +1531,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       // Stop the previous writer
-      ((ReplicaInPipeline) lastFoundReplicaInfo)
-          .stopWriter(writerStopTimeoutMs);
+      ((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs);
     } while (true);
   }
 
@@ -1737,29 +1580,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
   }
-  
-  private FinalizedReplica finalizeReplica(String bpid,
+
+  private ReplicaInfo finalizeReplica(String bpid,
       ReplicaInfo replicaInfo) throws IOException {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
-      FinalizedReplica newReplicaInfo = null;
+      ReplicaInfo newReplicaInfo = null;
       if (replicaInfo.getState() == ReplicaState.RUR &&
-          ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
-              == ReplicaState.FINALIZED) {
-        newReplicaInfo = (FinalizedReplica)
-            ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
+          replicaInfo.getOriginalReplica().getState()
+          == ReplicaState.FINALIZED) {
+        newReplicaInfo = replicaInfo.getOriginalReplica();
       } else {
-        FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
-        File f = replicaInfo.getBlockFile();
+        FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
         if (v == null) {
-          throw new IOException("No volume for temporary file " + f +
-              " for block " + replicaInfo);
+          throw new IOException("No volume for block " + replicaInfo);
         }
 
-        File dest = v.addFinalizedBlock(
-            bpid, replicaInfo, f, replicaInfo.getBytesReserved());
-        newReplicaInfo =
-            new FinalizedReplica(replicaInfo, v, dest.getParentFile());
-
+        newReplicaInfo = v.addFinalizedBlock(
+            bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved());
         if (v.isTransientStorage()) {
           releaseLockedMemory(
               replicaInfo.getOriginalBytesReserved()
@@ -1770,8 +1607,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
         }
       }
+      assert newReplicaInfo.getState() == ReplicaState.FINALIZED
+          : "Replica should be finalized";
       volumeMap.add(bpid, newReplicaInfo);
-
       return newReplicaInfo;
     }
   }
@@ -1784,14 +1622,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getLocalBlock());
-      if (replicaInfo != null
-          && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+      if (replicaInfo != null &&
+          replicaInfo.getState() == ReplicaState.TEMPORARY) {
         // remove from volumeMap
         volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
 
         // delete the on-disk temp file
-        if (delBlockFromDisk(replicaInfo.getBlockFile(),
-            replicaInfo.getMetaFile(), b.getLocalBlock())) {
+        if (delBlockFromDisk(replicaInfo)) {
           LOG.warn("Block " + b + " unfinalized and removed. ");
         }
         if (replicaInfo.getVolume().isTransientStorage()) {
@@ -1804,23 +1641,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   /**
    * Remove a block from disk
-   * @param blockFile block file
-   * @param metaFile block meta file
-   * @param b a block
-   * @return true if on-disk files are deleted; false otherwise
+   * @param info the replica that needs to be deleted
+   * @return true if data for the replica are deleted; false otherwise
    */
-  private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
-    if (blockFile == null) {
-      LOG.warn("No file exists for block: " + b);
-      return true;
-    }
+  private boolean delBlockFromDisk(ReplicaInfo info) {
     
-    if (!blockFile.delete()) {
-      LOG.warn("Not able to delete the block file: " + blockFile);
+    if (!info.deleteBlockData()) {
+      LOG.warn("Not able to delete the block data for replica " + info);
       return false;
     } else { // remove the meta file
-      if (metaFile != null && !metaFile.delete()) {
-        LOG.warn("Not able to delete the meta block file: " + metaFile);
+      if (!info.deleteMetadata()) {
+        LOG.warn("Not able to delete the meta data for replica " + info);
         return false;
       }
     }
@@ -1859,20 +1690,19 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           continue;
         }
         switch(b.getState()) {
-          case FINALIZED:
-          case RBW:
-          case RWR:
-            builders.get(b.getVolume().getStorageID()).add(b);
-            break;
-          case RUR:
-            ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
-            builders.get(rur.getVolume().getStorageID())
-                .add(rur.getOriginalReplica());
-            break;
-          case TEMPORARY:
-            break;
-          default:
-            assert false : "Illegal ReplicaInfo state.";
+        case FINALIZED:
+        case RBW:
+        case RWR:
+          builders.get(b.getVolume().getStorageID()).add(b);
+          break;
+        case RUR:
+          ReplicaInfo orig = b.getOriginalReplica();
+          builders.get(b.getVolume().getStorageID()).add(orig);
+          break;
+        case TEMPORARY:
+          break;
+        default:
+          assert false : "Illegal ReplicaInfo state.";
         }
       }
     }
@@ -1889,13 +1719,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+  public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
-      ArrayList<FinalizedReplica> finalized =
-          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      ArrayList<ReplicaInfo> finalized =
+          new ArrayList<ReplicaInfo>(volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         if (b.getState() == ReplicaState.FINALIZED) {
-          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+          finalized.add(new ReplicaBuilder(ReplicaState.FINALIZED)
+              .from(b).build());
         }
       }
       return finalized;
@@ -1906,15 +1737,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(
+  public List<ReplicaInfo> getFinalizedBlocksOnPersistentStorage(
       String bpid) {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
-      ArrayList<FinalizedReplica> finalized =
-          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      ArrayList<ReplicaInfo> finalized =
+          new ArrayList<ReplicaInfo>(volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
-        if (!b.getVolume().isTransientStorage() &&
+        if(!b.getVolume().isTransientStorage() &&
             b.getState() == ReplicaState.FINALIZED) {
-          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+          finalized.add(new ReplicaBuilder(ReplicaState.FINALIZED)
+              .from(b).build());
         }
       }
       return finalized;
@@ -1951,8 +1783,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (replicaInfo.getState() != state) {
       throw new UnexpectedReplicaStateException(b,state);
     }
-    if (!replicaInfo.getBlockFile().exists()) {
-      throw new FileNotFoundException(replicaInfo.getBlockFile().getPath());
+    if (!replicaInfo.blockDataExists()) {
+      throw new FileNotFoundException(replicaInfo.getBlockURI().toString());
     }
     long onDiskLength = getLength(b);
     if (onDiskLength < minLength) {
@@ -1991,46 +1823,44 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   /**
    * Find the file corresponding to the block and return it if it exists.
    */
-  File validateBlockFile(String bpid, long blockId) {
+  ReplicaInfo validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
-    final File f;
+    final ReplicaInfo r;
     try (AutoCloseableLock lock = datasetLock.acquire()) {
-      f = getFile(bpid, blockId, false);
+      r = volumeMap.get(bpid, blockId);
     }
-    
-    if(f != null ) {
-      if(f.exists())
-        return f;
-   
+
+    if (r != null) {
+      if (r.blockDataExists()) {
+        return r;
+      }
       // if file is not null, but doesn't exist - possibly disk failed
       datanode.checkDiskErrorAsync();
     }
-    
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("blockId=" + blockId + ", f=" + f);
+      LOG.debug("blockId=" + blockId + ", replica=" + r);
     }
     return null;
   }
 
   /** Check the files of a replica. */
   static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
-    //check replica's file
-    final File f = r.getBlockFile();
-    if (!f.exists()) {
-      throw new FileNotFoundException("File " + f + " not found, r=" + r);
+    //check replica's data exists
+    if (!r.blockDataExists()) {
+      throw new FileNotFoundException("Block data not found, r=" + r);
     }
-    if (r.getBytesOnDisk() != f.length()) {
-      throw new IOException("File length mismatched.  The length of "
-          + f + " is " + f.length() + " but r=" + r);
+    if (r.getBytesOnDisk() != r.getBlockDataLength()) {
+      throw new IOException("Block length mismatch, len="
+          + r.getBlockDataLength() + " but r=" + r);
     }
 
     //check replica's meta file
-    final File metafile = FsDatasetUtil.getMetaFile(f, r.getGenerationStamp());
-    if (!metafile.exists()) {
-      throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
+    if (!r.metadataExists()) {
+      throw new IOException(r.getMetadataURI() + " does not exist, r=" + r);
     }
-    if (metafile.length() == 0) {
-      throw new IOException("Metafile " + metafile + " is empty, r=" + r);
+    if (r.getMetadataLength() == 0) {
+      throw new IOException("Metafile is empty, r=" + r);
     }
   }
 
@@ -2041,7 +1871,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
     final List<String> errors = new ArrayList<String>();
     for (int i = 0; i < invalidBlks.length; i++) {
-      final File f;
+      final ReplicaInfo removing;
       final FsVolumeImpl v;
       try (AutoCloseableLock lock = datasetLock.acquire()) {
         final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
@@ -2056,27 +1886,31 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               + ": GenerationStamp not matched, info=" + info);
           continue;
         }
-        f = info.getBlockFile();
         v = (FsVolumeImpl)info.getVolume();
         if (v == null) {
           errors.add("Failed to delete replica " + invalidBlks[i]
-              +  ". No volume for this replica, file=" + f);
+              +  ". No volume for replica " + info);
           continue;
         }
-        File parent = f.getParentFile();
-        if (parent == null) {
-          errors.add("Failed to delete replica " + invalidBlks[i]
-              +  ". Parent not found for file " + f);
-          continue;
+        try {
+          File blockFile = new File(info.getBlockURI());
+          if (blockFile != null && blockFile.getParentFile() == null) {
+            errors.add("Failed to delete replica " + invalidBlks[i]
+                +  ". Parent not found for block file: " + blockFile);
+            continue;
+          }
+        } catch(IllegalArgumentException e) {
+          LOG.warn("Parent directory check failed; replica " + info
+              + " is not backed by a local file");
         }
-        ReplicaInfo removing = volumeMap.remove(bpid, invalidBlks[i]);
+        removing = volumeMap.remove(bpid, invalidBlks[i]);
         addDeletingBlock(bpid, removing.getBlockId());
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Block file " + removing.getBlockFile().getName()
+          LOG.debug("Block file " + removing.getBlockURI()
               + " is to be deleted");
         }
-        if (removing instanceof ReplicaInPipelineInterface) {
-          ((ReplicaInPipelineInterface) removing).releaseAllBytesReserved();
+        if (removing instanceof ReplicaInPipeline) {
+          ((ReplicaInPipeline) removing).releaseAllBytesReserved();
         }
       }
 
@@ -2104,10 +1938,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // It's ok to unlink the block file before the uncache operation
       // finishes.
       try {
-        asyncDiskService.deleteAsync(v.obtainReference(), f,
-            FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
+        asyncDiskService.deleteAsync(v.obtainReference(), removing,
             new ExtendedBlock(bpid, invalidBlks[i]),
-            dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+            dataStorage.getTrashDirectoryForReplica(bpid, removing));
       } catch (ClosedChannelException e) {
         LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
             "block " + invalidBlks[i]);
@@ -2192,7 +2025,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           cacheManager.numBlocksFailedToCache.incrementAndGet();
         }
       }
-      blockFileName = info.getBlockFile().getAbsolutePath();
+      blockFileName = info.getBlockURI().toString();
       length = info.getVisibleLength();
       genstamp = info.getGenerationStamp();
       volumeExecutor = volume.getCacheExecutor();
@@ -2224,26 +2057,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public boolean contains(final ExtendedBlock block) {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
-      return getFile(block.getBlockPoolId(), blockId, false) != null;
-    }
-  }
-
-  /**
-   * Turn the block identifier into a filename
-   * @param bpid Block pool Id
-   * @param blockId a block's id
-   * @return on disk data file path; null if the replica does not exist
-   */
-  File getFile(final String bpid, final long blockId, boolean touch) {
-    ReplicaInfo info = volumeMap.get(bpid, blockId);
-    if (info != null) {
-      if (touch && info.getVolume().isTransientStorage()) {
-        ramDiskReplicaTracker.touch(bpid, blockId);
-        datanode.getMetrics().incrRamDiskBlocksReadHits();
-      }
-      return info.getBlockFile();
+      final String bpid = block.getBlockPoolId();
+      final ReplicaInfo r = volumeMap.get(bpid, blockId);
+      return (r != null && r.blockDataExists());
     }
-    return null;    
   }
 
   /**
@@ -2373,7 +2190,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           }
           return;
         }
-        if (!memBlockInfo.getBlockFile().exists()) {
+        if (!memBlockInfo.blockDataExists()) {
           // Block is in memory and not on the disk
           // Remove the block from volumeMap
           volumeMap.remove(bpid, blockId);
@@ -2396,8 +2213,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
        */
       if (memBlockInfo == null) {
         // Block is missing in memory - add the block to volumeMap
-        ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 
-            diskFile.length(), diskGS, vol, diskFile.getParentFile());
+        ReplicaInfo diskBlockInfo = new ReplicaBuilder(ReplicaState.FINALIZED)
+            .setBlockId(blockId)
+            .setLength(diskFile.length())
+            .setGenerationStamp(diskGS)
+            .setFsVolume(vol)
+            .setDirectoryToUse(diskFile.getParentFile())
+            .build();
         volumeMap.add(bpid, diskBlockInfo);
         if (vol.isTransientStorage()) {
           long lockedBytesReserved =
@@ -2413,21 +2235,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
        * Block exists in volumeMap and the block file exists on the disk
        */
       // Compare block files
-      File memFile = memBlockInfo.getBlockFile();
-      if (memFile.exists()) {
-        if (memFile.compareTo(diskFile) != 0) {
+      if (memBlockInfo.blockDataExists()) {
+        if (memBlockInfo.getBlockURI().compareTo(diskFile.toURI()) != 0) {
           if (diskMetaFile.exists()) {
-            if (memBlockInfo.getMetaFile().exists()) {
+            if (memBlockInfo.metadataExists()) {
               // We have two sets of block+meta files. Decide which one to
               // keep.
-              ReplicaInfo diskBlockInfo = new FinalizedReplica(
-                  blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile());
-              ((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas(
-                  memBlockInfo, diskBlockInfo, volumeMap);
+              ReplicaInfo diskBlockInfo =
+                  new ReplicaBuilder(ReplicaState.FINALIZED)
+                    .setBlockId(blockId)
+                    .setLength(diskFile.length())
+                    .setGenerationStamp(diskGS)
+                    .setFsVolume(vol)
+                    .setDirectoryToUse(diskFile.getParentFile())
+                    .build();
+              ((FsVolumeImpl) vol).getBlockPoolSlice(bpid)
+                  .resolveDuplicateReplicas(
+                      memBlockInfo, diskBlockInfo, volumeMap);
             }
           } else {
             if (!diskFile.delete()) {
-              LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
+              LOG.warn("Failed to delete " + diskFile);
             }
           }
         }
@@ -2436,12 +2264,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         // Update the block with the file found on the disk. Since the block
         // file and metadata file are found as a pair on the disk, update
         // the block based on the metadata file found on the disk
-        LOG.warn("Block file in volumeMap "
-            + memFile.getAbsolutePath()
+        LOG.warn("Block file in replica "
+            + memBlockInfo.getBlockURI()
             + " does not exist. Updating it to the file found during scan "
             + diskFile.getAbsolutePath());
-        memBlockInfo.setDir(diskFile.getParentFile());
-        memFile = diskFile;
+        memBlockInfo.updateWithReplica(
+            StorageLocation.parse(diskFile.toString()));
 
         LOG.warn("Updating generation stamp for block " + blockId
             + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
@@ -2463,24 +2291,31 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Metadata file corresponding to block in memory is missing
           // If metadata file found during the scan is on the same directory
           // as the block file, then use the generation stamp from it
-          long gs = diskMetaFile != null && diskMetaFile.exists()
-              && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
-              : HdfsConstants.GRANDFATHER_GENERATION_STAMP;
-
-          LOG.warn("Updating generation stamp for block " + blockId
-              + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
-
-          memBlockInfo.setGenerationStamp(gs);
+          try {
+            File memFile = new File(memBlockInfo.getBlockURI());
+            long gs = diskMetaFile != null && diskMetaFile.exists()
+                && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
+                : HdfsConstants.GRANDFATHER_GENERATION_STAMP;
+
+            LOG.warn("Updating generation stamp for block " + blockId
+                + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
+
+            memBlockInfo.setGenerationStamp(gs);
+          } catch (IllegalArgumentException e) {
+            //exception arises because the URI cannot be converted to a file
+            LOG.warn("Block URI could not be resolved to a file", e);
+          }
         }
       }
 
       // Compare block size
-      if (memBlockInfo.getNumBytes() != memFile.length()) {
+      if (memBlockInfo.getNumBytes() != memBlockInfo.getBlockDataLength()) {
         // Update the length based on the block file
         corruptBlock = new Block(memBlockInfo);
         LOG.warn("Updating size of block " + blockId + " from "
-            + memBlockInfo.getNumBytes() + " to " + memFile.length());
-        memBlockInfo.setNumBytes(memFile.length());
+            + memBlockInfo.getNumBytes() + " to "
+            + memBlockInfo.getBlockDataLength());
+        memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength());
       }
     }
 
@@ -2531,7 +2366,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
         }
       } catch (MustStopExistingWriter e) {
-        e.getReplica().stopWriter(xceiverStopTimeout);
+        e.getReplicaInPipeline().stopWriter(xceiverStopTimeout);
       }
     }
   }
@@ -2549,20 +2384,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     //stop writer if there is any
-    if (replica instanceof ReplicaInPipeline) {
+    if (replica.getState() == ReplicaState.TEMPORARY ||
+        replica.getState() == ReplicaState.RBW) {
       final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
       if (!rip.attemptToSetWriter(null, Thread.currentThread())) {
         throw new MustStopExistingWriter(rip);
       }
 
       //check replica bytes on disk.
-      if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
+      if (replica.getBytesOnDisk() < replica.getVisibleLength()) {
         throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
-            + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
+            + " getBytesOnDisk() < getVisibleLength(), rip=" + replica);
       }
 
       //check the replica's files
-      checkReplicaFiles(rip);
+      checkReplicaFiles(replica);
     }
 
     //check generation stamp
@@ -2580,9 +2416,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     //check RUR
-    final ReplicaUnderRecovery rur;
+    final ReplicaInfo rur;
     if (replica.getState() == ReplicaState.RUR) {
-      rur = (ReplicaUnderRecovery)replica;
+      rur = replica;
       if (rur.getRecoveryID() >= recoveryId) {
         throw new RecoveryInProgressException(
             "rur.getRecoveryID() >= recoveryId = " + recoveryId
@@ -2594,7 +2430,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           + " from " + oldRecoveryID + " to " + recoveryId);
     }
     else {
-      rur = new ReplicaUnderRecovery(replica, recoveryId);
+      rur = new ReplicaBuilder(ReplicaState.RUR)
+          .from(replica).setRecoveryId(recoveryId).build();
       map.add(bpid, rur);
       LOG.info("initReplicaRecovery: changing replica state for "
           + block + " from " + replica.getState()
@@ -2640,8 +2477,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       checkReplicaFiles(replica);
 
       //update replica
-      final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
-              .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
+      final ReplicaInfo finalized = updateReplicaUnderRecovery(oldBlock
+          .getBlockPoolId(), replica, recoveryId,
           newBlockId, newlength);
 
       boolean copyTruncate = newBlockId != oldBlock.getBlockId();
@@ -2661,7 +2498,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             + ", len=" + oldBlock.getNumBytes()
             + ", finalized=" + finalized;
       }
-
       //check replica files after update
       checkReplicaFiles(finalized);
 
@@ -2669,9 +2505,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  private FinalizedReplica updateReplicaUnderRecovery(
+  private ReplicaInfo updateReplicaUnderRecovery(
                                           String bpid,
-                                          ReplicaUnderRecovery rur,
+                                          ReplicaInfo rur,
                                           long recoveryId,
                                           long newBlockId,
                                           long newlength) throws IOException {
@@ -2682,18 +2518,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId;
-    File blockFile;
-    File metaFile;
     // bump rur's GS to be recovery id
     if(!copyOnTruncate) {
-      bumpReplicaGS(rur, recoveryId);
-      blockFile = rur.getBlockFile();
-      metaFile = rur.getMetaFile();
-    } else {
-      File[] copiedReplicaFiles =
-          copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
-      blockFile = copiedReplicaFiles[1];
-      metaFile = copiedReplicaFiles[0];
+      rur.bumpReplicaGS(recoveryId);
     }
 
     //update length
@@ -2701,48 +2528,34 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throw new IOException("rur.getNumBytes() < newlength = " + newlength
           + ", rur=" + rur);
     }
+
     if (rur.getNumBytes() > newlength) {
-      rur.breakHardLinksIfNeeded();
-      truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
       if(!copyOnTruncate) {
+        rur.breakHardLinksIfNeeded();
+        rur.truncateBlock(newlength);
         // update RUR with the new length
         rur.setNumBytes(newlength);
       } else {
         // Copying block to a new block with new blockId.
         // Not truncating original block.
-        FsVolumeSpi volume = rur.getVolume();
-        String blockPath = blockFile.getAbsolutePath();
-        String volumePath = volume.getBasePath();
-        assert blockPath.startsWith(volumePath) :
-            "New block file: " + blockPath + " must be on " +
-                "same volume as recovery replica: " + volumePath;
-        ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
-            newBlockId, recoveryId, volume, blockFile.getParentFile(),
-            newlength);
+        FsVolumeImpl volume = (FsVolumeImpl) rur.getVolume();
+        ReplicaInPipeline newReplicaInfo = volume.updateRURCopyOnTruncate(
+            rur, bpid, newBlockId, recoveryId, newlength);
+        if (newReplicaInfo.getState() != ReplicaState.RBW) {
+          throw new IOException("Append on block " + rur.getBlockId()
+              + " returned a replica of state " + newReplicaInfo.getState()
+              + "; expected RBW");
+        }
+
         newReplicaInfo.setNumBytes(newlength);
-        volumeMap.add(bpid, newReplicaInfo);
-        finalizeReplica(bpid, newReplicaInfo);
+        volumeMap.add(bpid, newReplicaInfo.getReplicaInfo());
+        finalizeReplica(bpid, newReplicaInfo.getReplicaInfo());
       }
-   }
-
+    }
     // finalize the block
     return finalizeReplica(bpid, rur);
   }
 
-  private File[] copyReplicaWithNewBlockIdAndGS(
-      ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS)
-      throws IOException {
-    String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
-    FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
-    final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir();
-    final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
-    final File dstBlockFile = new File(destDir, blockFileName);
-    final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
-    return copyBlockFiles(replicaInfo.getMetaFile(),
-        replicaInfo.getBlockFile(),
-        dstMetaFile, dstBlockFile, true, smallBufferSize, conf);
-  }
-
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
@@ -2886,10 +2699,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
-    File datafile = getBlockFile(block);
-    File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp());
+    ReplicaInfo r = getBlockReplica(block);
+    File blockFile = new File(r.getBlockURI());
+    File metaFile = new File(r.getMetadataURI());
     BlockLocalPathInfo info = new BlockLocalPathInfo(block,
-        datafile.getAbsolutePath(), metafile.getAbsolutePath());
+        blockFile.getAbsolutePath(), metaFile.toString());
     return info;
   }
 
@@ -3001,8 +2815,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   private void removeOldReplica(ReplicaInfo replicaInfo,
-      ReplicaInfo newReplicaInfo, File blockFile, File metaFile,
-      long blockFileUsed, long metaFileUsed, final String bpid) {
+      ReplicaInfo newReplicaInfo, final String bpid) {
     // Before deleting the files from old storage we must notify the
     // NN that the files are on the new storage. Else a blockReport from
     // the transient storage might cause the NN to think the blocks are lost.
@@ -3019,11 +2832,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         newReplicaInfo.isOnTransientStorage());
 
     // Remove the old replicas
-    if (blockFile.delete() || !blockFile.exists()) {
+    if (replicaInfo.deleteBlockData() || !replicaInfo.blockDataExists()) {
       FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
-      volume.onBlockFileDeletion(bpid, blockFileUsed);
-      if (metaFile.delete() || !metaFile.exists()) {
-        volume.onMetaFileDeletion(bpid, metaFileUsed);
+      volume.onBlockFileDeletion(bpid, replicaInfo.getBytesOnDisk());
+      if (replicaInfo.deleteMetadata() || !replicaInfo.metadataExists()) {
+        volume.onMetaFileDeletion(bpid, replicaInfo.getMetadataLength());
       }
     }
 
@@ -3083,8 +2896,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               asyncLazyPersistService.submitLazyPersistTask(
                   block.getBlockPoolId(), block.getBlockId(),
                   replicaInfo.getGenerationStamp(), block.getCreationTime(),
-                  replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
-                  targetReference);
+                  replicaInfo, targetReference);
             }
           }
         }
@@ -3122,18 +2934,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         }
 
         ReplicaInfo replicaInfo, newReplicaInfo;
-        File blockFile, metaFile;
-        long blockFileUsed, metaFileUsed;
         final String bpid = replicaState.getBlockPoolId();
 
         try (AutoCloseableLock lock = datasetLock.acquire()) {
           replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
                                        replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
-          blockFile = replicaInfo.getBlockFile();
-          metaFile = replicaInfo.getMetaFile();
-          blockFileUsed = blockFile.length();
-          metaFileUsed = metaFile.length();
           ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
               replicaState.getBlockId(), false);
 
@@ -3141,16 +2947,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // the target volume
           BlockPoolSlice bpSlice =
               replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
-          File newBlockFile = bpSlice.activateSavedReplica(
-              replicaInfo, replicaState.getSavedMetaFile(),
-              replicaState.getSavedBlockFile());
 
           newReplicaInfo =
-              new FinalizedReplica(replicaInfo.getBlockId(),
-                                   replicaInfo.getBytesOnDisk(),
-                                   replicaInfo.getGenerationStamp(),
-                                   replicaState.getLazyPersistVolume(),
-                                   newBlockFile.getParentFile());
+              bpSlice.activateSavedReplica(replicaInfo, replicaState);
 
           // Update the volumeMap entry.
           volumeMap.add(bpid, newReplicaInfo);
@@ -3165,8 +2964,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
           // Delete the block+meta files from RAM disk and release locked
           // memory.
-          removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
-              blockFileUsed, metaFileUsed, bpid);
+          removeOldReplica(replicaInfo, newReplicaInfo, bpid);
         }
       }
     }
@@ -3205,16 +3003,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (!blockPinningEnabled) {
       return;
     }
-
-    File f = getBlockFile(block);
-    Path p = new Path(f.getAbsolutePath());
     
-    FsPermission oldPermission = localFS.getFileStatus(
-        new Path(f.getAbsolutePath())).getPermission();
-    //sticky bit is used for pinning purpose
-    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
-        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
-    localFS.setPermission(p, permission);
+    ReplicaInfo r = getBlockReplica(block);
+    r.setPinning(localFS);
   }
   
   @Override
@@ -3222,10 +3013,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (!blockPinningEnabled) {
       return  false;
     }
-    File f = getBlockFile(block);
-        
-    FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
-    return fss.getPermission().getStickyBit();
+    ReplicaInfo r = getBlockReplica(block);
+    return r.getPinning(localFS);
   }
   
   @Override
@@ -3308,10 +3097,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       for (String blockPoolId : volumeMap.getBlockPoolList()) {
         Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
         for (ReplicaInfo replicaInfo : replicas) {
-          if (replicaInfo instanceof ReplicaInPipeline
+          if ((replicaInfo.getState() == ReplicaState.TEMPORARY
+              || replicaInfo.getState() == ReplicaState.RBW)
               && replicaInfo.getVolume().equals(volume)) {
-            ReplicaInPipeline replicaInPipeline
-                = (ReplicaInPipeline) replicaInfo;
+            ReplicaInPipeline replicaInPipeline =
+                (ReplicaInPipeline) replicaInfo;
             replicaInPipeline.interruptThread();
           }
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index f695c8c..a4d433d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -18,14 +18,17 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.io.IOUtils;
 
 /** Utility methods. */
 @InterfaceAudience.Private
@@ -71,6 +74,21 @@ public class FsDatasetUtil {
     return matches[0];
   }
 
+  public static FileInputStream openAndSeek(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = null;
+    try {
+      raf = new RandomAccessFile(file, "r");
+      if (offset > 0) {
+        raf.seek(offset);
+      }
+      return new FileInputStream(raf.getFD());
+    } catch(IOException ioe) {
+      IOUtils.cleanup(null, raf);
+      throw ioe;
+    }
+  }
+
   /**
    * Find the meta-file for the specified block file
    * and then return the generation stamp from the name of the meta-file.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index afcc5dd..57fab66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -47,11 +47,19 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -102,7 +110,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   // Disk space reserved for blocks (RBW or Re-replicating) open for write.
   private AtomicLong reservedForReplicas;
   private long recentReserved = 0;
-
+  private final Configuration conf;
   // Capacity configured. This is useful when we want to
   // limit the visible capacity for tests. If negative, then we just
   // query from the filesystem.
@@ -130,6 +138,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.usage = new DF(parent, conf);
     this.storageType = storageType;
     this.configuredCapacity = -1;
+    this.conf = conf;
     cacheExecutor = initializeCacheExecutor(parent);
   }
 
@@ -896,10 +905,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
    * @return
    * @throws IOException
    */
-  File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved)
-      throws IOException {
+  ReplicaInfo addFinalizedBlock(String bpid, Block b, ReplicaInfo replicaInfo,
+      long bytesReserved) throws IOException {
     releaseReservedSpace(bytesReserved);
-    return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
+    File dest = getBlockPoolSlice(bpid).addFinalizedBlock(b, replicaInfo);
+    return new ReplicaBuilder(ReplicaState.FINALIZED)
+        .setBlock(replicaInfo)
+        .setFsVolume(this)
+        .setDirectoryToUse(dest.getParentFile())
+        .build();
   }
 
   Executor getCacheExecutor() {
@@ -950,18 +964,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
-  void addBlockPool(String bpid, Configuration conf) throws IOException {
-    addBlockPool(bpid, conf, null);
+  void addBlockPool(String bpid, Configuration c) throws IOException {
+    addBlockPool(bpid, c, null);
   }
 
-  void addBlockPool(String bpid, Configuration conf, Timer timer)
+  void addBlockPool(String bpid, Configuration c, Timer timer)
       throws IOException {
     File bpdir = new File(currentDir, bpid);
     BlockPoolSlice bp;
     if (timer == null) {
-      bp = new BlockPoolSlice(bpid, this, bpdir, conf, new Timer());
+      bp = new BlockPoolSlice(bpid, this, bpdir, c, new Timer());
     } else {
-      bp = new BlockPoolSlice(bpid, this, bpdir, conf, timer);
+      bp = new BlockPoolSlice(bpid, this, bpdir, c, timer);
     }
     bpSlices.put(bpid, bp);
   }
@@ -1053,5 +1067,127 @@ public class FsVolumeImpl implements FsVolumeSpi {
   DatanodeStorage toDatanodeStorage() {
     return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
   }
+
+
+  public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
+      long newGS, long estimateBlockLen) throws IOException {
+
+    long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
+    if (getAvailable() < bytesReserved) {
+      throw new DiskOutOfSpaceException("Insufficient space for appending to "
+          + replicaInfo);
+    }
+
+    assert replicaInfo.getVolume() == this:
+      "The volume of the replica should be the same as this volume";
+
+    // construct a RBW replica with the new GS
+    File newBlkFile = new File(getRbwDir(bpid), replicaInfo.getBlockName());
+    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(replicaInfo.getBlockId())
+        .setLength(replicaInfo.getNumBytes())
+        .setGenerationStamp(newGS)
+        .setFsVolume(this)
+        .setDirectoryToUse(newBlkFile.getParentFile())
+        .setWriterThread(Thread.currentThread())
+        .setBytesToReserve(bytesReserved)
+        .buildLocalReplicaInPipeline();
+
+    // rename meta file to rbw directory
+    // rename block file to rbw directory
+    newReplicaInfo.moveReplicaFrom(replicaInfo, newBlkFile);
+
+    reserveSpaceForReplica(bytesReserved);
+    return newReplicaInfo;
+  }
+
+  public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {
+
+    File f = createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(b.getBlockId())
+        .setGenerationStamp(b.getGenerationStamp())
+        .setFsVolume(this)
+        .setDirectoryToUse(f.getParentFile())
+        .setBytesToReserve(b.getNumBytes())
+        .buildLocalReplicaInPipeline();
+    return newReplicaInfo;
+  }
+
+  public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b,
+      ReplicaInfo temp) throws IOException {
+
+    final long blockId = b.getBlockId();
+    final long expectedGs = b.getGenerationStamp();
+    final long visible = b.getNumBytes();
+    final long numBytes = temp.getNumBytes();
+
+    // move block files to the rbw directory
+    BlockPoolSlice bpslice = getBlockPoolSlice(b.getBlockPoolId());
+    final File dest = FsDatasetImpl.moveBlockFiles(b.getLocalBlock(), temp,
+        bpslice.getRbwDir());
+    // create RBW
+    final LocalReplicaInPipeline rbw = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(blockId)
+        .setLength(numBytes)
+        .setGenerationStamp(expectedGs)
+        .setFsVolume(this)
+        .setDirectoryToUse(dest.getParentFile())
+        .setWriterThread(Thread.currentThread())
+        .setBytesToReserve(0)
+        .buildLocalReplicaInPipeline();
+    rbw.setBytesAcked(visible);
+    return rbw;
+  }
+
+  public ReplicaInPipeline createTemporary(ExtendedBlock b) throws IOException {
+    // create a temporary file to hold block in the designated volume
+    File f = createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+    LocalReplicaInPipeline newReplicaInfo =
+        new ReplicaBuilder(ReplicaState.TEMPORARY)
+          .setBlockId(b.getBlockId())
+          .setGenerationStamp(b.getGenerationStamp())
+          .setDirectoryToUse(f.getParentFile())
+          .setBytesToReserve(b.getLocalBlock().getNumBytes())
+          .setFsVolume(this)
+          .buildLocalReplicaInPipeline();
+    return newReplicaInfo;
+  }
+
+  public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur,
+      String bpid, long newBlockId, long recoveryId, long newlength)
+      throws IOException {
+
+    rur.breakHardLinksIfNeeded();
+    File[] copiedReplicaFiles =
+        copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
+    File blockFile = copiedReplicaFiles[1];
+    File metaFile = copiedReplicaFiles[0];
+    LocalReplica.truncateBlock(blockFile, metaFile,
+        rur.getNumBytes(), newlength);
+
+    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(newBlockId)
+        .setGenerationStamp(recoveryId)
+        .setFsVolume(this)
+        .setDirectoryToUse(blockFile.getParentFile())
+        .setBytesToReserve(newlength)
+        .buildLocalReplicaInPipeline();
+    return newReplicaInfo;
+  }
+
+  private File[] copyReplicaWithNewBlockIdAndGS(
+      ReplicaInfo replicaInfo, String bpid, long newBlkId, long newGS)
+      throws IOException {
+    String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
+    FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+    final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir();
+    final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
+    final File dstBlockFile = new File(destDir, blockFileName);
+    final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
+    return FsDatasetImpl.copyBlockFiles(replicaInfo, dstMetaFile,
+        dstBlockFile, true, DFSUtilClient.getSmallBufferSize(conf), conf);
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 634ad42..80d3736 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -311,7 +311,7 @@ class FsVolumeList {
     } else {
       // If the volume is not put into a volume scanner, it does not need to
       // hold the reference.
-      IOUtils.cleanup(FsDatasetImpl.LOG, ref);
+      IOUtils.cleanup(null, ref);
     }
     // If the volume is used to replace a failed volume, it needs to reset the
     // volume failure info for this volume.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 9b467ea..9e549f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 
 import java.io.File;
@@ -182,8 +183,7 @@ class RamDiskAsyncLazyPersistService {
    */
   void submitLazyPersistTask(String bpId, long blockId,
       long genStamp, long creationTime,
-      File metaFile, File blockFile,
-      FsVolumeReference target) throws IOException {
+      ReplicaInfo replica, FsVolumeReference target) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
           + bpId + " block id: " + blockId);
@@ -198,31 +198,29 @@ class RamDiskAsyncLazyPersistService {
     }
 
     ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
-        bpId, blockId, genStamp, creationTime, blockFile, metaFile,
+        bpId, blockId, genStamp, creationTime, replica,
         target, lazyPersistDir);
     execute(volume.getCurrentDir(), lazyPersistTask);
   }
 
   class ReplicaLazyPersistTask implements Runnable {
-    final String bpId;
-    final long blockId;
-    final long genStamp;
-    final long creationTime;
-    final File blockFile;
-    final File metaFile;
-    final FsVolumeReference targetVolume;
-    final File lazyPersistDir;
+    private final String bpId;
+    private final long blockId;
+    private final long genStamp;
+    private final long creationTime;
+    private final ReplicaInfo replicaInfo;
+    private final FsVolumeReference targetVolume;
+    private final File lazyPersistDir;
 
     ReplicaLazyPersistTask(String bpId, long blockId,
         long genStamp, long creationTime,
-        File blockFile, File metaFile,
+        ReplicaInfo replicaInfo,
         FsVolumeReference targetVolume, File lazyPersistDir) {
       this.bpId = bpId;
       this.blockId = blockId;
       this.genStamp = genStamp;
       this.creationTime = creationTime;
-      this.blockFile = blockFile;
-      this.metaFile = metaFile;
+      this.replicaInfo = replicaInfo;
       this.targetVolume = targetVolume;
       this.lazyPersistDir = lazyPersistDir;
     }
@@ -232,8 +230,10 @@ class RamDiskAsyncLazyPersistService {
       // Called in AsyncLazyPersistService.execute for displaying error messages.
       return "LazyWriter async task of persist RamDisk block pool id:"
           + bpId + " block pool id: "
-          + blockId + " with block file " + blockFile
-          + " and meta file " + metaFile + " to target volume " + targetVolume;}
+          + blockId + " with block file " + replicaInfo.getBlockURI()
+          + " and meta file " + replicaInfo.getMetadataURI()
+          + " to target volume " + targetVolume;
+    }
 
     @Override
     public void run() {
@@ -243,7 +243,7 @@ class RamDiskAsyncLazyPersistService {
         int smallBufferSize = DFSUtilClient.getSmallBufferSize(EMPTY_HDFS_CONF);
         // No FsDatasetImpl lock for the file copy
         File targetFiles[] = FsDatasetImpl.copyBlockFiles(
-            blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
+            blockId, genStamp, replicaInfo, lazyPersistDir, true,
             smallBufferSize, conf);
 
         // Lock FsDataSetImpl during onCompleteLazyPersist callback


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message