hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cnaur...@apache.org
Subject [2/2] git commit: HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.
Date Mon, 27 Oct 2014 18:51:04 GMT
HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.

(cherry picked from commit 463aec11718e47d4aabb86a7a539cb973460aae6)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3d67da50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3d67da50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3d67da50

Branch: refs/heads/branch-2
Commit: 3d67da502aa21d314d672f8b465d5415d77b5df0
Parents: 57e82e6
Author: cnauroth <cnauroth@apache.org>
Authored: Mon Oct 27 09:38:30 2014 -0700
Committer: cnauroth <cnauroth@apache.org>
Committed: Mon Oct 27 11:50:47 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/FSOutputSummer.java    |  17 +-
 .../main/java/org/apache/hadoop/fs/Options.java |  20 +-
 .../org/apache/hadoop/io/nativeio/NativeIO.java |   3 +-
 .../org/apache/hadoop/util/DataChecksum.java    |  18 +-
 .../main/java/org/apache/hadoop/util/Shell.java | 111 ++++++
 .../java/org/apache/hadoop/util/TestShell.java  |  20 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |  13 +-
 .../apache/hadoop/hdfs/BlockReaderLocal.java    |  29 +-
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     |  33 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   9 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  77 ++--
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 110 +++---
 .../hadoop/hdfs/protocol/LocatedBlock.java      |   7 +-
 .../server/datanode/BlockMetadataHeader.java    |  43 +-
 .../hdfs/server/datanode/BlockReceiver.java     | 115 +++---
 .../hdfs/server/datanode/BlockSender.java       |  50 ++-
 .../hdfs/server/datanode/ReplicaInPipeline.java |   7 +-
 .../fsdataset/ReplicaOutputStreams.java         |   9 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   9 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  82 +++-
 .../impl/RamDiskAsyncLazyPersistService.java    |   2 +-
 .../impl/RamDiskReplicaLruTracker.java          |   4 +-
 .../server/datanode/SimulatedFSDataset.java     |   3 +-
 .../fsdataset/impl/LazyPersistTestCase.java     | 389 +++++++++++++++++++
 .../fsdataset/impl/TestLazyPersistFiles.java    | 326 ++--------------
 .../fsdataset/impl/TestScrLazyPersistFiles.java | 356 ++++++++---------
 27 files changed, 1155 insertions(+), 710 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index 19cbb6f..934421a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -51,7 +51,7 @@ abstract public class FSOutputSummer extends OutputStream {
   protected FSOutputSummer(DataChecksum sum) {
     this.sum = sum;
     this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
-    this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
+    this.checksum = new byte[getChecksumSize() * BUFFER_NUM_CHUNKS];
     this.count = 0;
   }
   
@@ -188,7 +188,12 @@ abstract public class FSOutputSummer extends OutputStream {
   protected synchronized int getBufferedDataSize() {
     return count;
   }
-  
+
+  /** @return the size for a checksum. */
+  protected int getChecksumSize() {
+    return sum.getChecksumSize();
+  }
+
   /** Generate checksums for the given data chunks and output chunks & checksums
    * to the underlying output stream.
    */
@@ -197,9 +202,8 @@ abstract public class FSOutputSummer extends OutputStream {
     sum.calculateChunkedSums(b, off, len, checksum, 0);
     for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
       int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
-      int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
-      writeChunk(b, off + i, chunkLen, checksum, ckOffset,
-          sum.getChecksumSize());
+      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
+      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
     }
   }
 
@@ -226,8 +230,7 @@ abstract public class FSOutputSummer extends OutputStream {
    */
   protected synchronized void setChecksumBufSize(int size) {
     this.buf = new byte[size];
-    this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
-        sum.getChecksumSize()];
+    this.checksum = new byte[sum.getChecksumSize(size)];
     this.count = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
index e070943..da75d1c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -234,15 +234,14 @@ public final class Options {
    * This is used in FileSystem and FileContext to specify checksum options.
    */
   public static class ChecksumOpt {
-    private final int crcBlockSize;
-    private final DataChecksum.Type crcType;
+    private final DataChecksum.Type checksumType;
+    private final int bytesPerChecksum;
 
     /**
      * Create a uninitialized one
      */
     public ChecksumOpt() {
-      crcBlockSize = -1;
-      crcType = DataChecksum.Type.DEFAULT;
+      this(DataChecksum.Type.DEFAULT, -1);
     }
 
     /**
@@ -251,16 +250,21 @@ public final class Options {
      * @param size bytes per checksum
      */
     public ChecksumOpt(DataChecksum.Type type, int size) {
-      crcBlockSize = size;
-      crcType = type;
+      checksumType = type;
+      bytesPerChecksum = size;
     }
 
     public int getBytesPerChecksum() {
-      return crcBlockSize;
+      return bytesPerChecksum;
     }
 
     public DataChecksum.Type getChecksumType() {
-      return crcType;
+      return checksumType;
+    }
+    
+    @Override
+    public String toString() {
+      return checksumType + ":" + bytesPerChecksum;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 2400958..f0aca3a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -869,7 +869,8 @@ public class NativeIO {
    * @throws IOException
    */
   public static void copyFileUnbuffered(File src, File dst) throws IOException {
-    if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) {
+    if ((nativeLoaded) &&
+        (Shell.WINDOWS || (Shell.isLinuxSendfileAvailable))) {
       copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
     } else {
       FileUtils.copyFile(src, dst);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 9f0ee35..a38ec32 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -37,9 +37,6 @@ import org.apache.hadoop.fs.ChecksumException;
 @InterfaceStability.Evolving
 public class DataChecksum implements Checksum {
   
-  // Misc constants
-  public static final int HEADER_LEN = 5; /// 1 byte type and 4 byte len
-  
   // checksum types
   public static final int CHECKSUM_NULL    = 0;
   public static final int CHECKSUM_CRC32   = 1;
@@ -103,7 +100,7 @@ public class DataChecksum implements Checksum {
    * @return DataChecksum of the type in the array or null in case of an error.
    */
   public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
-    if ( offset < 0 || bytes.length < offset + HEADER_LEN ) {
+    if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) {
       return null;
     }
     
@@ -116,8 +113,8 @@ public class DataChecksum implements Checksum {
   }
   
   /**
-   * This constructucts a DataChecksum by reading HEADER_LEN bytes from
-   * input stream <i>in</i>
+   * This constructs a DataChecksum by reading the checksum header (see
+   * getChecksumHeaderSize()) from input stream <i>in</i>
    */
   public static DataChecksum newDataChecksum( DataInputStream in )
                                  throws IOException {
@@ -141,7 +138,7 @@ public class DataChecksum implements Checksum {
   }
 
   public byte[] getHeader() {
-    byte[] header = new byte[DataChecksum.HEADER_LEN];
+    byte[] header = new byte[getChecksumHeaderSize()];
     header[0] = (byte) (type.id & 0xff);
     // Writing in buffer just like DataOutput.WriteInt()
     header[1+0] = (byte) ((bytesPerChecksum >>> 24) & 0xff);
@@ -229,13 +226,18 @@ public class DataChecksum implements Checksum {
     bytesPerChecksum = chunkSize;
   }
   
-  // Accessors
+  /** @return the checksum algorithm type. */
   public Type getChecksumType() {
     return type;
   }
+  /** @return the size for a checksum. */
   public int getChecksumSize() {
     return type.size;
   }
+  /** @return the required checksum size given the data length. */
+  public int getChecksumSize(int dataSize) {
+    return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize(); 
+  }
   public int getBytesPerChecksum() {
     return bytesPerChecksum;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
index bd25b9d..e2c00d1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
@@ -377,6 +377,117 @@ abstract public class Shell {
     return winUtilsPath;
   }
 
+  public static class LinuxKernelVersion implements Comparable<LinuxKernelVersion>{
+    private final short major;
+    private final short minor;
+    private final short revision;
+
+    public LinuxKernelVersion(short major, short minor, short revision) {
+      this.major = major;
+      this.minor = minor;
+      this.revision = revision;
+    }
+
+    /**
+     * Parse Linux kernel version string from output of POSIX command 'uname -r'.
+     * Only the part before the first '-' is used and it must consist of exactly
+     * three dot-separated numbers, e.g. '2.6.18-308.24.1.el5' (CentOS 5.8) or
+     * '3.13.0-32-generic' (Ubuntu 14).
+     *
+     * @param version version string from POSIX command 'uname -r'
+     * @return LinuxKernelVersion
+     * @throws IllegalArgumentException if version is null or not 'x.y.z[-...]'
+     */
+    public static LinuxKernelVersion parseLinuxKernelVersion(String version)
+        throws IllegalArgumentException {
+      if (version == null) {
+        throw new IllegalArgumentException();
+      }
+      String parts[] = version.split("-")[0].split("\\.");
+      if (parts.length != 3) {
+        throw new IllegalArgumentException(version);
+      }
+      short major = Short.parseShort(parts[0]);
+      short minor = Short.parseShort(parts[1]);
+      short revision = Short.parseShort(parts[2]);
+      return new LinuxKernelVersion(major, minor, revision);
+    }
+
+    @Override
+    public int compareTo(LinuxKernelVersion o) {
+      if (this.major == o.major) {
+        if (this.minor == o.minor) {
+          return this.revision - o.revision;
+        } else {
+          return this.minor - o.minor;
+        }
+      } else {
+        return this.major - o.major;
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+      if (!(other instanceof LinuxKernelVersion)) {
+        return false;
+      }
+      return compareTo((LinuxKernelVersion) other) == 0;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%d.%d.%d", major, minor, revision);
+    }
+
+    @Override
+    public int hashCode(){
+      int hash = 41;
+      hash = (19 * hash) + major;
+      hash = (53 * hash) + minor;
+      hash = (29 * hash) + revision;
+      return hash;
+    }
+  }
+
+  /*
+   * sendfile() between two file descriptors is only supported on Linux kernel
+   * 2.6.33+ (http://man7.org/linux/man-pages/man2/sendfile.2.html). The minimum
+   * version is declared first: static initializers run in textual order, and
+   * isLinuxSendfileSupported() reads it while computing the flag below.
+   */
+  private static final LinuxKernelVersion minLkvSupportSendfile =
+      new LinuxKernelVersion((short)2, (short)6, (short)33);
+  public static final boolean isLinuxSendfileAvailable = isLinuxSendfileSupported();
+
+  private static boolean isLinuxSendfileSupported() {
+    if (!Shell.LINUX) {
+      return false;
+    }
+    ShellCommandExecutor shexec = null;
+    boolean sendfileSupported = false;
+    try {
+      // 'uname -r' prints only the kernel release, e.g. '3.13.0-32-generic'.
+      shexec = new ShellCommandExecutor(new String[] {"uname", "-r"});
+      shexec.execute();
+      // trim() guards against trailing whitespace/newline in the command output.
+      LinuxKernelVersion lkv = LinuxKernelVersion.parseLinuxKernelVersion(
+          shexec.getOutput().trim());
+      // '>=': the minimum version itself (2.6.33) already supports sendfile.
+      sendfileSupported = lkv.compareTo(minLkvSupportSendfile) >= 0;
+    } catch (Exception e) {
+      LOG.warn("isLinuxSendfileSupported() failed unexpected: " + e);
+    } finally {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("uname exited with exit code "
+            + (shexec != null ? shexec.getExitCode() : "(null executor)"));
+      }
+    }
+    return sendfileSupported;
+  }
+
   public static final boolean isSetsidAvailable = isSetsidSupported();
   private static boolean isSetsidSupported() {
     if (Shell.WINDOWS) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
index d9dc9ef..19589f8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
@@ -165,4 +165,24 @@ public class TestShell extends TestCase {
       assertEquals(2, command.getRunCount());
     }
   }
+
+  public void testLinuxKernelVersion() throws IOException {
+    Shell.LinuxKernelVersion v2_6_18 =
+        new Shell.LinuxKernelVersion((short)2, (short)6, (short)18);
+    Shell.LinuxKernelVersion v2_6_32 =
+        new Shell.LinuxKernelVersion((short)2, (short)6, (short)32);
+    assertTrue(v2_6_18.compareTo(v2_6_32) < 0);
+  }
+
+  public void testParseLinuxKernelVersion() throws Exception {
+    String centOs58Ver = new String("2.6.18-308.24.1.el5");
+    String ubuntu14Ver = new String("3.13.0-32-generic");
+    Shell.LinuxKernelVersion lkvCentOs58 =
+        Shell.LinuxKernelVersion.parseLinuxKernelVersion(centOs58Ver);
+    Shell.LinuxKernelVersion lkvUnbuntu14 =
+        Shell.LinuxKernelVersion.parseLinuxKernelVersion(ubuntu14Ver);
+    assertTrue(lkvUnbuntu14.compareTo(lkvCentOs58) > 0);
+    assertFalse(lkvUnbuntu14.equals(lkvCentOs58));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 78b058d..7e4a01c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -778,6 +778,9 @@ Release 2.6.0 - UNRELEASED
       HDFS-7090. Use unbuffered writes when persisting in-memory replicas.
       (Xiaoyu Yao via cnauroth)
 
+      HDFS-6934. Move checksum computation off the hot path when writing to RAM
+      disk. (cnauroth)
+
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
       HDFS-6387. HDFS CLI admin tool for creating & deleting an

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 3fb442b..13e0a52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -110,6 +110,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
   private DatanodeInfo datanode;
 
   /**
+   * StorageType of replica on DataNode.
+   */
+  private StorageType storageType;
+
+  /**
    * If false, we won't try short-circuit local reads.
    */
   private boolean allowShortCircuitLocalReads;
@@ -201,6 +206,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
     return this;
   }
 
+  public BlockReaderFactory setStorageType(StorageType storageType) {
+    this.storageType = storageType;
+    return this;
+  }
+
   public BlockReaderFactory setAllowShortCircuitLocalReads(
       boolean allowShortCircuitLocalReads) {
     this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
@@ -353,7 +363,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
     try {
       return BlockReaderLocalLegacy.newBlockReader(conf,
           userGroupInformation, configuration, fileName, block, token,
-          datanode, startOffset, length);
+          datanode, startOffset, length, storageType);
     } catch (RemoteException remoteException) {
       ioe = remoteException.unwrapRemoteException(
                 InvalidToken.class, AccessControlException.class);
@@ -415,6 +425,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
         setShortCircuitReplica(info.getReplica()).
         setVerifyChecksum(verifyChecksum).
         setCachingStrategy(cachingStrategy).
+        setStorageType(storageType).
         build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index 3954755..2a9ce96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -69,6 +69,7 @@ class BlockReaderLocal implements BlockReader {
     private ShortCircuitReplica replica;
     private long dataPos;
     private ExtendedBlock block;
+    private StorageType storageType;
 
     public Builder(Conf conf) {
       this.maxReadahead = Integer.MAX_VALUE;
@@ -109,6 +110,11 @@ class BlockReaderLocal implements BlockReader {
       return this;
     }
 
+    public Builder setStorageType(StorageType storageType) {
+      this.storageType = storageType;
+      return this;
+    }
+
     public BlockReaderLocal build() {
       Preconditions.checkNotNull(replica);
       return new BlockReaderLocal(this);
@@ -212,6 +218,11 @@ class BlockReaderLocal implements BlockReader {
    */
   private ByteBuffer checksumBuf;
 
+  /**
+   * StorageType of replica on DataNode.
+   */
+  private StorageType storageType;
+
   private BlockReaderLocal(Builder builder) {
     this.replica = builder.replica;
     this.dataIn = replica.getDataStream().getChannel();
@@ -240,6 +251,7 @@ class BlockReaderLocal implements BlockReader {
       this.zeroReadaheadRequested = false;
     }
     this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
+    this.storageType = builder.storageType;
   }
 
   private synchronized void createDataBufIfNeeded() {
@@ -333,8 +345,8 @@ class BlockReaderLocal implements BlockReader {
           int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
           checksumBuf.clear();
           checksumBuf.limit(checksumsNeeded * checksumSize);
-          long checksumPos =
-              7 + ((startDataPos / bytesPerChecksum) * checksumSize);
+          long checksumPos = BlockMetadataHeader.getHeaderSize()
+              + ((startDataPos / bytesPerChecksum) * checksumSize);
           while (checksumBuf.hasRemaining()) {
             int nRead = checksumIn.read(checksumBuf, checksumPos);
             if (nRead < 0) {
@@ -359,7 +371,14 @@ class BlockReaderLocal implements BlockReader {
 
   private boolean createNoChecksumContext() {
     if (verifyChecksum) {
-      return replica.addNoChecksumAnchor();
+      if (storageType != null && storageType.isTransient()) {
+        // Checksums are not stored for replicas on transient storage.  We do not
+        // anchor, because we do not intend for client activity to block eviction
+        // from transient storage on the DataNode side.
+        return true;
+      } else {
+        return replica.addNoChecksumAnchor();
+      }
     } else {
       return true;
     }
@@ -367,7 +386,9 @@ class BlockReaderLocal implements BlockReader {
 
   private void releaseNoChecksumContext() {
     if (verifyChecksum) {
-      replica.removeNoChecksumAnchor();
+      if (storageType == null || !storageType.isTransient()) {
+        replica.removeNoChecksumAnchor();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index d42b860..f7ff94a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -181,7 +181,8 @@ class BlockReaderLocalLegacy implements BlockReader {
       UserGroupInformation userGroupInformation,
       Configuration configuration, String file, ExtendedBlock blk,
       Token<BlockTokenIdentifier> token, DatanodeInfo node, 
-      long startOffset, long length) throws IOException {
+      long startOffset, long length, StorageType storageType)
+      throws IOException {
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
         .getIpcPort());
     // check the cache first
@@ -192,7 +193,7 @@ class BlockReaderLocalLegacy implements BlockReader {
       }
       pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
           configuration, conf.socketTimeout, token,
-          conf.connectToDnViaHostname);
+          conf.connectToDnViaHostname, storageType);
     }
 
     // check to see if the file exists. It may so happen that the
@@ -204,7 +205,8 @@ class BlockReaderLocalLegacy implements BlockReader {
     FileInputStream dataIn = null;
     FileInputStream checksumIn = null;
     BlockReaderLocalLegacy localBlockReader = null;
-    boolean skipChecksumCheck = conf.skipShortCircuitChecksums;
+    boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
+        storageType.isTransient();
     try {
       // get a local file system
       File blkfile = new File(pathinfo.getBlockPath());
@@ -221,15 +223,8 @@ class BlockReaderLocalLegacy implements BlockReader {
         File metafile = new File(pathinfo.getMetaPath());
         checksumIn = new FileInputStream(metafile);
 
-        // read and handle the common header here. For now just a version
-        BlockMetadataHeader header = BlockMetadataHeader
-            .readHeader(new DataInputStream(checksumIn));
-        short version = header.getVersion();
-        if (version != BlockMetadataHeader.VERSION) {
-          LOG.warn("Wrong version (" + version + ") for metadata file for "
-              + blk + " ignoring ...");
-        }
-        DataChecksum checksum = header.getChecksum();
+        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+            new DataInputStream(checksumIn), blk);
         long firstChunkOffset = startOffset
             - (startOffset % checksum.getBytesPerChecksum());
         localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
@@ -270,8 +265,8 @@ class BlockReaderLocalLegacy implements BlockReader {
   
   private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
       ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
-      throws IOException {
+      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
+      StorageType storageType) throws IOException {
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
     BlockLocalPathInfo pathinfo = null;
     ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
@@ -279,7 +274,15 @@ class BlockReaderLocalLegacy implements BlockReader {
     try {
       // make RPC to local datanode to find local pathnames of blocks
       pathinfo = proxy.getBlockLocalPathInfo(blk, token);
-      if (pathinfo != null) {
+      // We cannot cache the path information for a replica on transient storage.
+      // If the replica gets evicted, then it moves to a different path.  Then,
+      // our next attempt to read from the cached path would fail to find the
+      // file.  Additionally, the failure would cause us to disable legacy
+      // short-circuit read for all subsequent use in the ClientContext.  Unlike
+      // the newer short-circuit read implementation, we have no communication
+      // channel for the DataNode to notify the client that the path has been
+      // invalidated.  Therefore, our only option is to skip caching.
+      if (pathinfo != null && !storageType.isTransient()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Cached location of block " + blk + " as " + pathinfo);
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 5142b9d..d0583f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -98,6 +98,7 @@ import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
@@ -526,8 +527,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       return createChecksum(null);
     }
 
-    private DataChecksum createChecksum(ChecksumOpt userOpt) 
-        throws IOException {
+    private DataChecksum createChecksum(ChecksumOpt userOpt) {
       // Fill in any missing field with the default.
       ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
           defaultChecksumOpt, userOpt);
@@ -535,8 +535,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           myOpt.getChecksumType(),
           myOpt.getBytesPerChecksum());
       if (dataChecksum == null) {
-        throw new IOException("Invalid checksum type specified: "
-            + myOpt.getChecksumType().name());
+        throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
+            + userOpt + ", default=" + defaultChecksumOpt
+            + ", effective=null");
       }
       return dataChecksum;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index e8bcfcc..e83f067 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -570,6 +571,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       DNAddrPair retval = chooseDataNode(targetBlock, null);
       chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
+      StorageType storageType = retval.storageType;
 
       try {
         ExtendedBlock blk = targetBlock.getBlock();
@@ -578,6 +580,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             setInetSocketAddress(targetAddr).
             setRemotePeerFactory(dfsClient).
             setDatanodeInfo(chosenNode).
+            setStorageType(storageType).
             setFileName(src).
             setBlock(blk).
             setBlockToken(accessToken).
@@ -885,12 +888,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   private DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     while (true) {
-      DatanodeInfo[] nodes = block.getLocations();
       try {
-        return getBestNodeDNAddrPair(nodes, ignoredNodes);
+        return getBestNodeDNAddrPair(block, ignoredNodes);
       } catch (IOException ie) {
-        String errMsg =
-          getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes);
+        String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
+          deadNodes, ignoredNodes);
         String blockInfo = block.getBlock() + " file=" + src;
         if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
           String description = "Could not obtain block: " + blockInfo;
@@ -899,7 +901,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           throw new BlockMissingException(src, description,
               block.getStartOffset());
         }
-        
+
+        DatanodeInfo[] nodes = block.getLocations();
         if (nodes == null || nodes.length == 0) {
           DFSClient.LOG.info("No node available for " + blockInfo);
         }
@@ -933,22 +936,44 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /**
-   * Get the best node.
-   * @param nodes Nodes to choose from.
-   * @param ignoredNodes Do not chose nodes in this array (may be null)
+   * Get the best node from which to stream the data.
+   * @param block LocatedBlock, containing nodes in priority order.
+   * @param ignoredNodes Do not choose nodes in this array (may be null)
    * @return The DNAddrPair of the best node.
    * @throws IOException
    */
-  private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
+  private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
-    DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
+    DatanodeInfo[] nodes = block.getLocations();
+    StorageType[] storageTypes = block.getStorageTypes();
+    DatanodeInfo chosenNode = null;
+    StorageType storageType = null;
+    if (nodes != null) {
+      for (int i = 0; i < nodes.length; i++) {
+        if (!deadNodes.containsKey(nodes[i])
+            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
+          chosenNode = nodes[i];
+          // Storage types are ordered to correspond with nodes, so use the same
+          // index to get storage type.
+          if (storageTypes != null && i < storageTypes.length) {
+            storageType = storageTypes[i];
+          }
+          break;
+        }
+      }
+    }
+    if (chosenNode == null) {
+      throw new IOException("No live nodes contain block " + block.getBlock() +
+          " after checking nodes = " + Arrays.toString(nodes) +
+          ", ignoredNodes = " + ignoredNodes);
+    }
     final String dnAddr =
         chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
     }
     InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
-    return new DNAddrPair(chosenNode, targetAddr);
+    return new DNAddrPair(chosenNode, targetAddr, storageType);
   }
 
   private static String getBestNodeDNAddrPairErrorString(
@@ -1039,6 +1064,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       }
       DatanodeInfo chosenNode = datanode.info;
       InetSocketAddress targetAddr = datanode.addr;
+      StorageType storageType = datanode.storageType;
       BlockReader reader = null;
 
       try {
@@ -1049,6 +1075,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             setInetSocketAddress(targetAddr).
             setRemotePeerFactory(dfsClient).
             setDatanodeInfo(chosenNode).
+            setStorageType(storageType).
             setFileName(src).
             setBlock(block.getBlock()).
             setBlockToken(blockToken).
@@ -1174,7 +1201,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         // If no nodes to do hedged reads against, pass.
         try {
           try {
-            chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+            chosenNode = getBestNodeDNAddrPair(block, ignored);
           } catch (IOException ioe) {
             chosenNode = chooseDataNode(block, ignored);
           }
@@ -1529,31 +1556,17 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     throw new IOException("Mark/reset not supported");
   }
 
-  /**
-   * Pick the best node from which to stream the data.
-   * Entries in <i>nodes</i> are already in the priority order
-   */
-  static DatanodeInfo bestNode(DatanodeInfo nodes[],
-      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
-      Collection<DatanodeInfo> ignoredNodes) throws IOException {
-    if (nodes != null) {
-      for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])
-            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
-          return nodes[i];
-        }
-      }
-    }
-    throw new IOException("No live nodes contain current block");
-  }
-
   /** Utility class to encapsulate data node info and its address. */
-  static class DNAddrPair {
+  private static final class DNAddrPair {
     final DatanodeInfo info;
     final InetSocketAddress addr;
-    DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
+    final StorageType storageType;
+
+    DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
+        StorageType storageType) {
       this.info = info;
       this.addr = addr;
+      this.storageType = storageType;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 60178c7..a83c854 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -42,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CreateFlag;
@@ -89,9 +91,9 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
-
 import org.htrace.Span;
 import org.htrace.Trace;
 import org.htrace.TraceScope;
@@ -148,7 +150,10 @@ public class DFSOutputStream extends FSOutputSummer
   private String src;
   private final long fileId;
   private final long blockSize;
-  private final DataChecksum checksum;
+  /** Only for DataTransferProtocol.writeBlock(..) */
+  private final DataChecksum checksum4WriteBlock;
+  private final int bytesPerChecksum; 
+
   // both dataQueue and ackQueue are protected by dataQueue lock
   private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
   private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
@@ -245,6 +250,9 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     void writeChecksum(byte[] inarray, int off, int len) {
+      if (len == 0) {
+        return;
+      }
       if (checksumPos + len > dataStart) {
         throw new BufferOverflowException();
       }
@@ -378,18 +386,11 @@ public class DFSOutputStream extends FSOutputSummer
     private final Span traceSpan;
 
     /**
-     * Default construction for file create
-     */
-    private DataStreamer() {
-      this(null, null);
-    }
-
-    /**
      * construction with tracing info
      */
     private DataStreamer(HdfsFileStatus stat, Span span) {
       isAppend = false;
-      isLazyPersistFile = initLazyPersist(stat);
+      isLazyPersistFile = isLazyPersist(stat);
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
       traceSpan = span;
     }
@@ -409,7 +410,7 @@ public class DFSOutputStream extends FSOutputSummer
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
-      isLazyPersistFile = initLazyPersist(stat);
+      isLazyPersistFile = isLazyPersist(stat);
       long usedInLastBlock = stat.getLen() % blockSize;
       int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 
@@ -452,13 +453,6 @@ public class DFSOutputStream extends FSOutputSummer
 
       }
     }
-    
-    private boolean initLazyPersist(HdfsFileStatus stat) {
-      final BlockStoragePolicy lpPolicy = blockStoragePolicySuite
-          .getPolicy(HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
-      return lpPolicy != null &&
-             stat.getStoragePolicy() == lpPolicy.getId();
-    }
 
     private void setPipeline(LocatedBlock lb) {
       setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
@@ -553,7 +547,7 @@ public class DFSOutputStream extends FSOutputSummer
             }
             // get packet to be sent.
             if (dataQueue.isEmpty()) {
-              one = new Packet(checksum.getChecksumSize());  // heartbeat packet
+              one = new Packet(getChecksumSize());  // heartbeat packet
             } else {
               one = dataQueue.getFirst(); // regular data packet
             }
@@ -1408,8 +1402,8 @@ public class DFSOutputStream extends FSOutputSummer
           // send the request
           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
-              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-              cachingStrategy.get(), isLazyPersistFile);
+              nodes.length, block.getNumBytes(), bytesSent, newGS,
+              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1618,9 +1612,23 @@ public class DFSOutputStream extends FSOutputSummer
     return value;
   }
 
+  /** 
+   * @return the object for computing checksum.
+   *         The type is NULL if checksum is not computed.
+   */
+  private static DataChecksum getChecksum4Compute(DataChecksum checksum,
+      HdfsFileStatus stat) {
+    if (isLazyPersist(stat) && stat.getReplication() == 1) {
+      // do not compute checksum for writing to single replica to memory
+      return DataChecksum.newDataChecksum(Type.NULL,
+          checksum.getBytesPerChecksum());
+    }
+    return checksum;
+  }
+ 
   private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
       HdfsFileStatus stat, DataChecksum checksum) throws IOException {
-    super(checksum);
+    super(getChecksum4Compute(checksum, stat));
     this.dfsClient = dfsClient;
     this.src = src;
     this.fileId = stat.getFileId();
@@ -1635,15 +1643,18 @@ public class DFSOutputStream extends FSOutputSummer
           "Set non-null progress callback on DFSOutputStream " + src);
     }
     
-    final int bytesPerChecksum = checksum.getBytesPerChecksum();
-    if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
-      throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
-                            ") and blockSize(" + blockSize + 
-                            ") do not match. " + "blockSize should be a " +
-                            "multiple of io.bytes.per.checksum");
-                            
-    }
-    this.checksum = checksum;
+    this.bytesPerChecksum = checksum.getBytesPerChecksum();
+    if (bytesPerChecksum <= 0) {
+      throw new HadoopIllegalArgumentException(
+          "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
+    }
+    if (blockSize % bytesPerChecksum != 0) {
+      throw new HadoopIllegalArgumentException("Invalid values: "
+          + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+          + ") must divide block size (=" + blockSize + ").");
+    }
+    this.checksum4WriteBlock = checksum;
+
     this.dfsclientSlowLogThresholdMs =
       dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
   }
@@ -1655,8 +1666,7 @@ public class DFSOutputStream extends FSOutputSummer
     this(dfsClient, src, progress, stat, checksum);
     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
-    computePacketChunkSize(dfsClient.getConf().writePacketSize,
-        checksum.getBytesPerChecksum());
+    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
 
     Span traceSpan = null;
     if (Trace.isTracing()) {
@@ -1734,11 +1744,9 @@ public class DFSOutputStream extends FSOutputSummer
     if (lastBlock != null) {
       // indicate that we are appending to an existing block
       bytesCurBlock = lastBlock.getBlockSize();
-      streamer = new DataStreamer(lastBlock, stat,
-          checksum.getBytesPerChecksum(), traceSpan);
+      streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
     } else {
-      computePacketChunkSize(dfsClient.getConf().writePacketSize,
-          checksum.getBytesPerChecksum());
+      computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
       streamer = new DataStreamer(stat, traceSpan);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
@@ -1752,9 +1760,15 @@ public class DFSOutputStream extends FSOutputSummer
     out.start();
     return out;
   }
+  
+  private static boolean isLazyPersist(HdfsFileStatus stat) {
+    final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
+        HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
+    return p != null && stat.getStoragePolicy() == p.getId();
+  }
 
   private void computePacketChunkSize(int psize, int csize) {
-    int chunkSize = csize + checksum.getChecksumSize();
+    final int chunkSize = csize + getChecksumSize();
     chunksPerPacket = Math.max(psize/chunkSize, 1);
     packetSize = chunkSize*chunksPerPacket;
     if (DFSClient.LOG.isDebugEnabled()) {
@@ -1811,21 +1825,19 @@ public class DFSOutputStream extends FSOutputSummer
     dfsClient.checkOpen();
     checkClosed();
 
-    int bytesPerChecksum = this.checksum.getBytesPerChecksum(); 
     if (len > bytesPerChecksum) {
       throw new IOException("writeChunk() buffer size is " + len +
                             " is larger than supported  bytesPerChecksum " +
                             bytesPerChecksum);
     }
-    if (cklen != this.checksum.getChecksumSize()) {
+    if (cklen != 0 && cklen != getChecksumSize()) {
       throw new IOException("writeChunk() checksum size is supposed to be " +
-                            this.checksum.getChecksumSize() + 
-                            " but found to be " + cklen);
+                            getChecksumSize() + " but found to be " + cklen);
     }
 
     if (currentPacket == null) {
       currentPacket = new Packet(packetSize, chunksPerPacket, 
-          bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
+          bytesCurBlock, currentSeqno++, getChecksumSize());
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
             currentPacket.seqno +
@@ -1873,7 +1885,7 @@ public class DFSOutputStream extends FSOutputSummer
       //
       if (bytesCurBlock == blockSize) {
         currentPacket = new Packet(0, 0, bytesCurBlock, 
-            currentSeqno++, this.checksum.getChecksumSize());
+            currentSeqno++, getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1967,7 +1979,7 @@ public class DFSOutputStream extends FSOutputSummer
             // but sync was requested.
             // Send an empty packet
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
+                bytesCurBlock, currentSeqno++, getChecksumSize());
           }
         } else {
           if (isSync && bytesCurBlock > 0) {
@@ -1976,7 +1988,7 @@ public class DFSOutputStream extends FSOutputSummer
             // and sync was requested.
             // So send an empty sync packet.
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
+                bytesCurBlock, currentSeqno++, getChecksumSize());
           } else {
             // just discard the current packet since it is already been sent.
             currentPacket = null;
@@ -2180,8 +2192,7 @@ public class DFSOutputStream extends FSOutputSummer
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(0, 0, bytesCurBlock, 
-            currentSeqno++, this.checksum.getChecksumSize());
+        currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }
@@ -2245,8 +2256,7 @@ public class DFSOutputStream extends FSOutputSummer
   @VisibleForTesting
   public synchronized void setChunksPerPacket(int value) {
     chunksPerPacket = Math.min(chunksPerPacket, value);
-    packetSize = (checksum.getBytesPerChecksum() + 
-                  checksum.getChecksumSize()) * chunksPerPacket;
+    packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
   }
 
   synchronized void setTestFilename(String newname) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 16bcc0b..30368f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -185,7 +186,11 @@ public class LocatedBlock {
         + "; getBlockSize()=" + getBlockSize()
         + "; corrupt=" + corrupt
         + "; offset=" + offset
-        + "; locs=" + java.util.Arrays.asList(locs)
+        + "; locs=" + Arrays.asList(locs)
+        + "; storageIDs=" +
+            (storageIDs != null ? Arrays.asList(storageIDs) : null)
+        + "; storageTypes=" +
+            (storageTypes != null ? Arrays.asList(storageTypes) : null)
         + "}";
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index b86cad4..51a6134 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -29,10 +29,13 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.DataChecksum;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -46,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class BlockMetadataHeader {
+  private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class);
 
   public static final short VERSION = 1;
   
@@ -74,6 +78,37 @@ public class BlockMetadataHeader {
   }
 
   /**
+   * Read the checksum header from the meta file.
+   * @return the data checksum obtained from the header.
+   */
+  public static DataChecksum readDataChecksum(File metaFile) throws IOException {
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(new BufferedInputStream(
+        new FileInputStream(metaFile), HdfsConstants.IO_FILE_BUFFER_SIZE));
+      return readDataChecksum(in, metaFile);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  /**
+   * Read the checksum header from the meta input stream.
+   * @return the data checksum obtained from the header.
+   */
+  public static DataChecksum readDataChecksum(final DataInputStream metaIn,
+      final Object name) throws IOException {
+    // read and handle the common header here. For now just a version
+    final BlockMetadataHeader header = readHeader(metaIn);
+    if (header.getVersion() != VERSION) {
+      LOG.warn("Unexpected meta-file version for " + name
+          + ": version in file is " + header.getVersion()
+          + " but expected version is " + VERSION);
+    }
+    return header.getChecksum();
+  }
+
+  /**
    * Read the header without changing the position of the FileChannel.
    *
    * @param fc The FileChannel to read.
@@ -82,7 +117,7 @@ public class BlockMetadataHeader {
    */
   public static BlockMetadataHeader preadHeader(FileChannel fc)
       throws IOException {
-    byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
+    final byte arr[] = new byte[getHeaderSize()];
     ByteBuffer buf = ByteBuffer.wrap(arr);
 
     while (buf.hasRemaining()) {
@@ -158,7 +193,7 @@ public class BlockMetadataHeader {
    * Writes all the fields till the beginning of checksum.
    * @throws IOException on error
    */
-  static void writeHeader(DataOutputStream out, DataChecksum checksum)
+  public static void writeHeader(DataOutputStream out, DataChecksum checksum)
                          throws IOException {
     writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 3d497f5..2e388f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -82,12 +82,12 @@ class BlockReceiver implements Closeable {
    * checksum polynomial than the block is stored with on disk,
    * the DataNode needs to recalculate checksums before writing.
    */
-  private boolean needsChecksumTranslation;
+  private final boolean needsChecksumTranslation;
   private OutputStream out = null; // to block file at local disk
   private FileDescriptor outFd;
   private DataOutputStream checksumOut = null; // to crc file at local disk
-  private int bytesPerChecksum;
-  private int checksumSize;
+  private final int bytesPerChecksum;
+  private final int checksumSize;
   
   private final PacketReceiver packetReceiver = new PacketReceiver(false);
   
@@ -99,7 +99,6 @@ class BlockReceiver implements Closeable {
   private DataTransferThrottler throttler;
   private ReplicaOutputStreams streams;
   private DatanodeInfo srcDataNode = null;
-  private Checksum partialCrc = null;
   private final DataNode datanode;
   volatile private boolean mirrorError;
 
@@ -490,7 +489,7 @@ class BlockReceiver implements Closeable {
     long offsetInBlock = header.getOffsetInBlock();
     long seqno = header.getSeqno();
     boolean lastPacketInBlock = header.isLastPacketInBlock();
-    int len = header.getDataLen();
+    final int len = header.getDataLen();
     boolean syncBlock = header.getSyncBlock();
 
     // avoid double sync'ing on close
@@ -499,7 +498,7 @@ class BlockReceiver implements Closeable {
     }
 
     // update received bytes
-    long firstByteInBlock = offsetInBlock;
+    final long firstByteInBlock = offsetInBlock;
     offsetInBlock += len;
     if (replicaInfo.getNumBytes() < offsetInBlock) {
       replicaInfo.setNumBytes(offsetInBlock);
@@ -539,16 +538,15 @@ class BlockReceiver implements Closeable {
         flushOrSync(true);
       }
     } else {
-      int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
-                                                            checksumSize;
+      final int checksumLen = diskChecksum.getChecksumSize(len);
+      final int checksumReceivedLen = checksumBuf.capacity();
 
-      if ( checksumBuf.capacity() != checksumLen) {
-        throw new IOException("Length of checksums in packet " +
-            checksumBuf.capacity() + " does not match calculated checksum " +
-            "length " + checksumLen);
+      if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
+        throw new IOException("Invalid checksum length: received length is "
+            + checksumReceivedLen + " but expected length is " + checksumLen);
       }
 
-      if (shouldVerifyChecksum()) {
+      if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
         try {
           verifyChunks(dataBuf, checksumBuf);
         } catch (IOException ioe) {
@@ -572,11 +570,17 @@ class BlockReceiver implements Closeable {
           translateChunks(dataBuf, checksumBuf);
         }
       }
+
+      if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
+        // checksum is missing, need to calculate it
+        checksumBuf = ByteBuffer.allocate(checksumLen);
+        diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
+      }
       
       // by this point, the data in the buffer uses the disk checksum
 
-      byte[] lastChunkChecksum;
-      
+      final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
+          && streams.isTransientStorage();
       try {
         long onDiskLen = replicaInfo.getBytesOnDisk();
         if (onDiskLen<offsetInBlock) {
@@ -588,14 +592,16 @@ class BlockReceiver implements Closeable {
           }
           
           // If this is a partial chunk, then read in pre-existing checksum
-          if (firstByteInBlock % bytesPerChecksum != 0) {
-            LOG.info("Packet starts at " + firstByteInBlock +
-                     " for " + block +
-                     " which is not a multiple of bytesPerChecksum " +
-                     bytesPerChecksum);
+          Checksum partialCrc = null;
+          if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("receivePacket for " + block 
+                  + ": bytesPerChecksum=" + bytesPerChecksum                  
+                  + " does not divide firstByteInBlock=" + firstByteInBlock);
+            }
             long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
                 onDiskLen / bytesPerChecksum * checksumSize;
-            computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
+            partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
           }
 
           int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
@@ -612,41 +618,40 @@ class BlockReceiver implements Closeable {
                 + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
           }
 
-          // If this is a partial chunk, then verify that this is the only
-          // chunk in the packet. Calculate new crc for this chunk.
-          if (partialCrc != null) {
+          final byte[] lastCrc;
+          if (shouldNotWriteChecksum) {
+            lastCrc = null;
+          } else if (partialCrc != null) {
+            // If this is a partial chunk, then verify that this is the only
+            // chunk in the packet. Calculate new crc for this chunk.
             if (len > bytesPerChecksum) {
-              throw new IOException("Got wrong length during writeBlock(" + 
-                                    block + ") from " + inAddr + " " +
-                                    "A packet can have only one partial chunk."+
-                                    " len = " + len + 
-                                    " bytesPerChecksum " + bytesPerChecksum);
+              throw new IOException("Unexpected packet data length for "
+                  +  block + " from " + inAddr + ": a partial chunk must be "
+                  + " sent in an individual packet (data length = " + len
+                  +  " > bytesPerChecksum = " + bytesPerChecksum + ")");
             }
             partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
             byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
-            lastChunkChecksum = Arrays.copyOfRange(
-              buf, buf.length - checksumSize, buf.length
-            );
+            lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
             checksumOut.write(buf);
             if(LOG.isDebugEnabled()) {
               LOG.debug("Writing out partial crc for data len " + len);
             }
             partialCrc = null;
           } else {
-            lastChunkChecksum = Arrays.copyOfRange(
-                checksumBuf.array(),
-                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
-                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
-            checksumOut.write(checksumBuf.array(),
-                checksumBuf.arrayOffset() + checksumBuf.position(),
-                checksumLen);
+            // write checksum
+            final int offset = checksumBuf.arrayOffset() +
+                checksumBuf.position();
+            final int end = offset + checksumLen;
+            lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
+                end);
+            checksumOut.write(checksumBuf.array(), offset, checksumLen);
           }
+
           /// flush entire packet, sync if requested
           flushOrSync(syncBlock);
           
-          replicaInfo.setLastChecksumAndDataLen(
-            offsetInBlock, lastChunkChecksum
-          );
+          replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
 
           datanode.metrics.incrBytesWritten(len);
 
@@ -686,6 +691,10 @@ class BlockReceiver implements Closeable {
     return lastPacketInBlock?-1:len;
   }
 
+  private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
+    return Arrays.copyOfRange(array, end - size, end); // [end - size, end)
+  }
+
   private void manageWriterOsCache(long offsetInBlock) {
     try {
       if (outFd != null &&
@@ -921,18 +930,19 @@ class BlockReceiver implements Closeable {
    * reads in the partial crc chunk and computes checksum
    * of pre-existing data in partial chunk.
    */
-  private void computePartialChunkCrc(long blkoff, long ckoff, 
-                                      int bytesPerChecksum) throws IOException {
+  private Checksum computePartialChunkCrc(long blkoff, long ckoff)
+      throws IOException {
 
     // find offset of the beginning of partial chunk.
     //
     int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
-    int checksumSize = diskChecksum.getChecksumSize();
     blkoff = blkoff - sizePartialChunk;
-    LOG.info("computePartialChunkCrc sizePartialChunk " + 
-              sizePartialChunk + " " + block +
-              " block offset " + blkoff +
-              " metafile offset " + ckoff);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("computePartialChunkCrc for " + block
+          + ": sizePartialChunk=" + sizePartialChunk
+          + ", block offset=" + blkoff
+          + ", metafile offset=" + ckoff);
+    }
 
     // create an input stream from the block file
     // and read in partial crc chunk into temporary buffer
@@ -951,10 +961,12 @@ class BlockReceiver implements Closeable {
     }
 
     // compute crc of partial chunk from data read in the block file.
-    partialCrc = DataChecksum.newDataChecksum(
+    final Checksum partialCrc = DataChecksum.newDataChecksum(
         diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
     partialCrc.update(buf, 0, sizePartialChunk);
-    LOG.info("Read in partial CRC chunk from disk for " + block);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Read in partial CRC chunk from disk for " + block);
+    }
 
     // paranoia! verify that the pre-computed crc matches what we
     // recalculated just now
@@ -965,6 +977,7 @@ class BlockReceiver implements Closeable {
                    checksum2long(crcbuf);
       throw new IOException(msg);
     }
+    return partialCrc;
   }
   
   private static enum PacketResponderType {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 0082fcd..ce0e1d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
@@ -265,26 +266,37 @@ class BlockSender implements java.io.Closeable {
        */
       DataChecksum csum = null;
       if (verifyChecksum || sendChecksum) {
-        final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
-        if (!corruptChecksumOk || metaIn != null) {
-          if (metaIn == null) {
-            //need checksum but meta-data not found
-            throw new FileNotFoundException("Meta-data not found for " + block);
-          }
-
-          checksumIn = new DataInputStream(
-              new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+        LengthInputStream metaIn = null;
+        boolean keepMetaInOpen = false;
+        try {
+          metaIn = datanode.data.getMetaDataInputStream(block);
+          if (!corruptChecksumOk || metaIn != null) {
+            if (metaIn == null) {
+              //need checksum but meta-data not found
+              throw new FileNotFoundException("Meta-data not found for " +
+                  block);
+            }
+
+            // The meta file will contain only the header if the NULL checksum
+            // type was used, or if the replica was written to transient storage.
+            // Checksum verification is not performed for replicas on transient
+            // storage.  The header is important for determining the checksum
+            // type later when lazy persistence copies the block to non-transient
+            // storage and computes the checksum.
+            if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
+              checksumIn = new DataInputStream(new BufferedInputStream(
+                  metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
   
-          // read and handle the common header here. For now just a version
-          BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-          short version = header.getVersion();
-          if (version != BlockMetadataHeader.VERSION) {
-            LOG.warn("Wrong version (" + version + ") for metadata file for "
-                + block + " ignoring ...");
+              csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
+              keepMetaInOpen = true;
+            }
+          } else {
+            LOG.warn("Could not find metadata file for " + block);
+          }
+        } finally {
+          if (!keepMetaInOpen) {
+            IOUtils.closeStream(metaIn);
           }
-          csum = header.getChecksum();
-        } else {
-          LOG.warn("Could not find metadata file for " + block);
         }
       }
       if (csum == null) {
@@ -343,7 +355,7 @@ class BlockSender implements java.io.Closeable {
       endOffset = end;
 
       // seek to the right offsets
-      if (offset > 0) {
+      if (offset > 0 && checksumIn != null) {
         long checksumSkip = (offset / chunkSize) * checksumSize;
         // note blockInStream is seeked when created below
         if (checksumSkip > 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 45862ca..6a26640 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -213,7 +213,7 @@ public class ReplicaInPipeline extends ReplicaInfo
     
     // the checksum that should actually be used -- this
     // may differ from requestedChecksum for appends.
-    DataChecksum checksum;
+    final DataChecksum checksum;
     
     RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
     
@@ -250,7 +250,7 @@ public class ReplicaInPipeline extends ReplicaInfo
         }
       }
     } else {
-			// for create, we can use the requested checksum
+      // for create, we can use the requested checksum
       checksum = requestedChecksum;
     }
     
@@ -264,7 +264,8 @@ public class ReplicaInPipeline extends ReplicaInfo
         blockOut.getChannel().position(blockDiskSize);
         crcOut.getChannel().position(crcDiskSize);
       }
-      return new ReplicaOutputStreams(blockOut, crcOut, checksum);
+      return new ReplicaOutputStreams(blockOut, crcOut, checksum,
+          getVolume().isTransientStorage());
     } catch (IOException e) {
       IOUtils.closeStream(blockOut);
       IOUtils.closeStream(metaRAF);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
index 95044c8..bd1461a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
@@ -32,16 +32,18 @@ public class ReplicaOutputStreams implements Closeable {
   private final OutputStream dataOut;
   private final OutputStream checksumOut;
   private final DataChecksum checksum;
+  private final boolean isTransientStorage;
 
   /**
    * Create an object with a data output stream, a checksum output stream
    * and a checksum.
    */
   public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
-      DataChecksum checksum) {
+      DataChecksum checksum, boolean isTransientStorage) {
     this.dataOut = dataOut;
     this.checksumOut = checksumOut;
     this.checksum = checksum;
+    this.isTransientStorage = isTransientStorage;
   }
 
   /** @return the data output stream. */
@@ -59,6 +61,11 @@ public class ReplicaOutputStreams implements Closeable {
     return checksum;
   }
 
+  /** @return true if the streams are writing to transient storage. */
+  public boolean isTransientStorage() {
+    return isTransientStorage;
+  }
+
   @Override
   public void close() {
     IOUtils.closeStream(dataOut);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index dce2ff8..e3d1607 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -599,13 +599,8 @@ class BlockPoolSlice {
               HdfsConstants.IO_FILE_BUFFER_SIZE));
 
       // read and handle the common header here. For now just a version
-      BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-      short version = header.getVersion();
-      if (version != BlockMetadataHeader.VERSION) {
-        FsDatasetImpl.LOG.warn("Wrong version (" + version + ") for metadata file "
-            + metaFile + " ignoring ...");
-      }
-      DataChecksum checksum = header.getChecksum();
+      final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+          checksumIn, metaFile);
       int bytesPerChecksum = checksum.getBytesPerChecksum();
       int checksumSize = checksum.getChecksumSize();
       long numChunks = Math.min(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e77ea34..f130d05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileInputStream;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -92,6 +95,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -634,7 +638,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Get the meta info of a block stored in volumeMap. To find a block,
    * block pool Id, block Id and generation stamp must match.
    * @param b extended block
-   * @return the meta replica information; null if block was not found
+   * @return the meta replica information
    * @throws ReplicaNotFoundException if no entry is in the map or 
    *                        there is a generation stamp mismatch
    */
@@ -722,23 +726,80 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
     final File dstFile = new File(destDir, srcFile.getName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
-    try {
-      Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
-    } catch (IOException e) {
-      throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
-    }
+    computeChecksum(srcMeta, dstMeta, srcFile);
+
     try {
       Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
     } catch (IOException e) {
       throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Copied " + srcMeta + " to " + dstMeta);
+      LOG.debug("Copied " + srcMeta + " to " + dstMeta +
+          " and calculated checksum");
       LOG.debug("Copied " + srcFile + " to " + dstFile);
     }
     return new File[] {dstMeta, dstFile};
   }
 
+  /**
+   * Compute and store the checksum for a block file that does not already have
+   * its checksum computed.
+   *
+   * @param srcMeta source meta file, containing only the checksum header, not a
+   *     calculated checksum
+   * @param dstMeta destination meta file, into which this method will write a
+   *     full computed checksum
+   * @param blockFile block file for which the checksum will be computed
+   * @throws IOException if a file cannot be read, created, written or closed
+   */
+  private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
+      throws IOException {
+    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
+    final byte[] data = new byte[1 << 16];
+    final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
+
+    DataOutputStream metaOut = null;
+    InputStream dataIn = null;
+    try {
+      final File parentFile = dstMeta.getParentFile();
+      if (parentFile != null
+          && !parentFile.mkdirs() && !parentFile.isDirectory()) {
+        throw new IOException("Destination '" + parentFile
+            + "' directory cannot be created");
+      }
+      metaOut = new DataOutputStream(new BufferedOutputStream(
+          new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE));
+      BlockMetadataHeader.writeHeader(metaOut, checksum);
+      dataIn = isNativeIOAvailable ?
+          NativeIO.getShareDeleteFileInputStream(blockFile) :
+          new FileInputStream(blockFile);
+
+      int offset = 0;
+      for(int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
+        if (n > 0) {
+          n += offset;
+          offset = n % checksum.getBytesPerChecksum();
+          final int length = n - offset;
+          if (length > 0) {
+            checksum.calculateChunkedSums(data, 0, length, crcs, 0);
+            metaOut.write(crcs, 0, checksum.getChecksumSize(length));
+            // keep the trailing partial chunk at the front for the next read
+            System.arraycopy(data, length, data, 0, offset);
+          }
+        }
+      }
+
+      // calculate and write the last crc, sized by the checksum type
+      checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
+      metaOut.write(crcs, 0, checksum.getChecksumSize());
+      // close here so a failed flush is thrown rather than logged by cleanup
+      metaOut.close();
+      metaOut = null;
+    } finally {
+      IOUtils.cleanup(LOG, dataIn, metaOut);
+    }
+  }
+
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
     LOG.info("truncateBlock: blockFile=" + blockFile
@@ -1641,6 +1702,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  @Override
   public boolean isCached(String bpid, long blockId) {
     return cacheManager.isCached(bpid, blockId);
   }
@@ -2556,8 +2618,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         // Before deleting the files from transient storage we must notify the
         // NN that the files are on the new storage. Else a blockReport from
         // the transient storage might cause the NN to think the blocks are lost.
+        // Replicas must be evicted from client short-circuit caches, because the
+        // storage will no longer be transient, and thus will require validating
+        // checksum.  This also stops a client from holding file descriptors,
+        // which would prevent the OS from reclaiming the memory.
         ExtendedBlock extendedBlock =
             new ExtendedBlock(bpid, newReplicaInfo);
+        datanode.getShortCircuitRegistry().processBlockInvalidation(
+            ExtendedBlockId.fromExtendedBlock(extendedBlock));
         datanode.notifyNamenodeReceivedBlock(
             extendedBlock, null, newReplicaInfo.getStorageUuid());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 76acbea..5fdcc2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -241,7 +241,7 @@ class RamDiskAsyncLazyPersistService {
       } catch (Exception e){
         FsDatasetImpl.LOG.warn(
             "LazyWriter failed to async persist RamDisk block pool id: "
-            + bpId + "block Id: " + blockId);
+            + bpId + "block Id: " + blockId, e);
       } finally {
         if (!succeeded) {
           datanode.getFSDataset().onFailLazyPersist(bpId, blockId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
index a843d9a..c01a6cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -168,9 +168,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
 
   @Override
   synchronized RamDiskReplicaLru getNextCandidateForEviction() {
-    Iterator it = replicasPersisted.values().iterator();
+    final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
     while (it.hasNext()) {
-      RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next();
+      final RamDiskReplicaLru ramDiskReplicaLru = it.next();
       it.remove();
 
       Map<Long, RamDiskReplicaLru> replicaMap =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d67da50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 0786bc6..83b476f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -248,7 +248,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
             + theBlock);
       } else {
         SimulatedOutputStream crcStream = new SimulatedOutputStream();
-        return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum);
+        return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
+            volume.isTransientStorage());
       }
     }
 


Mime
View raw message