From: cnauroth@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 27 Oct 2014 16:41:48 -0000
Message-Id: <0ede03d6aef4408598b41fd91813cc16@git.apache.org>
Subject: [2/2] git commit: HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.

HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/463aec11
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/463aec11
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/463aec11

Branch: refs/heads/trunk
Commit: 463aec11718e47d4aabb86a7a539cb973460aae6
Parents: 0058ead
Author: cnauroth
Authored: Mon Oct 27 09:38:30 2014 -0700
Committer: cnauroth
Committed: Mon Oct 27 09:38:30 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/FSOutputSummer.java    |  17 +-
 .../main/java/org/apache/hadoop/fs/Options.java |  20 +-
 .../org/apache/hadoop/io/nativeio/NativeIO.java |   3 +-
 .../org/apache/hadoop/util/DataChecksum.java    |  18 +-
 .../main/java/org/apache/hadoop/util/Shell.java | 111 ++++++
 .../java/org/apache/hadoop/util/TestShell.java  |  20 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |  13 +-
 .../apache/hadoop/hdfs/BlockReaderLocal.java    |  29 +-
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     |  33 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   9 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  77 ++--
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 110 +++--
 .../hadoop/hdfs/protocol/LocatedBlock.java      |   7 +-
 .../server/datanode/BlockMetadataHeader.java    |  43 +-
 .../hdfs/server/datanode/BlockReceiver.java     | 115 +++--
 .../hdfs/server/datanode/BlockSender.java       |  50 ++-
 .../hdfs/server/datanode/ReplicaInPipeline.java |   7 +-
 .../fsdataset/ReplicaOutputStreams.java         |   9 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   9 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  82 +++-
 .../impl/RamDiskAsyncLazyPersistService.java    |   2 +-
 .../impl/RamDiskReplicaLruTracker.java          |   4 +-
 .../server/datanode/SimulatedFSDataset.java     |   3 +-
 .../fsdataset/impl/LazyPersistTestCase.java     | 389
+++++++++++++++++++ .../fsdataset/impl/TestLazyPersistFiles.java | 326 ++-------------- .../fsdataset/impl/TestScrLazyPersistFiles.java | 356 ++++++++--------- 27 files changed, 1155 insertions(+), 710 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java index 19cbb6f..934421a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java @@ -51,7 +51,7 @@ abstract public class FSOutputSummer extends OutputStream { protected FSOutputSummer(DataChecksum sum) { this.sum = sum; this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS]; - this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS]; + this.checksum = new byte[getChecksumSize() * BUFFER_NUM_CHUNKS]; this.count = 0; } @@ -188,7 +188,12 @@ abstract public class FSOutputSummer extends OutputStream { protected synchronized int getBufferedDataSize() { return count; } - + + /** @return the size for a checksum. */ + protected int getChecksumSize() { + return sum.getChecksumSize(); + } + /** Generate checksums for the given data chunks and output chunks & checksums * to the underlying output stream. */ @@ -197,9 +202,8 @@ abstract public class FSOutputSummer extends OutputStream { sum.calculateChunkedSums(b, off, len, checksum, 0); for (int i = 0; i < len; i += sum.getBytesPerChecksum()) { int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i); - int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize(); - writeChunk(b, off + i, chunkLen, checksum, ckOffset, - sum.getChecksumSize()); + int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize(); + writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize()); } } @@ -226,8 +230,7 @@ abstract public class FSOutputSummer extends OutputStream { */ protected synchronized void setChecksumBufSize(int size) { this.buf = new byte[size]; - this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) * - sum.getChecksumSize()]; + this.checksum = new byte[sum.getChecksumSize(size)]; this.count = 0; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java index e070943..da75d1c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java @@ -234,15 +234,14 @@ public final class Options { * This is used in FileSystem and FileContext to specify checksum options. 
*/ public static class ChecksumOpt { - private final int crcBlockSize; - private final DataChecksum.Type crcType; + private final DataChecksum.Type checksumType; + private final int bytesPerChecksum; /** * Create a uninitialized one */ public ChecksumOpt() { - crcBlockSize = -1; - crcType = DataChecksum.Type.DEFAULT; + this(DataChecksum.Type.DEFAULT, -1); } /** @@ -251,16 +250,21 @@ public final class Options { * @param size bytes per checksum */ public ChecksumOpt(DataChecksum.Type type, int size) { - crcBlockSize = size; - crcType = type; + checksumType = type; + bytesPerChecksum = size; } public int getBytesPerChecksum() { - return crcBlockSize; + return bytesPerChecksum; } public DataChecksum.Type getChecksumType() { - return crcType; + return checksumType; + } + + @Override + public String toString() { + return checksumType + ":" + bytesPerChecksum; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index 2400958..f0aca3a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -869,7 +869,8 @@ public class NativeIO { * @throws IOException */ public static void copyFileUnbuffered(File src, File dst) throws IOException { - if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) { + if ((nativeLoaded) && + (Shell.WINDOWS || (Shell.isLinuxSendfileAvailable))) { copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath()); } else { FileUtils.copyFile(src, dst); http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java index 9f0ee35..a38ec32 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java @@ -37,9 +37,6 @@ import org.apache.hadoop.fs.ChecksumException; @InterfaceStability.Evolving public class DataChecksum implements Checksum { - // Misc constants - public static final int HEADER_LEN = 5; /// 1 byte type and 4 byte len - // checksum types public static final int CHECKSUM_NULL = 0; public static final int CHECKSUM_CRC32 = 1; @@ -103,7 +100,7 @@ public class DataChecksum implements Checksum { * @return DataChecksum of the type in the array or null in case of an error. 
*/ public static DataChecksum newDataChecksum( byte bytes[], int offset ) { - if ( offset < 0 || bytes.length < offset + HEADER_LEN ) { + if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) { return null; } @@ -116,8 +113,8 @@ public class DataChecksum implements Checksum { } /** - * This constructucts a DataChecksum by reading HEADER_LEN bytes from - * input stream in + * This constructs a DataChecksum by reading HEADER_LEN bytes from input + * stream in */ public static DataChecksum newDataChecksum( DataInputStream in ) throws IOException { @@ -141,7 +138,7 @@ public class DataChecksum implements Checksum { } public byte[] getHeader() { - byte[] header = new byte[DataChecksum.HEADER_LEN]; + byte[] header = new byte[getChecksumHeaderSize()]; header[0] = (byte) (type.id & 0xff); // Writing in buffer just like DataOutput.WriteInt() header[1+0] = (byte) ((bytesPerChecksum >>> 24) & 0xff); @@ -229,13 +226,18 @@ public class DataChecksum implements Checksum { bytesPerChecksum = chunkSize; } - // Accessors + /** @return the checksum algorithm type. */ public Type getChecksumType() { return type; } + /** @return the size for a checksum. */ public int getChecksumSize() { return type.size; } + /** @return the required checksum size given the data length. */ + public int getChecksumSize(int dataSize) { + return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize(); + } public int getBytesPerChecksum() { return bytesPerChecksum; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java index bd25b9d..e2c00d1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java @@ -377,6 +377,117 @@ abstract public class Shell { return winUtilsPath; } + public static class LinuxKernelVersion implements Comparable{ + private final short major; + private final short minor; + private final short revision; + + public LinuxKernelVersion(short major, short minor, short revision) { + this.major = major; + this.minor = minor; + this.revision = revision; + } + + /** + * Parse Linux kernel version string from output of POSIX command 'uname -r' + * @param version version string from POSIX command 'uname -r' + * @return LinuxKernelVersion + * @throws IllegalArgumentException + * + * Note: + * On CentOS 5.8: '2.6.18-308.24.1.el5' + * On Ubuntu 14: '3.13.0-32-generic' + */ + public static LinuxKernelVersion parseLinuxKernelVersion(String version) + throws IllegalArgumentException { + if (version == null) { + throw new IllegalArgumentException(); + } + String parts[] = version.split("-")[0].split("\\."); + if (parts.length != 3) { + throw new IllegalArgumentException(version); + } + short major = Short.parseShort(parts[0]); + short minor = Short.parseShort(parts[1]); + short revision = Short.parseShort(parts[2]); + return new LinuxKernelVersion(major, minor, revision); + } + + @Override + public int compareTo(LinuxKernelVersion o) { + if (this.major == o.major) { + if (this.minor == o.minor) { + return this.revision - o.revision; + } else { + return this.minor - o.minor; + } + } else { + return this.major - o.major; + } + } + + @Override + 
public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof LinuxKernelVersion)) { + return false; + } + return compareTo((LinuxKernelVersion) other) == 0; + } + + @Override + public String toString() { + return String.format("%d.%d.%d", major, minor, revision); + } + + @Override + public int hashCode(){ + int hash = 41; + hash = (19 * hash) + major; + hash = (53 * hash) + minor; + hash = (29 * hash) + revision; + return hash; + } + } + + /* + * sendfile() API between two file descriptors + * is only supported on Linux Kernel version 2.6.33+ + * according to http://man7.org/linux/man-pages/man2/sendfile.2.html + */ + public static final boolean isLinuxSendfileAvailable = isLinuxSendfileSupported(); + private static LinuxKernelVersion minLkvSupportSendfile = + new LinuxKernelVersion((short)2, (short)6, (short)33); + + private static boolean isLinuxSendfileSupported() { + if (!Shell.LINUX) { + return false; + } + ShellCommandExecutor shexec = null; + boolean sendfileSupported = false; + try { + String[] args = {"uname", "bash", "-r"}; + shexec = new ShellCommandExecutor(args); + shexec.execute(); + String version = shexec.getOutput(); + LinuxKernelVersion lkv = + LinuxKernelVersion.parseLinuxKernelVersion(version); + if (lkv.compareTo(minLkvSupportSendfile) > 0) { + sendfileSupported = true; + } + } catch (Exception e) { + LOG.warn("isLinuxSendfileSupported() failed unexpected: " + e); + } finally { + if (LOG.isDebugEnabled()) { + LOG.debug("uname exited with exit code " + + (shexec != null ? shexec.getExitCode() : "(null executor)")); + } + } + return sendfileSupported; + } + public static final boolean isSetsidAvailable = isSetsidSupported(); private static boolean isSetsidSupported() { if (Shell.WINDOWS) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java index d9dc9ef..19589f8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java @@ -165,4 +165,24 @@ public class TestShell extends TestCase { assertEquals(2, command.getRunCount()); } } + + public void testLinuxKernelVersion() throws IOException { + Shell.LinuxKernelVersion v2_6_18 = + new Shell.LinuxKernelVersion((short)2, (short)6, (short)18); + Shell.LinuxKernelVersion v2_6_32 = + new Shell.LinuxKernelVersion((short)2, (short)6, (short)32); + assertTrue(v2_6_18.compareTo(v2_6_32) < 0); + } + + public void testParseLinuxKernelVersion() throws Exception { + String centOs58Ver = new String("2.6.18-308.24.1.el5"); + String ubuntu14Ver = new String("3.13.0-32-generic"); + Shell.LinuxKernelVersion lkvCentOs58 = + Shell.LinuxKernelVersion.parseLinuxKernelVersion(centOs58Ver); + Shell.LinuxKernelVersion lkvUnbuntu14 = + Shell.LinuxKernelVersion.parseLinuxKernelVersion(ubuntu14Ver); + assertTrue(lkvUnbuntu14.compareTo(lkvCentOs58) > 0); + assertFalse(lkvUnbuntu14.equals(lkvCentOs58)); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1f18ab1..1f2c630 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1274,6 +1274,9 @@ Release 2.6.0 - UNRELEASED HDFS-7090. Use unbuffered writes when persisting in-memory replicas. (Xiaoyu Yao via cnauroth) + HDFS-6934. Move checksum computation off the hot path when writing to RAM + disk. (cnauroth) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 3fb442b..13e0a52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -110,6 +110,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { private DatanodeInfo datanode; /** + * StorageType of replica on DataNode. + */ + private StorageType storageType; + + /** * If false, we won't try short-circuit local reads. */ private boolean allowShortCircuitLocalReads; @@ -201,6 +206,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { return this; } + public BlockReaderFactory setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + public BlockReaderFactory setAllowShortCircuitLocalReads( boolean allowShortCircuitLocalReads) { this.allowShortCircuitLocalReads = allowShortCircuitLocalReads; @@ -353,7 +363,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { try { return BlockReaderLocalLegacy.newBlockReader(conf, userGroupInformation, configuration, fileName, block, token, - datanode, startOffset, length); + datanode, startOffset, length, storageType); } catch (RemoteException remoteException) { ioe = remoteException.unwrapRemoteException( InvalidToken.class, AccessControlException.class); @@ -415,6 +425,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { setShortCircuitReplica(info.getReplica()). setVerifyChecksum(verifyChecksum). setCachingStrategy(cachingStrategy). + setStorageType(storageType). 
build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index 3954755..2a9ce96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -69,6 +69,7 @@ class BlockReaderLocal implements BlockReader { private ShortCircuitReplica replica; private long dataPos; private ExtendedBlock block; + private StorageType storageType; public Builder(Conf conf) { this.maxReadahead = Integer.MAX_VALUE; @@ -109,6 +110,11 @@ class BlockReaderLocal implements BlockReader { return this; } + public Builder setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + public BlockReaderLocal build() { Preconditions.checkNotNull(replica); return new BlockReaderLocal(this); @@ -212,6 +218,11 @@ class BlockReaderLocal implements BlockReader { */ private ByteBuffer checksumBuf; + /** + * StorageType of replica on DataNode. + */ + private StorageType storageType; + private BlockReaderLocal(Builder builder) { this.replica = builder.replica; this.dataIn = replica.getDataStream().getChannel(); @@ -240,6 +251,7 @@ class BlockReaderLocal implements BlockReader { this.zeroReadaheadRequested = false; } this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; + this.storageType = builder.storageType; } private synchronized void createDataBufIfNeeded() { @@ -333,8 +345,8 @@ class BlockReaderLocal implements BlockReader { int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; checksumBuf.clear(); checksumBuf.limit(checksumsNeeded * checksumSize); - long checksumPos = - 7 + ((startDataPos / bytesPerChecksum) * checksumSize); + long checksumPos = BlockMetadataHeader.getHeaderSize() + + ((startDataPos / bytesPerChecksum) * checksumSize); while (checksumBuf.hasRemaining()) { int nRead = checksumIn.read(checksumBuf, checksumPos); if (nRead < 0) { @@ -359,7 +371,14 @@ class BlockReaderLocal implements BlockReader { private boolean createNoChecksumContext() { if (verifyChecksum) { - return replica.addNoChecksumAnchor(); + if (storageType != null && storageType.isTransient()) { + // Checksums are not stored for replicas on transient storage. We do not + // anchor, because we do not intend for client activity to block eviction + // from transient storage on the DataNode side. 
+ return true; + } else { + return replica.addNoChecksumAnchor(); + } } else { return true; } @@ -367,7 +386,9 @@ class BlockReaderLocal implements BlockReader { private void releaseNoChecksumContext() { if (verifyChecksum) { - replica.removeNoChecksumAnchor(); + if (storageType == null || !storageType.isTransient()) { + replica.removeNoChecksumAnchor(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index d42b860..f7ff94a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -181,7 +181,8 @@ class BlockReaderLocalLegacy implements BlockReader { UserGroupInformation userGroupInformation, Configuration configuration, String file, ExtendedBlock blk, Token token, DatanodeInfo node, - long startOffset, long length) throws IOException { + long startOffset, long length, StorageType storageType) + throws IOException { LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node .getIpcPort()); // check the cache first @@ -192,7 +193,7 @@ class BlockReaderLocalLegacy implements BlockReader { } pathinfo = getBlockPathInfo(userGroupInformation, blk, node, configuration, conf.socketTimeout, token, - conf.connectToDnViaHostname); + conf.connectToDnViaHostname, storageType); } // check to see if the file exists. It may so happen that the @@ -204,7 +205,8 @@ class BlockReaderLocalLegacy implements BlockReader { FileInputStream dataIn = null; FileInputStream checksumIn = null; BlockReaderLocalLegacy localBlockReader = null; - boolean skipChecksumCheck = conf.skipShortCircuitChecksums; + boolean skipChecksumCheck = conf.skipShortCircuitChecksums || + storageType.isTransient(); try { // get a local file system File blkfile = new File(pathinfo.getBlockPath()); @@ -221,15 +223,8 @@ class BlockReaderLocalLegacy implements BlockReader { File metafile = new File(pathinfo.getMetaPath()); checksumIn = new FileInputStream(metafile); - // read and handle the common header here. 
For now just a version - BlockMetadataHeader header = BlockMetadataHeader - .readHeader(new DataInputStream(checksumIn)); - short version = header.getVersion(); - if (version != BlockMetadataHeader.VERSION) { - LOG.warn("Wrong version (" + version + ") for metadata file for " - + blk + " ignoring ..."); - } - DataChecksum checksum = header.getChecksum(); + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( + new DataInputStream(checksumIn), blk); long firstChunkOffset = startOffset - (startOffset % checksum.getBytesPerChecksum()); localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token, @@ -270,8 +265,8 @@ class BlockReaderLocalLegacy implements BlockReader { private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi, ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, - Token token, boolean connectToDnViaHostname) - throws IOException { + Token token, boolean connectToDnViaHostname, + StorageType storageType) throws IOException { LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort()); BlockLocalPathInfo pathinfo = null; ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node, @@ -279,7 +274,15 @@ class BlockReaderLocalLegacy implements BlockReader { try { // make RPC to local datanode to find local pathnames of blocks pathinfo = proxy.getBlockLocalPathInfo(blk, token); - if (pathinfo != null) { + // We cannot cache the path information for a replica on transient storage. + // If the replica gets evicted, then it moves to a different path. Then, + // our next attempt to read from the cached path would fail to find the + // file. Additionally, the failure would cause us to disable legacy + // short-circuit read for all subsequent use in the ClientContext. Unlike + // the newer short-circuit read implementation, we have no communication + // channel for the DataNode to notify the client that the path has been + // invalidated. Therefore, our only option is to skip caching. + if (pathinfo != null && !storageType.isTransient()) { if (LOG.isDebugEnabled()) { LOG.debug("Cached location of block " + blk + " as " + pathinfo); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 1fb1af1..97ffdde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -97,6 +97,7 @@ import javax.net.SocketFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CipherSuite; @@ -519,8 +520,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return createChecksum(null); } - private DataChecksum createChecksum(ChecksumOpt userOpt) - throws IOException { + private DataChecksum createChecksum(ChecksumOpt userOpt) { // Fill in any missing field with the default. 
ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt( defaultChecksumOpt, userOpt); @@ -528,8 +528,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, myOpt.getChecksumType(), myOpt.getBytesPerChecksum()); if (dataChecksum == null) { - throw new IOException("Invalid checksum type specified: " - + myOpt.getChecksumType().name()); + throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt=" + + userOpt + ", default=" + defaultChecksumOpt + + ", effective=null"); } return dataChecksum; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index e8bcfcc..e83f067 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -570,6 +571,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, DNAddrPair retval = chooseDataNode(targetBlock, null); chosenNode = retval.info; InetSocketAddress targetAddr = retval.addr; + StorageType storageType = retval.storageType; try { ExtendedBlock blk = targetBlock.getBlock(); @@ -578,6 +580,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, setInetSocketAddress(targetAddr). setRemotePeerFactory(dfsClient). setDatanodeInfo(chosenNode). + setStorageType(storageType). setFileName(src). setBlock(blk). setBlockToken(accessToken). @@ -885,12 +888,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, private DNAddrPair chooseDataNode(LocatedBlock block, Collection ignoredNodes) throws IOException { while (true) { - DatanodeInfo[] nodes = block.getLocations(); try { - return getBestNodeDNAddrPair(nodes, ignoredNodes); + return getBestNodeDNAddrPair(block, ignoredNodes); } catch (IOException ie) { - String errMsg = - getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes); + String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), + deadNodes, ignoredNodes); String blockInfo = block.getBlock() + " file=" + src; if (failures >= dfsClient.getMaxBlockAcquireFailures()) { String description = "Could not obtain block: " + blockInfo; @@ -899,7 +901,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, throw new BlockMissingException(src, description, block.getStartOffset()); } - + + DatanodeInfo[] nodes = block.getLocations(); if (nodes == null || nodes.length == 0) { DFSClient.LOG.info("No node available for " + blockInfo); } @@ -933,22 +936,44 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } /** - * Get the best node. - * @param nodes Nodes to choose from. - * @param ignoredNodes Do not chose nodes in this array (may be null) + * Get the best node from which to stream the data. + * @param block LocatedBlock, containing nodes in priority order. + * @param ignoredNodes Do not choose nodes in this array (may be null) * @return The DNAddrPair of the best node. 
* @throws IOException */ - private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes, + private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block, Collection ignoredNodes) throws IOException { - DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes); + DatanodeInfo[] nodes = block.getLocations(); + StorageType[] storageTypes = block.getStorageTypes(); + DatanodeInfo chosenNode = null; + StorageType storageType = null; + if (nodes != null) { + for (int i = 0; i < nodes.length; i++) { + if (!deadNodes.containsKey(nodes[i]) + && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) { + chosenNode = nodes[i]; + // Storage types are ordered to correspond with nodes, so use the same + // index to get storage type. + if (storageTypes != null && i < storageTypes.length) { + storageType = storageTypes[i]; + } + break; + } + } + } + if (chosenNode == null) { + throw new IOException("No live nodes contain block " + block.getBlock() + + " after checking nodes = " + Arrays.toString(nodes) + + ", ignoredNodes = " + ignoredNodes); + } final String dnAddr = chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Connecting to datanode " + dnAddr); } InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr); - return new DNAddrPair(chosenNode, targetAddr); + return new DNAddrPair(chosenNode, targetAddr, storageType); } private static String getBestNodeDNAddrPairErrorString( @@ -1039,6 +1064,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } DatanodeInfo chosenNode = datanode.info; InetSocketAddress targetAddr = datanode.addr; + StorageType storageType = datanode.storageType; BlockReader reader = null; try { @@ -1049,6 +1075,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, setInetSocketAddress(targetAddr). setRemotePeerFactory(dfsClient). setDatanodeInfo(chosenNode). + setStorageType(storageType). setFileName(src). setBlock(block.getBlock()). setBlockToken(blockToken). @@ -1174,7 +1201,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, // If no nodes to do hedged reads against, pass. try { try { - chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored); + chosenNode = getBestNodeDNAddrPair(block, ignored); } catch (IOException ioe) { chosenNode = chooseDataNode(block, ignored); } @@ -1529,31 +1556,17 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, throw new IOException("Mark/reset not supported"); } - /** - * Pick the best node from which to stream the data. - * Entries in nodes are already in the priority order - */ - static DatanodeInfo bestNode(DatanodeInfo nodes[], - AbstractMap deadNodes, - Collection ignoredNodes) throws IOException { - if (nodes != null) { - for (int i = 0; i < nodes.length; i++) { - if (!deadNodes.containsKey(nodes[i]) - && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) { - return nodes[i]; - } - } - } - throw new IOException("No live nodes contain current block"); - } - /** Utility class to encapsulate data node info and its address. 
*/ - static class DNAddrPair { + private static final class DNAddrPair { final DatanodeInfo info; final InetSocketAddress addr; - DNAddrPair(DatanodeInfo info, InetSocketAddress addr) { + final StorageType storageType; + + DNAddrPair(DatanodeInfo info, InetSocketAddress addr, + StorageType storageType) { this.info = info; this.addr = addr; + this.storageType = storageType; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index b6b4846..08e7670 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -42,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Preconditions; + +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CreateFlag; @@ -89,9 +91,9 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; - import org.htrace.Span; import org.htrace.Trace; import org.htrace.TraceScope; @@ -148,7 +150,10 @@ public class DFSOutputStream extends FSOutputSummer private String src; private final long fileId; private final long blockSize; - private final DataChecksum checksum; + /** Only for DataTransferProtocol.writeBlock(..) 
*/ + private final DataChecksum checksum4WriteBlock; + private final int bytesPerChecksum; + // both dataQueue and ackQueue are protected by dataQueue lock private final LinkedList dataQueue = new LinkedList(); private final LinkedList ackQueue = new LinkedList(); @@ -245,6 +250,9 @@ public class DFSOutputStream extends FSOutputSummer } void writeChecksum(byte[] inarray, int off, int len) { + if (len == 0) { + return; + } if (checksumPos + len > dataStart) { throw new BufferOverflowException(); } @@ -378,18 +386,11 @@ public class DFSOutputStream extends FSOutputSummer private final Span traceSpan; /** - * Default construction for file create - */ - private DataStreamer(HdfsFileStatus stat) { - this(stat, null); - } - - /** * construction with tracing info */ private DataStreamer(HdfsFileStatus stat, Span span) { isAppend = false; - isLazyPersistFile = initLazyPersist(stat); + isLazyPersistFile = isLazyPersist(stat); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; traceSpan = span; } @@ -409,7 +410,7 @@ public class DFSOutputStream extends FSOutputSummer block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); accessToken = lastBlock.getBlockToken(); - isLazyPersistFile = initLazyPersist(stat); + isLazyPersistFile = isLazyPersist(stat); long usedInLastBlock = stat.getLen() % blockSize; int freeInLastBlock = (int)(blockSize - usedInLastBlock); @@ -452,13 +453,6 @@ public class DFSOutputStream extends FSOutputSummer } } - - private boolean initLazyPersist(HdfsFileStatus stat) { - final BlockStoragePolicy lpPolicy = blockStoragePolicySuite - .getPolicy(HdfsConstants.MEMORY_STORAGE_POLICY_NAME); - return lpPolicy != null && - stat.getStoragePolicy() == lpPolicy.getId(); - } private void setPipeline(LocatedBlock lb) { setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); @@ -553,7 +547,7 @@ public class DFSOutputStream extends FSOutputSummer } // get packet to be sent. if (dataQueue.isEmpty()) { - one = new Packet(checksum.getChecksumSize()); // heartbeat packet + one = new Packet(getChecksumSize()); // heartbeat packet } else { one = dataQueue.getFirst(); // regular data packet } @@ -1408,8 +1402,8 @@ public class DFSOutputStream extends FSOutputSummer // send the request new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken, dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, - nodes.length, block.getNumBytes(), bytesSent, newGS, checksum, - cachingStrategy.get(), isLazyPersistFile); + nodes.length, block.getNumBytes(), bytesSent, newGS, + checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile); // receive ack for connect BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( @@ -1618,9 +1612,23 @@ public class DFSOutputStream extends FSOutputSummer return value; } + /** + * @return the object for computing checksum. + * The type is NULL if checksum is not computed. 
+ */ + private static DataChecksum getChecksum4Compute(DataChecksum checksum, + HdfsFileStatus stat) { + if (isLazyPersist(stat) && stat.getReplication() == 1) { + // do not compute checksum for writing to single replica to memory + return DataChecksum.newDataChecksum(Type.NULL, + checksum.getBytesPerChecksum()); + } + return checksum; + } + private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, HdfsFileStatus stat, DataChecksum checksum) throws IOException { - super(checksum); + super(getChecksum4Compute(checksum, stat)); this.dfsClient = dfsClient; this.src = src; this.fileId = stat.getFileId(); @@ -1635,15 +1643,18 @@ public class DFSOutputStream extends FSOutputSummer "Set non-null progress callback on DFSOutputStream " + src); } - final int bytesPerChecksum = checksum.getBytesPerChecksum(); - if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) { - throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum + - ") and blockSize(" + blockSize + - ") do not match. " + "blockSize should be a " + - "multiple of io.bytes.per.checksum"); - - } - this.checksum = checksum; + this.bytesPerChecksum = checksum.getBytesPerChecksum(); + if (bytesPerChecksum <= 0) { + throw new HadoopIllegalArgumentException( + "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0"); + } + if (blockSize % bytesPerChecksum != 0) { + throw new HadoopIllegalArgumentException("Invalid values: " + + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum + + ") must divide block size (=" + blockSize + ")."); + } + this.checksum4WriteBlock = checksum; + this.dfsclientSlowLogThresholdMs = dfsClient.getConf().dfsclientSlowIoWarningThresholdMs; } @@ -1655,8 +1666,7 @@ public class DFSOutputStream extends FSOutputSummer this(dfsClient, src, progress, stat, checksum); this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); - computePacketChunkSize(dfsClient.getConf().writePacketSize, - checksum.getBytesPerChecksum()); + computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); Span traceSpan = null; if (Trace.isTracing()) { @@ -1734,11 +1744,9 @@ public class DFSOutputStream extends FSOutputSummer if (lastBlock != null) { // indicate that we are appending to an existing block bytesCurBlock = lastBlock.getBlockSize(); - streamer = new DataStreamer(lastBlock, stat, - checksum.getBytesPerChecksum(), traceSpan); + streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan); } else { - computePacketChunkSize(dfsClient.getConf().writePacketSize, - checksum.getBytesPerChecksum()); + computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); streamer = new DataStreamer(stat, traceSpan); } this.fileEncryptionInfo = stat.getFileEncryptionInfo(); @@ -1752,9 +1760,15 @@ public class DFSOutputStream extends FSOutputSummer out.start(); return out; } + + private static boolean isLazyPersist(HdfsFileStatus stat) { + final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy( + HdfsConstants.MEMORY_STORAGE_POLICY_NAME); + return p != null && stat.getStoragePolicy() == p.getId(); + } private void computePacketChunkSize(int psize, int csize) { - int chunkSize = csize + checksum.getChecksumSize(); + final int chunkSize = csize + getChecksumSize(); chunksPerPacket = Math.max(psize/chunkSize, 1); packetSize = chunkSize*chunksPerPacket; if (DFSClient.LOG.isDebugEnabled()) { @@ -1811,21 +1825,19 @@ public class DFSOutputStream extends FSOutputSummer dfsClient.checkOpen(); checkClosed(); - int bytesPerChecksum = 
this.checksum.getBytesPerChecksum(); if (len > bytesPerChecksum) { throw new IOException("writeChunk() buffer size is " + len + " is larger than supported bytesPerChecksum " + bytesPerChecksum); } - if (cklen != this.checksum.getChecksumSize()) { + if (cklen != 0 && cklen != getChecksumSize()) { throw new IOException("writeChunk() checksum size is supposed to be " + - this.checksum.getChecksumSize() + - " but found to be " + cklen); + getChecksumSize() + " but found to be " + cklen); } if (currentPacket == null) { currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); + bytesCurBlock, currentSeqno++, getChecksumSize()); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.seqno + @@ -1873,7 +1885,7 @@ public class DFSOutputStream extends FSOutputSummer // if (bytesCurBlock == blockSize) { currentPacket = new Packet(0, 0, bytesCurBlock, - currentSeqno++, this.checksum.getChecksumSize()); + currentSeqno++, getChecksumSize()); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; waitAndQueueCurrentPacket(); @@ -1961,7 +1973,7 @@ public class DFSOutputStream extends FSOutputSummer // but sync was requested. // Send an empty packet currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); + bytesCurBlock, currentSeqno++, getChecksumSize()); } } else { if (isSync && bytesCurBlock > 0) { @@ -1970,7 +1982,7 @@ public class DFSOutputStream extends FSOutputSummer // and sync was requested. // So send an empty sync packet. currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); + bytesCurBlock, currentSeqno++, getChecksumSize()); } else { // just discard the current packet since it is already been sent. 
currentPacket = null; @@ -2174,8 +2186,7 @@ public class DFSOutputStream extends FSOutputSummer if (bytesCurBlock != 0) { // send an empty packet to mark the end of the block - currentPacket = new Packet(0, 0, bytesCurBlock, - currentSeqno++, this.checksum.getChecksumSize()); + currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize()); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; } @@ -2239,8 +2250,7 @@ public class DFSOutputStream extends FSOutputSummer @VisibleForTesting public synchronized void setChunksPerPacket(int value) { chunksPerPacket = Math.min(chunksPerPacket, value); - packetSize = (checksum.getBytesPerChecksum() + - checksum.getChecksumSize()) * chunksPerPacket; + packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket; } synchronized void setTestFilename(String newname) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java index 16bcc0b..30368f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.protocol; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -185,7 +186,11 @@ public class LocatedBlock { + "; getBlockSize()=" + getBlockSize() + "; corrupt=" + corrupt + "; offset=" + offset - + "; locs=" + java.util.Arrays.asList(locs) + + "; locs=" + Arrays.asList(locs) + + "; storageIDs=" + + (storageIDs != null ? Arrays.asList(storageIDs) : null) + + "; storageTypes=" + + (storageTypes != null ? 
Arrays.asList(storageTypes) : null) + "}"; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java index b86cad4..51a6134 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java @@ -29,10 +29,13 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.DataChecksum; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DataChecksum; import com.google.common.annotations.VisibleForTesting; @@ -46,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private @InterfaceStability.Evolving public class BlockMetadataHeader { + private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class); public static final short VERSION = 1; @@ -74,6 +78,37 @@ public class BlockMetadataHeader { } /** + * Read the checksum header from the meta file. + * @return the data checksum obtained from the header. + */ + public static DataChecksum readDataChecksum(File metaFile) throws IOException { + DataInputStream in = null; + try { + in = new DataInputStream(new BufferedInputStream( + new FileInputStream(metaFile), HdfsConstants.IO_FILE_BUFFER_SIZE)); + return readDataChecksum(in, metaFile); + } finally { + IOUtils.closeStream(in); + } + } + + /** + * Read the checksum header from the meta input stream. + * @return the data checksum obtained from the header. + */ + public static DataChecksum readDataChecksum(final DataInputStream metaIn, + final Object name) throws IOException { + // read and handle the common header here. For now just a version + final BlockMetadataHeader header = readHeader(metaIn); + if (header.getVersion() != VERSION) { + LOG.warn("Unexpected meta-file version for " + name + + ": version in file is " + header.getVersion() + + " but expected version is " + VERSION); + } + return header.getChecksum(); + } + + /** * Read the header without changing the position of the FileChannel. * * @param fc The FileChannel to read. @@ -82,7 +117,7 @@ public class BlockMetadataHeader { */ public static BlockMetadataHeader preadHeader(FileChannel fc) throws IOException { - byte arr[] = new byte[2 + DataChecksum.HEADER_LEN]; + final byte arr[] = new byte[getHeaderSize()]; ByteBuffer buf = ByteBuffer.wrap(arr); while (buf.hasRemaining()) { @@ -158,7 +193,7 @@ public class BlockMetadataHeader { * Writes all the fields till the beginning of checksum. 
* @throws IOException on error */ - static void writeHeader(DataOutputStream out, DataChecksum checksum) + public static void writeHeader(DataOutputStream out, DataChecksum checksum) throws IOException { writeHeader(out, new BlockMetadataHeader(VERSION, checksum)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 3d497f5..2e388f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -82,12 +82,12 @@ class BlockReceiver implements Closeable { * checksum polynomial than the block is stored with on disk, * the DataNode needs to recalculate checksums before writing. */ - private boolean needsChecksumTranslation; + private final boolean needsChecksumTranslation; private OutputStream out = null; // to block file at local disk private FileDescriptor outFd; private DataOutputStream checksumOut = null; // to crc file at local disk - private int bytesPerChecksum; - private int checksumSize; + private final int bytesPerChecksum; + private final int checksumSize; private final PacketReceiver packetReceiver = new PacketReceiver(false); @@ -99,7 +99,6 @@ class BlockReceiver implements Closeable { private DataTransferThrottler throttler; private ReplicaOutputStreams streams; private DatanodeInfo srcDataNode = null; - private Checksum partialCrc = null; private final DataNode datanode; volatile private boolean mirrorError; @@ -490,7 +489,7 @@ class BlockReceiver implements Closeable { long offsetInBlock = header.getOffsetInBlock(); long seqno = header.getSeqno(); boolean lastPacketInBlock = header.isLastPacketInBlock(); - int len = header.getDataLen(); + final int len = header.getDataLen(); boolean syncBlock = header.getSyncBlock(); // avoid double sync'ing on close @@ -499,7 +498,7 @@ class BlockReceiver implements Closeable { } // update received bytes - long firstByteInBlock = offsetInBlock; + final long firstByteInBlock = offsetInBlock; offsetInBlock += len; if (replicaInfo.getNumBytes() < offsetInBlock) { replicaInfo.setNumBytes(offsetInBlock); @@ -539,16 +538,15 @@ class BlockReceiver implements Closeable { flushOrSync(true); } } else { - int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)* - checksumSize; + final int checksumLen = diskChecksum.getChecksumSize(len); + final int checksumReceivedLen = checksumBuf.capacity(); - if ( checksumBuf.capacity() != checksumLen) { - throw new IOException("Length of checksums in packet " + - checksumBuf.capacity() + " does not match calculated checksum " + - "length " + checksumLen); + if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) { + throw new IOException("Invalid checksum length: received length is " + + checksumReceivedLen + " but expected length is " + checksumLen); } - if (shouldVerifyChecksum()) { + if (checksumReceivedLen > 0 && shouldVerifyChecksum()) { try { verifyChunks(dataBuf, checksumBuf); } catch (IOException ioe) { @@ -572,11 +570,17 @@ class BlockReceiver implements Closeable { translateChunks(dataBuf, checksumBuf); } } + + if 
(checksumReceivedLen == 0 && !streams.isTransientStorage()) { + // checksum is missing, need to calculate it + checksumBuf = ByteBuffer.allocate(checksumLen); + diskChecksum.calculateChunkedSums(dataBuf, checksumBuf); + } // by this point, the data in the buffer uses the disk checksum - byte[] lastChunkChecksum; - + final boolean shouldNotWriteChecksum = checksumReceivedLen == 0 + && streams.isTransientStorage(); try { long onDiskLen = replicaInfo.getBytesOnDisk(); if (onDiskLen bytesPerChecksum) { - throw new IOException("Got wrong length during writeBlock(" + - block + ") from " + inAddr + " " + - "A packet can have only one partial chunk."+ - " len = " + len + - " bytesPerChecksum " + bytesPerChecksum); + throw new IOException("Unexpected packet data length for " + + block + " from " + inAddr + ": a partial chunk must be " + + " sent in an individual packet (data length = " + len + + " > bytesPerChecksum = " + bytesPerChecksum + ")"); } partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk); byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize); - lastChunkChecksum = Arrays.copyOfRange( - buf, buf.length - checksumSize, buf.length - ); + lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length); checksumOut.write(buf); if(LOG.isDebugEnabled()) { LOG.debug("Writing out partial crc for data len " + len); } partialCrc = null; } else { - lastChunkChecksum = Arrays.copyOfRange( - checksumBuf.array(), - checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize, - checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen); - checksumOut.write(checksumBuf.array(), - checksumBuf.arrayOffset() + checksumBuf.position(), - checksumLen); + // write checksum + final int offset = checksumBuf.arrayOffset() + + checksumBuf.position(); + final int end = offset + checksumLen; + lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize, + end); + checksumOut.write(checksumBuf.array(), offset, checksumLen); } + /// flush entire packet, sync if requested flushOrSync(syncBlock); - replicaInfo.setLastChecksumAndDataLen( - offsetInBlock, lastChunkChecksum - ); + replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc); datanode.metrics.incrBytesWritten(len); @@ -686,6 +691,10 @@ class BlockReceiver implements Closeable { return lastPacketInBlock?-1:len; } + private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) { + return Arrays.copyOfRange(array, end - size, end); + } + private void manageWriterOsCache(long offsetInBlock) { try { if (outFd != null && @@ -921,18 +930,19 @@ class BlockReceiver implements Closeable { * reads in the partial crc chunk and computes checksum * of pre-existing data in partial chunk. */ - private void computePartialChunkCrc(long blkoff, long ckoff, - int bytesPerChecksum) throws IOException { + private Checksum computePartialChunkCrc(long blkoff, long ckoff) + throws IOException { // find offset of the beginning of partial chunk. 
// int sizePartialChunk = (int) (blkoff % bytesPerChecksum); - int checksumSize = diskChecksum.getChecksumSize(); blkoff = blkoff - sizePartialChunk; - LOG.info("computePartialChunkCrc sizePartialChunk " + - sizePartialChunk + " " + block + - " block offset " + blkoff + - " metafile offset " + ckoff); + if (LOG.isDebugEnabled()) { + LOG.debug("computePartialChunkCrc for " + block + + ": sizePartialChunk=" + sizePartialChunk + + ", block offset=" + blkoff + + ", metafile offset=" + ckoff); + } // create an input stream from the block file // and read in partial crc chunk into temporary buffer @@ -951,10 +961,12 @@ class BlockReceiver implements Closeable { } // compute crc of partial chunk from data read in the block file. - partialCrc = DataChecksum.newDataChecksum( + final Checksum partialCrc = DataChecksum.newDataChecksum( diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum()); partialCrc.update(buf, 0, sizePartialChunk); - LOG.info("Read in partial CRC chunk from disk for " + block); + if (LOG.isDebugEnabled()) { + LOG.debug("Read in partial CRC chunk from disk for " + block); + } // paranoia! verify that the pre-computed crc matches what we // recalculated just now @@ -965,6 +977,7 @@ class BlockReceiver implements Closeable { checksum2long(crcbuf); throw new IOException(msg); } + return partialCrc; } private static enum PacketResponderType { http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 0082fcd..ce0e1d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; @@ -265,26 +266,37 @@ class BlockSender implements java.io.Closeable { */ DataChecksum csum = null; if (verifyChecksum || sendChecksum) { - final InputStream metaIn = datanode.data.getMetaDataInputStream(block); - if (!corruptChecksumOk || metaIn != null) { - if (metaIn == null) { - //need checksum but meta-data not found - throw new FileNotFoundException("Meta-data not found for " + block); - } - - checksumIn = new DataInputStream( - new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); + LengthInputStream metaIn = null; + boolean keepMetaInOpen = false; + try { + metaIn = datanode.data.getMetaDataInputStream(block); + if (!corruptChecksumOk || metaIn != null) { + if (metaIn == null) { + //need checksum but meta-data not found + throw new FileNotFoundException("Meta-data not found for " + + block); + } + + // The meta file will contain only the header if the NULL checksum + // type was used, or if the replica was written to transient storage. 
+ // Checksum verification is not performed for replicas on transient + // storage. The header is important for determining the checksum + // type later when lazy persistence copies the block to non-transient + // storage and computes the checksum. + if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) { + checksumIn = new DataInputStream(new BufferedInputStream( + metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); - // read and handle the common header here. For now just a version - BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); - short version = header.getVersion(); - if (version != BlockMetadataHeader.VERSION) { - LOG.warn("Wrong version (" + version + ") for metadata file for " - + block + " ignoring ..."); + csum = BlockMetadataHeader.readDataChecksum(checksumIn, block); + keepMetaInOpen = true; + } + } else { + LOG.warn("Could not find metadata file for " + block); + } + } finally { + if (!keepMetaInOpen) { + IOUtils.closeStream(metaIn); } - csum = header.getChecksum(); - } else { - LOG.warn("Could not find metadata file for " + block); } } if (csum == null) { @@ -343,7 +355,7 @@ class BlockSender implements java.io.Closeable { endOffset = end; // seek to the right offsets - if (offset > 0) { + if (offset > 0 && checksumIn != null) { long checksumSkip = (offset / chunkSize) * checksumSize; // note blockInStream is seeked when created below if (checksumSkip > 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index 45862ca..6a26640 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -213,7 +213,7 @@ public class ReplicaInPipeline extends ReplicaInfo // the checksum that should actually be used -- this // may differ from requestedChecksum for appends. 
- DataChecksum checksum; + final DataChecksum checksum; RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); @@ -250,7 +250,7 @@ public class ReplicaInPipeline extends ReplicaInfo } } } else { - // for create, we can use the requested checksum + // for create, we can use the requested checksum checksum = requestedChecksum; } @@ -264,7 +264,8 @@ public class ReplicaInPipeline extends ReplicaInfo blockOut.getChannel().position(blockDiskSize); crcOut.getChannel().position(crcDiskSize); } - return new ReplicaOutputStreams(blockOut, crcOut, checksum); + return new ReplicaOutputStreams(blockOut, crcOut, checksum, + getVolume().isTransientStorage()); } catch (IOException e) { IOUtils.closeStream(blockOut); IOUtils.closeStream(metaRAF); http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java index 95044c8..bd1461a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -32,16 +32,18 @@ public class ReplicaOutputStreams implements Closeable { private final OutputStream dataOut; private final OutputStream checksumOut; private final DataChecksum checksum; + private final boolean isTransientStorage; /** * Create an object with a data output stream, a checksum output stream * and a checksum. */ public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut, - DataChecksum checksum) { + DataChecksum checksum, boolean isTransientStorage) { this.dataOut = dataOut; this.checksumOut = checksumOut; this.checksum = checksum; + this.isTransientStorage = isTransientStorage; } /** @return the data output stream. */ @@ -59,6 +61,11 @@ public class ReplicaOutputStreams implements Closeable { return checksum; } + /** @return is writing to a transient storage? */ + public boolean isTransientStorage() { + return isTransientStorage; + } + @Override public void close() { IOUtils.closeStream(dataOut); http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 384a80a..5c8709c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -593,13 +593,8 @@ class BlockPoolSlice { HdfsConstants.IO_FILE_BUFFER_SIZE)); // read and handle the common header here. 
For now just a version - BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); - short version = header.getVersion(); - if (version != BlockMetadataHeader.VERSION) { - FsDatasetImpl.LOG.warn("Wrong version (" + version + ") for metadata file " - + metaFile + " ignoring ..."); - } - DataChecksum checksum = header.getChecksum(); + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( + checksumIn, metaFile); int bytesPerChecksum = checksum.getBytesPerChecksum(); int checksumSize = checksum.getChecksumSize(); long numChunks = Math.min( http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 350788c..070395a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileDescriptor; import java.io.FileInputStream; @@ -58,6 +60,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -91,6 +94,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.metrics2.util.MBeans; @@ -633,7 +637,7 @@ class FsDatasetImpl implements FsDatasetSpi { * Get the meta info of a block stored in volumeMap. To find a block, * block pool Id, block Id and generation stamp must match. 
* @param b extended block - * @return the meta replica information; null if block was not found + * @return the meta replica information * @throws ReplicaNotFoundException if no entry is in the map or * there is a generation stamp mismatch */ @@ -721,23 +725,80 @@ class FsDatasetImpl implements FsDatasetSpi { final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); final File dstFile = new File(destDir, srcFile.getName()); final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); - try { - Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true); - } catch (IOException e) { - throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e); - } + computeChecksum(srcMeta, dstMeta, srcFile); + try { Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true); } catch (IOException e) { throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e); } if (LOG.isDebugEnabled()) { - LOG.debug("Copied " + srcMeta + " to " + dstMeta); + LOG.debug("Copied " + srcMeta + " to " + dstMeta + + " and calculated checksum"); LOG.debug("Copied " + srcFile + " to " + dstFile); } return new File[] {dstMeta, dstFile}; } + /** + * Compute and store the checksum for a block file that does not already have + * its checksum computed. + * + * @param srcMeta source meta file, containing only the checksum header, not a + * calculated checksum + * @param dstMeta destination meta file, into which this method will write a + * full computed checksum + * @param blockFile block file for which the checksum will be computed + * @throws IOException + */ + private static void computeChecksum(File srcMeta, File dstMeta, File blockFile) + throws IOException { + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta); + final byte[] data = new byte[1 << 16]; + final byte[] crcs = new byte[checksum.getChecksumSize(data.length)]; + + DataOutputStream metaOut = null; + InputStream dataIn = null; + try { + File parentFile = dstMeta.getParentFile(); + if (parentFile != null) { + if (!parentFile.mkdirs() && !parentFile.isDirectory()) { + throw new IOException("Destination '" + parentFile + + "' directory cannot be created"); + } + } + metaOut = new DataOutputStream(new BufferedOutputStream( + new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE)); + BlockMetadataHeader.writeHeader(metaOut, checksum); + + dataIn = isNativeIOAvailable ? 
+ NativeIO.getShareDeleteFileInputStream(blockFile) : + new FileInputStream(blockFile); + + int offset = 0; + for(int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) { + if (n > 0) { + n += offset; + offset = n % checksum.getBytesPerChecksum(); + final int length = n - offset; + + if (length > 0) { + checksum.calculateChunkedSums(data, 0, length, crcs, 0); + metaOut.write(crcs, 0, checksum.getChecksumSize(length)); + + System.arraycopy(data, length, data, 0, offset); + } + } + } + + // calculate and write the last crc + checksum.calculateChunkedSums(data, 0, offset, crcs, 0); + metaOut.write(crcs, 0, 4); + } finally { + IOUtils.cleanup(LOG, dataIn, metaOut); + } + } + static private void truncateBlock(File blockFile, File metaFile, long oldlen, long newlen) throws IOException { LOG.info("truncateBlock: blockFile=" + blockFile @@ -1640,6 +1701,7 @@ class FsDatasetImpl implements FsDatasetSpi { } } + @Override public boolean isCached(String bpid, long blockId) { return cacheManager.isCached(bpid, blockId); } @@ -2555,8 +2617,14 @@ class FsDatasetImpl implements FsDatasetSpi { // Before deleting the files from transient storage we must notify the // NN that the files are on the new storage. Else a blockReport from // the transient storage might cause the NN to think the blocks are lost. + // Replicas must be evicted from client short-circuit caches, because the + // storage will no longer be transient, and thus will require validating + // checksum. This also stops a client from holding file descriptors, + // which would prevent the OS from reclaiming the memory. ExtendedBlock extendedBlock = new ExtendedBlock(bpid, newReplicaInfo); + datanode.getShortCircuitRegistry().processBlockInvalidation( + ExtendedBlockId.fromExtendedBlock(extendedBlock)); datanode.notifyNamenodeReceivedBlock( extendedBlock, null, newReplicaInfo.getStorageUuid()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java index 76acbea..5fdcc2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java @@ -241,7 +241,7 @@ class RamDiskAsyncLazyPersistService { } catch (Exception e){ FsDatasetImpl.LOG.warn( "LazyWriter failed to async persist RamDisk block pool id: " - + bpId + "block Id: " + blockId); + + bpId + "block Id: " + blockId, e); } finally { if (!succeeded) { datanode.getFSDataset().onFailLazyPersist(bpId, blockId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java index a843d9a..c01a6cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java @@ -168,9 +168,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker { @Override synchronized RamDiskReplicaLru getNextCandidateForEviction() { - Iterator it = replicasPersisted.values().iterator(); + final Iterator it = replicasPersisted.values().iterator(); while (it.hasNext()) { - RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next(); + final RamDiskReplicaLru ramDiskReplicaLru = it.next(); it.remove(); Map replicaMap = http://git-wip-us.apache.org/repos/asf/hadoop/blob/463aec11/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 0786bc6..83b476f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -248,7 +248,8 @@ public class SimulatedFSDataset implements FsDatasetSpi { + theBlock); } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); - return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum); + return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, + volume.isTransientStorage()); } }
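----------------------------------------------------------------------

A note on the length check in BlockReceiver above: the new
DataChecksum.getChecksumSize(int) helper encapsulates the ceiling-division
arithmetic that the removed line spelled out inline
("((len + bytesPerChecksum - 1)/bytesPerChecksum)* checksumSize"). A minimal
standalone sketch of that arithmetic, with illustrative values of 512 bytes
per checksum and a 4-byte CRC (the method name below is a stand-in, not a
Hadoop API):

    // Illustrative helper, not part of the commit: bytes of checksum needed
    // to cover dataLen bytes of data, i.e. ceil(dataLen / bytesPerChecksum)
    // multiplied by the per-chunk checksum size.
    static int checksumBytesFor(int dataLen, int bytesPerChecksum, int checksumSize) {
      int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum;
      return numChunks * checksumSize;
    }

    // checksumBytesFor(1024, 512, 4) == 8
    // checksumBytesFor(1025, 512, 4) == 12

With that in place, a packet whose checksum field is empty
(checksumReceivedLen == 0) is no longer rejected: if the replica is being
written to transient storage the checksum write is skipped and deferred to
lazy persistence, otherwise the receiver recalculates the checksums itself
before writing them to the meta file.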
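----------------------------------------------------------------------

The deferred work itself happens in the new FsDatasetImpl.computeChecksum
shown above: when a replica's block file is copied off transient storage, the
file is streamed through in 64 KB buffers, per-chunk checksums are calculated
with DataChecksum.calculateChunkedSums, and a complete meta file (header
followed by checksums) is written at the destination. The following is a
simplified, self-contained sketch of the same one-pass pattern using only JDK
classes; BYTES_PER_CHECKSUM, the use of java.util.zip.CRC32, and the bare
4-byte output format are illustrative stand-ins for Hadoop's DataChecksum and
BlockMetadataHeader, not the committed implementation:

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.CRC32;

    // Simplified sketch (not the committed code): one pass over a block file,
    // emitting one 4-byte CRC32 per fixed-size chunk.
    public class LazyChecksumSketch {
      private static final int BYTES_PER_CHECKSUM = 512;  // hypothetical chunk size

      static void computeChecksum(File blockFile, File metaFile) throws IOException {
        try (InputStream in = new FileInputStream(blockFile);
             DataOutputStream out = new DataOutputStream(
                 new BufferedOutputStream(new FileOutputStream(metaFile)))) {
          byte[] chunk = new byte[BYTES_PER_CHECKSUM];
          int n;
          while ((n = readChunk(in, chunk)) > 0) {
            CRC32 crc = new CRC32();
            crc.update(chunk, 0, n);             // the final chunk may be short
            out.writeInt((int) crc.getValue());  // 4 bytes per chunk
          }
        }
      }

      // Fills buf as far as possible; returns the bytes read, 0 at end of stream.
      private static int readChunk(InputStream in, byte[] buf) throws IOException {
        int off = 0;
        while (off < buf.length) {
          int read = in.read(buf, off, buf.length - off);
          if (read < 0) {
            break;
          }
          off += read;
        }
        return off;
      }
    }

Doing this pass at lazy-persist time rather than on the write path is the core
of HDFS-6934, and it is why FsDatasetImpl now also invalidates the replica in
the short-circuit registry: once the block lives on non-transient storage,
client reads must validate the freshly computed checksums rather than keep
reusing cached file descriptors for the RAM-disk copy.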