Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4666D17E90 for ; Wed, 2 Sep 2015 05:58:37 +0000 (UTC) Received: (qmail 51468 invoked by uid 500); 2 Sep 2015 05:58:16 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 51221 invoked by uid 500); 2 Sep 2015 05:58:16 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 47683 invoked by uid 99); 2 Sep 2015 05:58:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Sep 2015 05:58:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C8774E33C9; Wed, 2 Sep 2015 05:58:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhz@apache.org To: common-commits@hadoop.apache.org Date: Wed, 02 Sep 2015 05:59:02 -0000 Message-Id: In-Reply-To: <6565787dd4d443c7bb93c4c198c51fd2@git.apache.org> References: <6565787dd4d443c7bb93c4c198c51fd2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [50/50] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285 Merge remote-tracking branch 'apache/trunk' into HDFS-7285 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/53358fe6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/53358fe6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/53358fe6 Branch: refs/heads/HDFS-7285 Commit: 53358fe680a11c1b66a7f60733d11c1f4efe0232 Parents: ab56fcd 2e251a7 Author: Zhe Zhang Authored: Tue Sep 1 00:29:55 2015 -0700 Committer: Zhe Zhang Committed: Tue Sep 1 14:48:37 2015 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 9 + hadoop-common-project/hadoop-common/pom.xml | 19 +- .../fs/CommonConfigurationKeysPublic.java | 7 + .../src/main/resources/core-default.xml | 14 +- .../src/site/markdown/FileSystemShell.md | 13 +- .../java/org/apache/hadoop/fs/test-untar.tar | Bin 20480 -> 0 bytes .../java/org/apache/hadoop/fs/test-untar.tgz | Bin 2024 -> 0 bytes .../fs/viewfs/ViewFileSystemBaseTest.java | 2 +- .../apache/hadoop/fs/viewfs/ViewFsBaseTest.java | 2 +- .../src/test/resources/test-untar.tar | Bin 0 -> 20480 bytes .../src/test/resources/test-untar.tgz | Bin 0 -> 2024 bytes hadoop-hdfs-project/hadoop-hdfs-client/pom.xml | 5 + .../org/apache/hadoop/hdfs/BlockReader.java | 110 +++ .../apache/hadoop/hdfs/BlockReaderLocal.java | 748 +++++++++++++++++++ .../hadoop/hdfs/BlockReaderLocalLegacy.java | 743 ++++++++++++++++++ .../org/apache/hadoop/hdfs/BlockReaderUtil.java | 57 ++ .../org/apache/hadoop/hdfs/ClientContext.java | 196 +++++ .../org/apache/hadoop/hdfs/DFSUtilClient.java | 68 ++ .../apache/hadoop/hdfs/ExternalBlockReader.java | 126 ++++ .../apache/hadoop/hdfs/KeyProviderCache.java | 112 +++ .../java/org/apache/hadoop/hdfs/PeerCache.java | 291 ++++++++ .../apache/hadoop/hdfs/RemoteBlockReader.java | 517 +++++++++++++ .../apache/hadoop/hdfs/RemoteBlockReader2.java | 485 ++++++++++++ 
.../hadoop/hdfs/client/BlockReportOptions.java | 59 ++ .../hdfs/client/HdfsClientConfigKeys.java | 13 + .../hdfs/protocol/BlockLocalPathInfo.java | 70 ++ .../hdfs/protocol/ClientDatanodeProtocol.java | 152 ++++ .../InvalidEncryptionKeyException.java | 40 + .../protocol/datatransfer/PacketHeader.java | 214 ++++++ .../protocol/datatransfer/PacketReceiver.java | 310 ++++++++ .../protocolPB/ClientDatanodeProtocolPB.java | 37 + .../ClientDatanodeProtocolTranslatorPB.java | 326 ++++++++ .../hadoop/hdfs/protocolPB/PBHelperClient.java | 13 + .../token/block/BlockTokenSelector.java | 48 ++ .../hdfs/util/ByteBufferOutputStream.java | 49 ++ .../hadoop/hdfs/web/URLConnectionFactory.java | 30 +- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 15 +- .../hdfs/web/oauth2/AccessTokenProvider.java | 66 ++ .../hdfs/web/oauth2/AccessTokenTimer.java | 103 +++ .../ConfCredentialBasedAccessTokenProvider.java | 62 ++ ...onfRefreshTokenBasedAccessTokenProvider.java | 146 ++++ .../CredentialBasedAccessTokenProvider.java | 135 ++++ .../oauth2/OAuth2ConnectionConfigurator.java | 79 ++ .../hadoop/hdfs/web/oauth2/OAuth2Constants.java | 46 ++ .../apache/hadoop/hdfs/web/oauth2/Utils.java | 63 ++ .../hadoop/hdfs/web/oauth2/package-info.java | 26 + hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 22 + hadoop-hdfs-project/hadoop-hdfs/pom.xml | 6 + .../bkjournal/BookKeeperEditLogInputStream.java | 2 +- .../org/apache/hadoop/hdfs/BlockReader.java | 110 --- .../apache/hadoop/hdfs/BlockReaderLocal.java | 746 ------------------ .../hadoop/hdfs/BlockReaderLocalLegacy.java | 740 ------------------ .../org/apache/hadoop/hdfs/BlockReaderUtil.java | 57 -- .../org/apache/hadoop/hdfs/ClientContext.java | 195 ----- .../java/org/apache/hadoop/hdfs/DFSClient.java | 1 - .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 +- .../org/apache/hadoop/hdfs/DFSInputStream.java | 2 +- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 68 +- .../apache/hadoop/hdfs/ExternalBlockReader.java | 126 ---- .../apache/hadoop/hdfs/KeyProviderCache.java | 111 --- .../java/org/apache/hadoop/hdfs/PeerCache.java | 290 ------- .../apache/hadoop/hdfs/RemoteBlockReader.java | 513 ------------- .../apache/hadoop/hdfs/RemoteBlockReader2.java | 482 ------------ .../hadoop/hdfs/client/BlockReportOptions.java | 59 -- .../hdfs/protocol/BlockLocalPathInfo.java | 70 -- .../hdfs/protocol/ClientDatanodeProtocol.java | 152 ---- .../hadoop/hdfs/protocol/LayoutVersion.java | 2 +- .../InvalidEncryptionKeyException.java | 40 - .../protocol/datatransfer/PacketHeader.java | 214 ------ .../protocol/datatransfer/PacketReceiver.java | 310 -------- .../hdfs/protocol/datatransfer/Receiver.java | 15 +- .../protocolPB/ClientDatanodeProtocolPB.java | 37 - ...tDatanodeProtocolServerSideTranslatorPB.java | 6 +- .../ClientDatanodeProtocolTranslatorPB.java | 326 -------- ...tNamenodeProtocolServerSideTranslatorPB.java | 14 +- .../DatanodeProtocolServerSideTranslatorPB.java | 2 +- ...rDatanodeProtocolServerSideTranslatorPB.java | 2 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 33 +- .../token/block/BlockTokenSelector.java | 48 -- .../hdfs/server/blockmanagement/BlockInfo.java | 19 +- .../blockmanagement/BlockInfoContiguous.java | 15 - .../blockmanagement/BlockInfoStriped.java | 19 - .../server/blockmanagement/BlockManager.java | 552 +++++--------- .../BlockPlacementPolicyDefault.java | 147 +--- .../blockmanagement/BlockRecoveryWork.java | 111 +++ .../blockmanagement/BlockToMarkCorrupt.java | 82 ++ .../hdfs/server/blockmanagement/BlocksMap.java | 16 - .../blockmanagement/DatanodeDescriptor.java | 
35 +- .../server/blockmanagement/DatanodeManager.java | 9 +- .../blockmanagement/ErasureCodingWork.java | 60 ++ .../server/blockmanagement/HostFileManager.java | 19 + .../server/blockmanagement/ReplicationWork.java | 53 ++ .../hadoop/hdfs/server/datanode/DNConf.java | 4 +- .../namenode/EditLogBackupInputStream.java | 2 +- .../server/namenode/EditLogFileInputStream.java | 2 +- .../hdfs/server/namenode/FSDirDeleteOp.java | 40 + .../hdfs/server/namenode/FSDirectory.java | 63 ++ .../hdfs/server/namenode/FSEditLogLoader.java | 4 +- .../hdfs/server/namenode/FSEditLogOp.java | 354 ++++++--- .../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 3 +- .../hdfs/util/ByteBufferOutputStream.java | 49 -- .../hadoop-hdfs/src/site/markdown/WebHDFS.md | 25 + .../hadoop/hdfs/TestBlockReaderLocal.java | 30 +- .../hadoop/hdfs/TestBlockReaderLocalLegacy.java | 2 +- .../hdfs/TestClientBlockVerification.java | 4 +- .../hadoop/hdfs/TestDFSClientRetries.java | 2 +- .../apache/hadoop/hdfs/TestDecommission.java | 15 +- .../hadoop/hdfs/protocolPB/TestPBHelper.java | 4 +- .../security/token/block/TestBlockToken.java | 10 +- .../blockmanagement/TestBlockInfoStriped.java | 30 - .../blockmanagement/TestDatanodeManager.java | 103 ++- .../blockmanagement/TestHostFileManager.java | 7 +- .../blockmanagement/TestReplicationPolicy.java | 26 +- .../hdfs/server/namenode/TestEditLog.java | 2 +- .../namenode/TestEditLogFileInputStream.java | 80 ++ .../namenode/TestProtectedDirectories.java | 373 +++++++++ .../shortcircuit/TestShortCircuitLocalRead.java | 4 +- .../hadoop/hdfs/web/TestWebHDFSOAuth2.java | 216 ++++++ .../hdfs/web/oauth2/TestAccessTokenTimer.java | 63 ++ ...ClientCredentialTimeBasedTokenRefresher.java | 138 ++++ ...TestRefreshTokenTimeBasedTokenRefresher.java | 138 ++++ hadoop-project/src/site/site.xml | 1 + .../org/apache/hadoop/fs/s3a/Constants.java | 4 +- .../src/site/markdown/tools/hadoop-aws/index.md | 4 +- hadoop-yarn-project/CHANGES.txt | 10 +- .../hadoop/yarn/client/TestRMFailover.java | 27 + .../hadoop/yarn/webapp/YarnWebParams.java | 1 + .../scheduler/capacity/AbstractCSQueue.java | 27 + .../scheduler/capacity/CSQueue.java | 26 + .../scheduler/capacity/CapacityScheduler.java | 40 +- .../scheduler/capacity/LeafQueue.java | 16 + .../scheduler/common/fica/FiCaSchedulerApp.java | 9 + .../resourcemanager/webapp/RMWebAppFilter.java | 90 ++- .../TestCapacitySchedulerNodeLabelUpdate.java | 249 +++++- .../src/site/markdown/NodeLabel.md | 140 ++++ 135 files changed, 8407 insertions(+), 5608 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index 0000000,aa3e8ba..8f988af mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@@ -1,0 -1,102 +1,110 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs; + ++import java.io.Closeable; + import java.io.IOException; + import java.util.EnumSet; + + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.fs.ByteBufferReadable; + import org.apache.hadoop.fs.ReadOption; + import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; ++import org.apache.hadoop.util.DataChecksum; + + /** + * A BlockReader is responsible for reading a single block + * from a single datanode. + */ + @InterfaceAudience.Private -public interface BlockReader extends ByteBufferReadable { ++public interface BlockReader extends ByteBufferReadable, Closeable { + + + /* same interface as inputStream java.io.InputStream#read() + * used by DFSInputStream#read() + * This violates one rule when there is a checksum error: + * "Read should not modify user buffer before successful read" + * because it first reads the data to user buffer and then checks + * the checksum. + * Note: this must return -1 on EOF, even in the case of a 0-byte read. + * See HDFS-5762 for details. + */ + int read(byte[] buf, int off, int len) throws IOException; + + /** + * Skip the given number of bytes + */ + long skip(long n) throws IOException; + + /** + * Returns an estimate of the number of bytes that can be read + * (or skipped over) from this input stream without performing + * network I/O. + * This may return more than what is actually present in the block. + */ + int available() throws IOException; + + /** + * Close the block reader. + * + * @throws IOException + */ ++ @Override // java.io.Closeable + void close() throws IOException; + + /** + * Read exactly the given amount of data, throwing an exception + * if EOF is reached before that amount + */ + void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException; + + /** + * Similar to {@link #readFully(byte[], int, int)} except that it will + * not throw an exception on EOF. However, it differs from the simple + * {@link #read(byte[], int, int)} call in that it is guaranteed to + * read the data if it is available. In other words, if this call + * does not throw an exception, then either the buffer has been + * filled or the next call will return EOF. + */ + int readAll(byte[] buf, int offset, int len) throws IOException; + + /** + * @return true only if this is a local read. + */ + boolean isLocal(); + + /** + * @return true only if this is a short-circuit read. + * All short-circuit reads are also local. + */ + boolean isShortCircuit(); + + /** + * Get a ClientMmap object for this BlockReader. + * + * @param opts The read options to use. + * @return The ClientMmap object, or null if mmap is not + * supported. 
+ */ + ClientMmap getClientMmap(EnumSet opts); ++ ++ /** ++ * @return The DataChecksum used by the read block ++ */ ++ DataChecksum getDataChecksum(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index 0000000,2a0e21b..8d7f294 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@@ -1,0 -1,743 +1,748 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs; + + import java.io.IOException; + import java.nio.ByteBuffer; + import java.nio.channels.FileChannel; + import java.util.EnumSet; + + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.fs.ReadOption; + import org.apache.hadoop.fs.StorageType; + import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; + import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; + import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; + import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; + import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; + import org.apache.hadoop.util.DataChecksum; + import org.apache.hadoop.util.DirectBufferPool; + import org.apache.htrace.Sampler; + import org.apache.htrace.Trace; + import org.apache.htrace.TraceScope; + + import com.google.common.annotations.VisibleForTesting; + import com.google.common.base.Preconditions; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * BlockReaderLocal enables local short circuited reads. If the DFS client is on + * the same machine as the datanode, then the client can read files directly + * from the local file system rather than going through the datanode for better + * performance.
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
+ * </ul>
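As a rough usage sketch of the Builder that this new file introduces (illustrative only; the scConf, src, block, startOffset, replica, and cachingStrategy values are assumed to be supplied by the caller, for example from the client's short-circuit cache, and BlockReaderLocal itself is package-private to org.apache.hadoop.hdfs):

    // Hypothetical wiring of the Builder defined below; all inputs are assumed.
    BlockReaderLocal reader = new BlockReaderLocal.Builder(scConf)
        .setFilename(src)
        .setBlock(block)
        .setStartOffset(startOffset)
        .setShortCircuitReplica(replica)
        .setCachingStrategy(cachingStrategy)
        .setVerifyChecksum(true)
        .setStorageType(StorageType.DISK)
        .build();
    try {
      byte[] buf = new byte[4096];
      int n = reader.read(buf, 0, buf.length);  // -1 signals EOF
    } finally {
      reader.close();  // releases the ShortCircuitReplica reference
    }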
+ */ + @InterfaceAudience.Private + class BlockReaderLocal implements BlockReader { + static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class); + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + + public static class Builder { + private final int bufferSize; + private boolean verifyChecksum; + private int maxReadahead; + private String filename; + private ShortCircuitReplica replica; + private long dataPos; + private ExtendedBlock block; + private StorageType storageType; + + public Builder(ShortCircuitConf conf) { + this.maxReadahead = Integer.MAX_VALUE; + this.verifyChecksum = !conf.isSkipShortCircuitChecksums(); + this.bufferSize = conf.getShortCircuitBufferSize(); + } + + public Builder setVerifyChecksum(boolean verifyChecksum) { + this.verifyChecksum = verifyChecksum; + return this; + } + + public Builder setCachingStrategy(CachingStrategy cachingStrategy) { + long readahead = cachingStrategy.getReadahead() != null ? + cachingStrategy.getReadahead() : + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; + this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead); + return this; + } + + public Builder setFilename(String filename) { + this.filename = filename; + return this; + } + + public Builder setShortCircuitReplica(ShortCircuitReplica replica) { + this.replica = replica; + return this; + } + + public Builder setStartOffset(long startOffset) { + this.dataPos = Math.max(0, startOffset); + return this; + } + + public Builder setBlock(ExtendedBlock block) { + this.block = block; + return this; + } + + public Builder setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + + public BlockReaderLocal build() { + Preconditions.checkNotNull(replica); + return new BlockReaderLocal(this); + } + } + + private boolean closed = false; + + /** + * Pair of streams for this block. + */ + private final ShortCircuitReplica replica; + + /** + * The data FileChannel. + */ + private final FileChannel dataIn; + + /** + * The next place we'll read from in the block data FileChannel. + * + * If data is buffered in dataBuf, this offset will be larger than the + * offset of the next byte which a read() operation will give us. + */ + private long dataPos; + + /** + * The Checksum FileChannel. + */ + private final FileChannel checksumIn; + + /** + * Checksum type and size. + */ + private final DataChecksum checksum; + + /** + * If false, we will always skip the checksum. + */ + private final boolean verifyChecksum; + + /** + * Name of the block, for logging purposes. + */ + private final String filename; + + /** + * Block ID and Block Pool ID. + */ + private final ExtendedBlock block; + + /** + * Cache of Checksum#bytesPerChecksum. + */ + private final int bytesPerChecksum; + + /** + * Cache of Checksum#checksumSize. + */ + private final int checksumSize; + + /** + * Maximum number of chunks to allocate. + * + * This is used to allocate dataBuf and checksumBuf, in the event that + * we need them. + */ + private final int maxAllocatedChunks; + + /** + * True if zero readahead was requested. + */ + private final boolean zeroReadaheadRequested; + + /** + * Maximum amount of readahead we'll do. This will always be at least the, + * size of a single chunk, even if {@link #zeroReadaheadRequested} is true. + * The reason is because we need to do a certain amount of buffering in order + * to do checksumming. + * + * This determines how many bytes we'll use out of dataBuf and checksumBuf. 
+ * Why do we allocate buffers, and then (potentially) only use part of them? + * The rationale is that allocating a lot of buffers of different sizes would + * make it very difficult for the DirectBufferPool to re-use buffers. + */ + private final int maxReadaheadLength; + + /** + * Buffers data starting at the current dataPos and extending on + * for dataBuf.limit(). + * + * This may be null if we don't need it. + */ + private ByteBuffer dataBuf; + + /** + * Buffers checksums starting at the current checksumPos and extending on + * for checksumBuf.limit(). + * + * This may be null if we don't need it. + */ + private ByteBuffer checksumBuf; + + /** + * StorageType of replica on DataNode. + */ + private StorageType storageType; + + private BlockReaderLocal(Builder builder) { + this.replica = builder.replica; + this.dataIn = replica.getDataStream().getChannel(); + this.dataPos = builder.dataPos; + this.checksumIn = replica.getMetaStream().getChannel(); + BlockMetadataHeader header = builder.replica.getMetaHeader(); + this.checksum = header.getChecksum(); + this.verifyChecksum = builder.verifyChecksum && + (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL); + this.filename = builder.filename; + this.block = builder.block; + this.bytesPerChecksum = checksum.getBytesPerChecksum(); + this.checksumSize = checksum.getChecksumSize(); + + this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 : + ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum); + // Calculate the effective maximum readahead. + // We can't do more readahead than there is space in the buffer. + int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 : + ((Math.min(builder.bufferSize, builder.maxReadahead) + + bytesPerChecksum - 1) / bytesPerChecksum); + if (maxReadaheadChunks == 0) { + this.zeroReadaheadRequested = true; + maxReadaheadChunks = 1; + } else { + this.zeroReadaheadRequested = false; + } + this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; + this.storageType = builder.storageType; + } + + private synchronized void createDataBufIfNeeded() { + if (dataBuf == null) { + dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum); + dataBuf.position(0); + dataBuf.limit(0); + } + } + + private synchronized void freeDataBufIfExists() { + if (dataBuf != null) { + // When disposing of a dataBuf, we have to move our stored file index + // backwards. + dataPos -= dataBuf.remaining(); + dataBuf.clear(); + bufferPool.returnBuffer(dataBuf); + dataBuf = null; + } + } + + private synchronized void createChecksumBufIfNeeded() { + if (checksumBuf == null) { + checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize); + checksumBuf.position(0); + checksumBuf.limit(0); + } + } + + private synchronized void freeChecksumBufIfExists() { + if (checksumBuf != null) { + checksumBuf.clear(); + bufferPool.returnBuffer(checksumBuf); + checksumBuf = null; + } + } + + private synchronized int drainDataBuf(ByteBuffer buf) { + if (dataBuf == null) return -1; + int oldLimit = dataBuf.limit(); + int nRead = Math.min(dataBuf.remaining(), buf.remaining()); + if (nRead == 0) { + return (dataBuf.remaining() == 0) ? -1 : 0; + } + try { + dataBuf.limit(dataBuf.position() + nRead); + buf.put(dataBuf); + } finally { + dataBuf.limit(oldLimit); + } + return nRead; + } + + /** + * Read from the block file into a buffer. + * + * This function overwrites checksumBuf. It will increment dataPos. + * + * @param buf The buffer to read into. May be dataBuf. 
+ * The position and limit of this buffer should be set to + * multiples of the checksum size. + * @param canSkipChecksum True if we can skip checksumming. + * + * @return Total bytes read. 0 on EOF. + */ + private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) + throws IOException { + TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" + + block.getBlockId() + ")", Sampler.NEVER); + try { + int total = 0; + long startDataPos = dataPos; + int startBufPos = buf.position(); + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead < 0) { + break; + } + dataPos += nRead; + total += nRead; + } + if (canSkipChecksum) { + freeChecksumBufIfExists(); + return total; + } + if (total > 0) { + try { + buf.limit(buf.position()); + buf.position(startBufPos); + createChecksumBufIfNeeded(); + int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuf.clear(); + checksumBuf.limit(checksumsNeeded * checksumSize); + long checksumPos = BlockMetadataHeader.getHeaderSize() + + ((startDataPos / bytesPerChecksum) * checksumSize); + while (checksumBuf.hasRemaining()) { + int nRead = checksumIn.read(checksumBuf, checksumPos); + if (nRead < 0) { + throw new IOException("Got unexpected checksum file EOF at " + + checksumPos + ", block file position " + startDataPos + " for " + + "block " + block + " of file " + filename); + } + checksumPos += nRead; + } + checksumBuf.flip(); + + checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); + } finally { + buf.position(buf.limit()); + } + } + return total; + } finally { + scope.close(); + } + } + + private boolean createNoChecksumContext() { + if (verifyChecksum) { + if (storageType != null && storageType.isTransient()) { + // Checksums are not stored for replicas on transient storage. We do not + // anchor, because we do not intend for client activity to block eviction + // from transient storage on the DataNode side. + return true; + } else { + return replica.addNoChecksumAnchor(); + } + } else { + return true; + } + } + + private void releaseNoChecksumContext() { + if (verifyChecksum) { + if (storageType == null || !storageType.isTransient()) { + replica.removeNoChecksumAnchor(); + } + } + } + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + boolean canSkipChecksum = createNoChecksumContext(); + try { + String traceString = null; + if (LOG.isTraceEnabled()) { + traceString = new StringBuilder(). + append("read("). + append("buf.remaining=").append(buf.remaining()). + append(", block=").append(block). + append(", filename=").append(filename). + append(", canSkipChecksum=").append(canSkipChecksum). 
+ append(")").toString(); + LOG.info(traceString + ": starting"); + } + int nRead; + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(buf); + } else { + nRead = readWithBounceBuffer(buf, canSkipChecksum); + } + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.info(traceString + ": I/O error", e); + } + throw e; + } + if (LOG.isTraceEnabled()) { + LOG.info(traceString + ": returning " + nRead); + } + return nRead; + } finally { + if (canSkipChecksum) releaseNoChecksumContext(); + } + } + + private synchronized int readWithoutBounceBuffer(ByteBuffer buf) + throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int total = 0; + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead <= 0) break; + dataPos += nRead; + total += nRead; + } + return (total == 0 && (dataPos == dataIn.size())) ? -1 : total; + } + + /** + * Fill the data buffer. If necessary, validate the data against the + * checksums. + * + * We always want the offsets of the data contained in dataBuf to be + * aligned to the chunk boundary. If we are validating checksums, we + * accomplish this by seeking backwards in the file until we're on a + * chunk boundary. (This is necessary because we can't checksum a + * partial chunk.) If we are not validating checksums, we simply only + * fill the latter part of dataBuf. + * + * @param canSkipChecksum true if we can skip checksumming. + * @return true if we hit EOF. + * @throws IOException + */ + private synchronized boolean fillDataBuf(boolean canSkipChecksum) + throws IOException { + createDataBufIfNeeded(); + final int slop = (int)(dataPos % bytesPerChecksum); + final long oldDataPos = dataPos; + dataBuf.limit(maxReadaheadLength); + if (canSkipChecksum) { + dataBuf.position(slop); + fillBuffer(dataBuf, canSkipChecksum); + } else { + dataPos -= slop; + dataBuf.position(0); + fillBuffer(dataBuf, canSkipChecksum); + } + dataBuf.limit(dataBuf.position()); + dataBuf.position(Math.min(dataBuf.position(), slop)); + if (LOG.isTraceEnabled()) { + LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " + + "buffer from offset " + oldDataPos + " of " + block); + } + return dataBuf.limit() != maxReadaheadLength; + } + + /** + * Read using the bounce buffer. + * + * A 'direct' read actually has three phases. The first drains any + * remaining bytes from the slow read buffer. After this the read is + * guaranteed to be on a checksum chunk boundary. If there are still bytes + * to read, the fast direct path is used for as many remaining bytes as + * possible, up to a multiple of the checksum chunk size. Finally, any + * 'odd' bytes remaining at the end of the read cause another slow read to + * be issued, which involves an extra copy. + * + * Every 'slow' read tries to fill the slow read buffer in one go for + * efficiency's sake. As described above, all non-checksum-chunk-aligned + * reads will be served from the slower read path. + * + * @param buf The buffer to read into. + * @param canSkipChecksum True if we can skip checksums. 
+ */ + private synchronized int readWithBounceBuffer(ByteBuffer buf, + boolean canSkipChecksum) throws IOException { + int total = 0; + int bb = drainDataBuf(buf); // drain bounce buffer if possible + if (bb >= 0) { + total += bb; + if (buf.remaining() == 0) return total; + } + boolean eof = true, done = false; + do { + if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength) + && ((dataPos % bytesPerChecksum) == 0)) { + // Fast lane: try to read directly into user-supplied buffer, bypassing + // bounce buffer. + int oldLimit = buf.limit(); + int nRead; + try { + buf.limit(buf.position() + maxReadaheadLength); + nRead = fillBuffer(buf, canSkipChecksum); + } finally { + buf.limit(oldLimit); + } + if (nRead < maxReadaheadLength) { + done = true; + } + if (nRead > 0) { + eof = false; + } + total += nRead; + } else { + // Slow lane: refill bounce buffer. + if (fillDataBuf(canSkipChecksum)) { + done = true; + } + bb = drainDataBuf(buf); // drain bounce buffer if possible + if (bb >= 0) { + eof = false; + total += bb; + } + } + } while ((!done) && (buf.remaining() > 0)); + return (eof && total == 0) ? -1 : total; + } + + @Override + public synchronized int read(byte[] arr, int off, int len) + throws IOException { + boolean canSkipChecksum = createNoChecksumContext(); + int nRead; + try { + String traceString = null; + if (LOG.isTraceEnabled()) { + traceString = new StringBuilder(). + append("read(arr.length=").append(arr.length). + append(", off=").append(off). + append(", len=").append(len). + append(", filename=").append(filename). + append(", block=").append(block). + append(", canSkipChecksum=").append(canSkipChecksum). + append(")").toString(); + LOG.trace(traceString + ": starting"); + } + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(arr, off, len); + } else { + nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum); + } + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.trace(traceString + ": I/O error", e); + } + throw e; + } + if (LOG.isTraceEnabled()) { + LOG.trace(traceString + ": returning " + nRead); + } + } finally { + if (canSkipChecksum) releaseNoChecksumContext(); + } + return nRead; + } + + private synchronized int readWithoutBounceBuffer(byte arr[], int off, + int len) throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos); + if (nRead > 0) { + dataPos += nRead; + } else if ((nRead == 0) && (dataPos == dataIn.size())) { + return -1; + } + return nRead; + } + + private synchronized int readWithBounceBuffer(byte arr[], int off, int len, + boolean canSkipChecksum) throws IOException { + createDataBufIfNeeded(); + if (!dataBuf.hasRemaining()) { + dataBuf.position(0); + dataBuf.limit(maxReadaheadLength); + fillDataBuf(canSkipChecksum); + } + if (dataBuf.remaining() == 0) return -1; + int toRead = Math.min(dataBuf.remaining(), len); + dataBuf.get(arr, off, toRead); + return toRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + int discardedFromBuf = 0; + long remaining = n; + if ((dataBuf != null) && dataBuf.hasRemaining()) { + discardedFromBuf = (int)Math.min(dataBuf.remaining(), n); + dataBuf.position(dataBuf.position() + discardedFromBuf); + remaining -= discardedFromBuf; + } + if (LOG.isTraceEnabled()) { + LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + + filename + "): discarded " + discardedFromBuf + " bytes from " + + "dataBuf and advanced dataPos by " + remaining); + } + 
dataPos += remaining; + return n; + } + + @Override + public int available() throws IOException { + // We never do network I/O in BlockReaderLocal. + return Integer.MAX_VALUE; + } + + @Override + public synchronized void close() throws IOException { + if (closed) return; + closed = true; + if (LOG.isTraceEnabled()) { + LOG.trace("close(filename=" + filename + ", block=" + block + ")"); + } + replica.unref(); + freeDataBufIfExists(); + freeChecksumBufIfExists(); + } + + @Override + public synchronized void readFully(byte[] arr, int off, int len) + throws IOException { + BlockReaderUtil.readFully(this, arr, off, len); + } + + @Override + public synchronized int readAll(byte[] buf, int off, int len) + throws IOException { + return BlockReaderUtil.readAll(this, buf, off, len); + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public boolean isShortCircuit() { + return true; + } + + /** + * Get or create a memory map for this replica. + * + * There are two kinds of ClientMmap objects we could fetch here: one that + * will always read pre-checksummed data, and one that may read data that + * hasn't been checksummed. + * + * If we fetch the former, "safe" kind of ClientMmap, we have to increment + * the anchor count on the shared memory slot. This will tell the DataNode + * not to munlock the block until this ClientMmap is closed. + * If we fetch the latter, we don't bother with anchoring. + * + * @param opts The options to use, such as SKIP_CHECKSUMS. + * + * @return null on failure; the ClientMmap otherwise. + */ + @Override + public ClientMmap getClientMmap(EnumSet opts) { + boolean anchor = verifyChecksum && + (opts.contains(ReadOption.SKIP_CHECKSUMS) == false); + if (anchor) { + if (!createNoChecksumContext()) { + if (LOG.isTraceEnabled()) { + LOG.trace("can't get an mmap for " + block + " of " + filename + + " since SKIP_CHECKSUMS was not given, " + + "we aren't skipping checksums, and the block is not mlocked."); + } + return null; + } + } + ClientMmap clientMmap = null; + try { + clientMmap = replica.getOrCreateClientMmap(anchor); + } finally { + if ((clientMmap == null) && anchor) { + releaseNoChecksumContext(); + } + } + return clientMmap; + } + + @VisibleForTesting + boolean getVerifyChecksum() { + return this.verifyChecksum; + } + + @VisibleForTesting + int getMaxReadaheadLength() { + return this.maxReadaheadLength; + } + + /** + * Make the replica anchorable. Normally this can only be done by the + * DataNode. This method is only for testing. + */ + @VisibleForTesting + void forceAnchorable() { + replica.getSlot().makeAnchorable(); + } + + /** + * Make the replica unanchorable. Normally this can only be done by the + * DataNode. This method is only for testing. 
+ */ + @VisibleForTesting + void forceUnanchorable() { + replica.getSlot().makeUnanchorable(); + } ++ ++ @Override ++ public DataChecksum getDataChecksum() { ++ return checksum; ++ } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index 0000000,eea3f06..9920438 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@@ -1,0 -1,738 +1,743 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs; + + import java.io.DataInputStream; + import java.io.File; + import java.io.FileInputStream; + import java.io.IOException; + import java.nio.ByteBuffer; + import java.security.PrivilegedExceptionAction; + import java.util.Collections; + import java.util.EnumSet; + import java.util.HashMap; + import java.util.LinkedHashMap; + import java.util.Map; + + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.ReadOption; + import org.apache.hadoop.fs.StorageType; + import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + import org.apache.hadoop.hdfs.client.impl.DfsClientConf; + import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; + import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; + import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; + import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; + import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; + import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; + import org.apache.hadoop.hdfs.util.IOUtilsClient; + import org.apache.hadoop.io.IOUtils; + import org.apache.hadoop.ipc.RPC; + import org.apache.hadoop.security.UserGroupInformation; + import org.apache.hadoop.security.token.Token; + import org.apache.hadoop.util.DataChecksum; + import org.apache.hadoop.util.DirectBufferPool; + import org.apache.htrace.Sampler; + import org.apache.htrace.Trace; + import org.apache.htrace.TraceScope; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * BlockReaderLocalLegacy enables local short circuited reads. 
If the DFS client is on
+ * the same machine as the datanode, then the client can read files directly
+ * from the local file system rather than going through the datanode for
+ * better performance.
+ *
+ * This is the legacy implementation based on HDFS-2246, which requires
+ * permissions on the datanode to be set so that clients can directly access
+ * the blocks. The new implementation based on HDFS-347 should be preferred
+ * on UNIX systems where the required native code has been implemented.
+ *
+ * {@link BlockReaderLocalLegacy} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the path to the file where the block is stored using the
+ * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+ * RPC call.</li>
+ * <li>The client uses Kerberos authentication to connect to the datanode over
+ * RPC, if security is enabled.</li>
+ * </ul>
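The LocalDatanodeInfo helper further down in this file caches BlockLocalPathInfo lookups in a size-bounded, access-ordered LinkedHashMap (the generic type parameters are stripped in the diff rendering above). A minimal self-contained sketch of that LRU-cache pattern, with placeholder key/value types rather than the Hadoop ones:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    /** Bounded LRU cache, the same pattern LocalDatanodeInfo uses. */
    class LruCacheSketch {
      private static final int CACHE_SIZE = 10000;
      private static final float LOAD_FACTOR = 0.75f;

      static <K, V> Map<K, V> newCache() {
        int capacity = (int) Math.ceil(CACHE_SIZE / LOAD_FACTOR) + 1;
        return Collections.synchronizedMap(
            new LinkedHashMap<K, V>(capacity, LOAD_FACTOR, true /* accessOrder */) {
              @Override
              protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Evict the least-recently-accessed entry once the cap is hit.
                return size() > CACHE_SIZE;
              }
            });
      }
    }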
+ */ + @InterfaceAudience.Private + class BlockReaderLocalLegacy implements BlockReader { + private static final Logger LOG = LoggerFactory.getLogger( + BlockReaderLocalLegacy.class); + + //Stores the cache and proxy for a local datanode. + private static class LocalDatanodeInfo { + private ClientDatanodeProtocol proxy = null; + private final Map cache; + + LocalDatanodeInfo() { + final int cacheSize = 10000; + final float hashTableLoadFactor = 0.75f; + int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1; + cache = Collections + .synchronizedMap(new LinkedHashMap( + hashTableCapacity, hashTableLoadFactor, true) { + private static final long serialVersionUID = 1; + + @Override + protected boolean removeEldestEntry( + Map.Entry eldest) { + return size() > cacheSize; + } + }); + } + + private synchronized ClientDatanodeProtocol getDatanodeProxy( + UserGroupInformation ugi, final DatanodeInfo node, + final Configuration conf, final int socketTimeout, + final boolean connectToDnViaHostname) throws IOException { + if (proxy == null) { + try { + proxy = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public ClientDatanodeProtocol run() throws Exception { + return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf, + socketTimeout, connectToDnViaHostname); + } + }); + } catch (InterruptedException e) { + LOG.warn("encountered exception ", e); + } + } + return proxy; + } + + private synchronized void resetDatanodeProxy() { + if (null != proxy) { + RPC.stopProxy(proxy); + proxy = null; + } + } + + private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { + return cache.get(b); + } + + private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) { + cache.put(b, info); + } + + private void removeBlockLocalPathInfo(ExtendedBlock b) { + cache.remove(b); + } + } + + // Multiple datanodes could be running on the local machine. Store proxies in + // a map keyed by the ipc port of the datanode. + private static final Map localDatanodeInfoMap = new HashMap(); + + private final FileInputStream dataIn; // reader for the data file + private final FileInputStream checksumIn; // reader for the checksum file + + /** + * Offset from the most recent chunk boundary at which the next read should + * take place. Is only set to non-zero at construction time, and is + * decremented (usually to 0) by subsequent reads. This avoids having to do a + * checksum read at construction to position the read cursor correctly. + */ + private int offsetFromChunkBoundary; + + private byte[] skipBuf = null; + + /** + * Used for checksummed reads that need to be staged before copying to their + * output buffer because they are either a) smaller than the checksum chunk + * size or b) issued by the slower read(byte[]...) path + */ + private ByteBuffer slowReadBuff = null; + private ByteBuffer checksumBuff = null; + private DataChecksum checksum; + private final boolean verifyChecksum; + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + + private final int bytesPerChecksum; + private final int checksumSize; + + /** offset in block where reader wants to actually read */ + private long startOffset; + private final String filename; + private long blockId; + + /** + * The only way this object can be instantiated. 
+ */ + static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf, + UserGroupInformation userGroupInformation, + Configuration configuration, String file, ExtendedBlock blk, + Token token, DatanodeInfo node, + long startOffset, long length, StorageType storageType) + throws IOException { + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node + .getIpcPort()); + // check the cache first + BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); + if (pathinfo == null) { + if (userGroupInformation == null) { + userGroupInformation = UserGroupInformation.getCurrentUser(); + } + pathinfo = getBlockPathInfo(userGroupInformation, blk, node, + configuration, conf.getSocketTimeout(), token, + conf.isConnectToDnViaHostname(), storageType); + } + + // check to see if the file exists. It may so happen that the + // HDFS file has been deleted and this block-lookup is occurring + // on behalf of a new HDFS file. This time, the block file could + // be residing in a different portion of the fs.data.dir directory. + // In this case, we remove this entry from the cache. The next + // call to this method will re-populate the cache. + FileInputStream dataIn = null; + FileInputStream checksumIn = null; + BlockReaderLocalLegacy localBlockReader = null; + final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums() + || storageType.isTransient(); + try { + // get a local file system + File blkfile = new File(pathinfo.getBlockPath()); + dataIn = new FileInputStream(blkfile); + + if (LOG.isDebugEnabled()) { + LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size " + + blkfile.length() + " startOffset " + startOffset + " length " + + length + " short circuit checksum " + !skipChecksumCheck); + } + + if (!skipChecksumCheck) { + // get the metadata file + File metafile = new File(pathinfo.getMetaPath()); + checksumIn = new FileInputStream(metafile); + + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( + new DataInputStream(checksumIn), blk); + long firstChunkOffset = startOffset + - (startOffset % checksum.getBytesPerChecksum()); + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token, + startOffset, length, pathinfo, checksum, true, dataIn, + firstChunkOffset, checksumIn); + } else { + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token, + startOffset, length, pathinfo, dataIn); + } + } catch (IOException e) { + // remove from cache + localDatanodeInfo.removeBlockLocalPathInfo(blk); + LOG.warn("BlockReaderLocalLegacy: Removing " + blk + + " from cache because local file " + pathinfo.getBlockPath() + + " could not be opened."); + throw e; + } finally { + if (localBlockReader == null) { + if (dataIn != null) { + dataIn.close(); + } + if (checksumIn != null) { + checksumIn.close(); + } + } + } + return localBlockReader; + } + + private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) { + LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port); + if (ldInfo == null) { + ldInfo = new LocalDatanodeInfo(); + localDatanodeInfoMap.put(port, ldInfo); + } + return ldInfo; + } + + private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi, + ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, + Token token, boolean connectToDnViaHostname, + StorageType storageType) throws IOException { + LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort()); + BlockLocalPathInfo 
pathinfo = null; + ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node, + conf, timeout, connectToDnViaHostname); + try { + // make RPC to local datanode to find local pathnames of blocks + pathinfo = proxy.getBlockLocalPathInfo(blk, token); + // We cannot cache the path information for a replica on transient storage. + // If the replica gets evicted, then it moves to a different path. Then, + // our next attempt to read from the cached path would fail to find the + // file. Additionally, the failure would cause us to disable legacy + // short-circuit read for all subsequent use in the ClientContext. Unlike + // the newer short-circuit read implementation, we have no communication + // channel for the DataNode to notify the client that the path has been + // invalidated. Therefore, our only option is to skip caching. + if (pathinfo != null && !storageType.isTransient()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cached location of block " + blk + " as " + pathinfo); + } + localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); + } + } catch (IOException e) { + localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error + throw e; + } + return pathinfo; + } + + private static int getSlowReadBufferNumChunks(int bufferSizeBytes, + int bytesPerChecksum) { + if (bufferSizeBytes < bytesPerChecksum) { + throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " + + "buffer size (" + bufferSizeBytes + ") is not large enough to hold " + + "a single chunk (" + bytesPerChecksum + "). Please configure " + + HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY + + " appropriately"); + } + + // Round down to nearest chunk size + return bufferSizeBytes / bytesPerChecksum; + } + + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, + ExtendedBlock block, Token token, long startOffset, + long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) + throws IOException { + this(conf, hdfsfile, block, token, startOffset, length, pathinfo, + DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false, + dataIn, startOffset, null); + } + + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, + ExtendedBlock block, Token token, long startOffset, + long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, + boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, + FileInputStream checksumIn) throws IOException { + this.filename = hdfsfile; + this.checksum = checksum; + this.verifyChecksum = verifyChecksum; + this.startOffset = Math.max(startOffset, 0); + this.blockId = block.getBlockId(); + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + + this.dataIn = dataIn; + this.checksumIn = checksumIn; + this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); + + final int chunksPerChecksumRead = getSlowReadBufferNumChunks( + conf.getShortCircuitBufferSize(), bytesPerChecksum); + slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); + checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); + // Initially the buffers have nothing to read. 
+ slowReadBuff.flip(); + checksumBuff.flip(); + boolean success = false; + try { + // Skip both input streams to beginning of the chunk containing startOffset + IOUtils.skipFully(dataIn, firstChunkOffset); + if (checksumIn != null) { + long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize; + IOUtils.skipFully(checksumIn, checkSumOffset); + } + success = true; + } finally { + if (!success) { + bufferPool.returnBuffer(slowReadBuff); + bufferPool.returnBuffer(checksumBuff); + } + } + } + + /** + * Reads bytes into a buffer until EOF or the buffer's limit is reached + */ + private int fillBuffer(FileInputStream stream, ByteBuffer buf) + throws IOException { + TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" + + blockId + ")", Sampler.NEVER); + try { + int bytesRead = stream.getChannel().read(buf); + if (bytesRead < 0) { + //EOF + return bytesRead; + } + while (buf.remaining() > 0) { + int n = stream.getChannel().read(buf); + if (n < 0) { + //EOF + return bytesRead; + } + bytesRead += n; + } + return bytesRead; + } finally { + scope.close(); + } + } + + /** + * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into + * another. + */ + private void writeSlice(ByteBuffer from, ByteBuffer to, int length) { + int oldLimit = from.limit(); + from.limit(from.position() + length); + try { + to.put(from); + } finally { + from.limit(oldLimit); + } + } + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + int nRead = 0; + if (verifyChecksum) { + // A 'direct' read actually has three phases. The first drains any + // remaining bytes from the slow read buffer. After this the read is + // guaranteed to be on a checksum chunk boundary. If there are still bytes + // to read, the fast direct path is used for as many remaining bytes as + // possible, up to a multiple of the checksum chunk size. Finally, any + // 'odd' bytes remaining at the end of the read cause another slow read to + // be issued, which involves an extra copy. + + // Every 'slow' read tries to fill the slow read buffer in one go for + // efficiency's sake. As described above, all non-checksum-chunk-aligned + // reads will be served from the slower read path. + + if (slowReadBuff.hasRemaining()) { + // There are remaining bytes from a small read available. This usually + // means this read is unaligned, which falls back to the slow path. + int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + + if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) { + // Since we have drained the 'small read' buffer, we are guaranteed to + // be chunk-aligned + int len = buf.remaining() - (buf.remaining() % bytesPerChecksum); + + // There's only enough checksum buffer space available to checksum one + // entire slow read buffer. This saves keeping the number of checksum + // chunks around. 
+ len = Math.min(len, slowReadBuff.capacity()); + int oldlimit = buf.limit(); + buf.limit(buf.position() + len); + int readResult = 0; + try { + readResult = doByteBufferRead(buf); + } finally { + buf.limit(oldlimit); + } + if (readResult == -1) { + return nRead; + } else { + nRead += readResult; + buf.position(buf.position() + readResult); + } + } + + // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read + // until chunk boundary + if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) { + int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary); + int readResult = fillSlowReadBuffer(toRead); + if (readResult == -1) { + return nRead; + } else { + int fromSlowReadBuff = Math.min(readResult, buf.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + } + } else { + // Non-checksummed reads are much easier; we can just fill the buffer directly. + nRead = doByteBufferRead(buf); + if (nRead > 0) { + buf.position(buf.position() + nRead); + } + } + return nRead; + } + + /** + * Tries to read as many bytes as possible into supplied buffer, checksumming + * each chunk if needed. + * + * Preconditions: + *
<ul>
+ * <li>
+ * If checksumming is enabled, buf.remaining must be a multiple of
+ * bytesPerChecksum. Note that this is not a requirement for clients of
+ * read(ByteBuffer) - in the case of non-checksum-sized read requests,
+ * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+ * method.
+ * </li>
+ * </ul>
+ * Postconditions:
+ * <ul>
+ * <li>buf.limit and buf.mark are unchanged.</li>
+ * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+ * requested bytes can be read straight from the buffer.</li>
+ * </ul>
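To make the chunk-alignment arithmetic used by these readers concrete, here is a small standalone calculation using the same integer formulas that appear in this file and in BlockReaderLocal (the 512-byte chunk and 4-byte CRC sizes are illustrative defaults, not values fixed by this patch):

    public class ChunkAlignmentSketch {
      public static void main(String[] args) {
        int bytesPerChecksum = 512;  // illustrative chunk size
        int checksumSize = 4;        // illustrative CRC size
        long startOffset = 1300;     // requested read position within the block

        // Seek back to the enclosing chunk boundary, as newBlockReader() does.
        long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);
        int offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);

        // Matching position in the checksum stream, as the constructor computes.
        long checksumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;

        // Checksum chunks needed to verify a 3000-byte read (round up).
        int toVerify = 3000;
        int numChunks = (toVerify + bytesPerChecksum - 1) / bytesPerChecksum;

        System.out.println(firstChunkOffset);        // 1024
        System.out.println(offsetFromChunkBoundary); // 276
        System.out.println(checksumOffset);          // 8
        System.out.println(numChunks);               // 6
      }
    }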
+ * + * @param buf + * byte buffer to write bytes to. If checksums are not required, buf + * can have any number of bytes remaining, otherwise there must be a + * multiple of the checksum chunk size remaining. + * @return max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0) + * that is, the the number of useful bytes (up to the amount + * requested) readable from the buffer by the client. + */ + private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException { + if (verifyChecksum) { + assert buf.remaining() % bytesPerChecksum == 0; + } + int dataRead = -1; + + int oldpos = buf.position(); + // Read as much as we can into the buffer. + dataRead = fillBuffer(dataIn, buf); + + if (dataRead == -1) { + return -1; + } + + if (verifyChecksum) { + ByteBuffer toChecksum = buf.duplicate(); + toChecksum.position(oldpos); + toChecksum.limit(oldpos + dataRead); + + checksumBuff.clear(); + // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum ); + int numChunks = + (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuff.limit(checksumSize * numChunks); + + fillBuffer(checksumIn, checksumBuff); + checksumBuff.flip(); + + checksum.verifyChunkedSums(toChecksum, checksumBuff, filename, + this.startOffset); + } + + if (dataRead >= 0) { + buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead)); + } + + if (dataRead < offsetFromChunkBoundary) { + // yikes, didn't even get enough bytes to honour offset. This can happen + // even if we are verifying checksums if we are at EOF. + offsetFromChunkBoundary -= dataRead; + dataRead = 0; + } else { + dataRead -= offsetFromChunkBoundary; + offsetFromChunkBoundary = 0; + } + + return dataRead; + } + + /** + * Ensures that up to len bytes are available and checksummed in the slow read + * buffer. The number of bytes available to read is returned. If the buffer is + * not already empty, the number of remaining bytes is returned and no actual + * read happens. + * + * @param len + * the maximum number of bytes to make available. After len bytes + * are read, the underlying bytestream must be at a checksum + * boundary, or EOF. That is, (len + currentPosition) % + * bytesPerChecksum == 0. + * @return the number of bytes available to read, or -1 if EOF. + */ + private synchronized int fillSlowReadBuffer(int len) throws IOException { + int nRead = -1; + if (slowReadBuff.hasRemaining()) { + // Already got data, good to go. + nRead = Math.min(len, slowReadBuff.remaining()); + } else { + // Round a complete read of len bytes (plus any implicit offset) to the + // next chunk boundary, since we try and read in multiples of a chunk + int nextChunk = len + offsetFromChunkBoundary + + (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum)); + int limit = Math.min(nextChunk, slowReadBuff.capacity()); + assert limit % bytesPerChecksum == 0; + + slowReadBuff.clear(); + slowReadBuff.limit(limit); + + nRead = doByteBufferRead(slowReadBuff); + + if (nRead > 0) { + // So that next time we call slowReadBuff.hasRemaining(), we don't get a + // false positive. 
+ slowReadBuff.limit(nRead + slowReadBuff.position()); + } + } + return nRead; + } + + @Override + public synchronized int read(byte[] buf, int off, int len) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read off " + off + " len " + len); + } + if (!verifyChecksum) { + return dataIn.read(buf, off, len); + } + + int nRead = fillSlowReadBuffer(slowReadBuff.capacity()); + + if (nRead > 0) { + // Possible that buffer is filled with a larger read than we need, since + // we tried to read as much as possible at once + nRead = Math.min(len, nRead); + slowReadBuff.get(buf, off, nRead); + } + + return nRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("skip " + n); + } + if (n <= 0) { + return 0; + } + if (!verifyChecksum) { + return dataIn.skip(n); + } + + // caller made sure newPosition is not beyond EOF. + int remaining = slowReadBuff.remaining(); + int position = slowReadBuff.position(); + int newPosition = position + (int)n; + + // if the new offset is already read into dataBuff, just reposition + if (n <= remaining) { + assert offsetFromChunkBoundary == 0; + slowReadBuff.position(newPosition); + return n; + } + + // for small gap, read through to keep the data/checksum in sync + if (n - remaining <= bytesPerChecksum) { + slowReadBuff.position(position + remaining); + if (skipBuf == null) { + skipBuf = new byte[bytesPerChecksum]; + } + int ret = read(skipBuf, 0, (int)(n - remaining)); + return (remaining + ret); + } + + // optimize for big gap: discard the current buffer, skip to + // the beginning of the appropriate checksum chunk and then + // read to the middle of that chunk to be in sync with checksums. + + // We can't use this.offsetFromChunkBoundary because we need to know how + // many bytes of the offset were really read. Calling read(..) with a + // positive this.offsetFromChunkBoundary causes that many bytes to get + // silently skipped. + int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum; + long toskip = n - remaining - myOffsetFromChunkBoundary; + + slowReadBuff.position(slowReadBuff.limit()); + checksumBuff.position(checksumBuff.limit()); + + IOUtils.skipFully(dataIn, toskip); + long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize; + IOUtils.skipFully(checksumIn, checkSumOffset); + + // read into the middle of the chunk + if (skipBuf == null) { + skipBuf = new byte[bytesPerChecksum]; + } + assert skipBuf.length == bytesPerChecksum; + assert myOffsetFromChunkBoundary < bytesPerChecksum; + + int ret = read(skipBuf, 0, myOffsetFromChunkBoundary); + + if (ret == -1) { // EOS + return (toskip + remaining); + } else { + return (toskip + remaining + ret); + } + } + + @Override + public synchronized void close() throws IOException { + IOUtilsClient.cleanup(LOG, dataIn, checksumIn); + if (slowReadBuff != null) { + bufferPool.returnBuffer(slowReadBuff); + slowReadBuff = null; + } + if (checksumBuff != null) { + bufferPool.returnBuffer(checksumBuff); + checksumBuff = null; + } + startOffset = -1; + checksum = null; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public void readFully(byte[] buf, int off, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, off, len); + } + + @Override + public int available() throws IOException { + // We never do network I/O in BlockReaderLocalLegacy. 
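+ // Reads are served from local block files rather than the network, so report
+ // the largest possible value instead of tracking how much of the block is left.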
+ return Integer.MAX_VALUE; + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public boolean isShortCircuit() { + return true; + } + + @Override + public ClientMmap getClientMmap(EnumSet opts) { + return null; + } ++ ++ @Override ++ public DataChecksum getDataChecksum() { ++ return checksum; ++ } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java index 0000000,e135d8e..f908dd3 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java @@@ -1,0 -1,120 +1,126 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs; + + import java.io.IOException; + import java.nio.ByteBuffer; + import java.util.EnumSet; + + import org.apache.hadoop.classification.InterfaceAudience; + import org.apache.hadoop.fs.ReadOption; + import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; ++import org.apache.hadoop.util.DataChecksum; + + /** + * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from + * replicas. + */ + @InterfaceAudience.Private + public final class ExternalBlockReader implements BlockReader { + private final ReplicaAccessor accessor; + private final long visibleLength; + private long pos; + + ExternalBlockReader(ReplicaAccessor accessor, long visibleLength, + long startOffset) { + this.accessor = accessor; + this.visibleLength = visibleLength; + this.pos = startOffset; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + int nread = accessor.read(pos, buf, off, len); + pos += nread; + return nread; + } + + @Override + public int read(ByteBuffer buf) throws IOException { + int nread = accessor.read(pos, buf); + pos += nread; + return nread; + } + + @Override + public long skip(long n) throws IOException { + // You cannot skip backwards + if (n <= 0) { + return 0; + } + // You can't skip past the end of the replica. + long oldPos = pos; + pos += n; + if (pos > visibleLength) { + pos = visibleLength; + } + return pos - oldPos; + } + + @Override + public int available() throws IOException { + // We return the amount of bytes that we haven't read yet from the + // replica, based on our current position. Some of the other block + // readers return a shorter length than that. 
The only advantage to + // returning a shorter length is that the DFSInputStream will + // trash your block reader and create a new one if someone tries to + // seek() beyond the available() region. + long diff = visibleLength - pos; + if (diff > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int)diff; + } + } + + @Override + public void close() throws IOException { + accessor.close(); + } + + @Override + public void readFully(byte[] buf, int offset, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, offset, len); + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public boolean isLocal() { + return accessor.isLocal(); + } + + @Override + public boolean isShortCircuit() { + return accessor.isShortCircuit(); + } + + @Override + public ClientMmap getClientMmap(EnumSet opts) { + // For now, pluggable ReplicaAccessors do not support zero-copy. + return null; + } ++ ++ @Override ++ public DataChecksum getDataChecksum() { ++ return null; ++ } + }
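For readers skimming the patch, the short sketch below restates in isolation a few pieces of arithmetic from the readers above: the chunk-boundary rounding that fillSlowReadBuffer() applies before issuing a read, the ceiling division doByteBufferRead() uses to size its checksum buffer, and the clamp ExternalBlockReader.available() applies to the remaining length. It is an illustrative sketch only, not part of this commit; the class name, method names, and the 512-byte bytesPerChecksum value are invented for the example.

// Illustrative sketch only -- not part of this commit. Restates the
// chunk-alignment and available() arithmetic used by the readers above.
public final class BlockReaderArithmeticSketch {

  /**
   * Widen a read of len bytes, starting offsetFromChunkBoundary bytes past
   * the previous checksum chunk boundary, so that it ends on a chunk
   * boundary -- the same expression fillSlowReadBuffer() uses to pick its
   * buffer limit (before clamping to the buffer capacity).
   */
  static int nextChunkBoundary(int len, int offsetFromChunkBoundary,
      int bytesPerChecksum) {
    int total = len + offsetFromChunkBoundary;
    return total + (bytesPerChecksum - (total % bytesPerChecksum));
  }

  /** Ceiling division used to size the checksum buffer in doByteBufferRead(). */
  static int numChunks(int nBytes, int bytesPerChecksum) {
    return (nBytes + bytesPerChecksum - 1) / bytesPerChecksum;
  }

  /** The clamp ExternalBlockReader.available() applies to visibleLength - pos. */
  static int clampedAvailable(long visibleLength, long pos) {
    long diff = visibleLength - pos;
    return diff > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) diff;
  }

  public static void main(String[] args) {
    int bytesPerChecksum = 512;  // example value only
    // A 100-byte read that begins 40 bytes into a chunk is widened to 512.
    System.out.println(nextChunkBoundary(100, 40, bytesPerChecksum));
    // 700 bytes of data are covered by two 512-byte checksum chunks.
    System.out.println(numChunks(700, bytesPerChecksum));
    // A 3 GB replica read from the start reports Integer.MAX_VALUE.
    System.out.println(clampedAvailable(3L * 1024 * 1024 * 1024, 0));
  }
}

Under these assumptions, running main() prints 512, 2, and 2147483647.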