From: wheat9@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 28 Aug 2015 21:38:43 -0000
Subject: [4/4] hadoop git commit: HDFS-8925. Move BlockReaderLocal to hdfs-client. Contributed by Mingliang Liu.

HDFS-8925. Move BlockReaderLocal to hdfs-client. Contributed by Mingliang Liu.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e2c9b288
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e2c9b288
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e2c9b288

Branch: refs/heads/trunk
Commit: e2c9b288b223b9fd82dc12018936e13128413492
Parents: b94b568
Author: Haohui Mai
Authored: Fri Aug 28 14:20:55 2015 -0700
Committer: Haohui Mai
Committed: Fri Aug 28 14:38:36 2015 -0700

---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/BlockReader.java | 102 +++ .../apache/hadoop/hdfs/BlockReaderLocal.java | 743 +++++++++++++++++++ .../hadoop/hdfs/BlockReaderLocalLegacy.java | 738 ++++++++++++++++++ .../org/apache/hadoop/hdfs/BlockReaderUtil.java | 57 ++ .../org/apache/hadoop/hdfs/ClientContext.java | 196 +++++ .../org/apache/hadoop/hdfs/DFSUtilClient.java | 68 ++ .../apache/hadoop/hdfs/ExternalBlockReader.java | 120 +++ .../apache/hadoop/hdfs/KeyProviderCache.java | 112 +++ .../java/org/apache/hadoop/hdfs/PeerCache.java | 291 ++++++++ .../hadoop/hdfs/client/BlockReportOptions.java | 59 ++ .../hdfs/client/HdfsClientConfigKeys.java | 5 + .../hdfs/protocol/BlockLocalPathInfo.java | 70 ++ .../hdfs/protocol/ClientDatanodeProtocol.java | 152 ++++ .../InvalidEncryptionKeyException.java | 40 + .../protocolPB/ClientDatanodeProtocolPB.java | 37 + .../ClientDatanodeProtocolTranslatorPB.java | 326 ++++++++ .../hadoop/hdfs/protocolPB/PBHelperClient.java | 13 + .../token/block/BlockTokenSelector.java | 48 ++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/BlockReader.java | 102 --- .../apache/hadoop/hdfs/BlockReaderLocal.java | 741 ------------------ .../hadoop/hdfs/BlockReaderLocalLegacy.java | 735 ------------------ .../org/apache/hadoop/hdfs/BlockReaderUtil.java | 57 -- .../org/apache/hadoop/hdfs/ClientContext.java | 195 ----- .../org/apache/hadoop/hdfs/DFSConfigKeys.java
| 14 +- .../org/apache/hadoop/hdfs/DFSInputStream.java | 2 +- .../java/org/apache/hadoop/hdfs/DFSUtil.java | 67 +- .../apache/hadoop/hdfs/ExternalBlockReader.java | 120 --- .../apache/hadoop/hdfs/KeyProviderCache.java | 111 --- .../java/org/apache/hadoop/hdfs/PeerCache.java | 290 -------- .../hadoop/hdfs/client/BlockReportOptions.java | 59 -- .../hdfs/protocol/BlockLocalPathInfo.java | 70 -- .../hdfs/protocol/ClientDatanodeProtocol.java | 152 ---- .../InvalidEncryptionKeyException.java | 40 - .../hdfs/protocol/datatransfer/Receiver.java | 15 +- .../protocolPB/ClientDatanodeProtocolPB.java | 37 - ...tDatanodeProtocolServerSideTranslatorPB.java | 6 +- .../ClientDatanodeProtocolTranslatorPB.java | 326 -------- ...tNamenodeProtocolServerSideTranslatorPB.java | 14 +- .../DatanodeProtocolServerSideTranslatorPB.java | 2 +- ...rDatanodeProtocolServerSideTranslatorPB.java | 2 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 26 +- .../token/block/BlockTokenSelector.java | 48 -- .../hadoop/hdfs/server/datanode/DNConf.java | 4 +- .../org/apache/hadoop/hdfs/tools/DFSAdmin.java | 3 +- .../hadoop/hdfs/TestBlockReaderLocal.java | 30 +- .../hadoop/hdfs/TestBlockReaderLocalLegacy.java | 2 +- .../hadoop/hdfs/TestDFSClientRetries.java | 2 +- .../hadoop/hdfs/protocolPB/TestPBHelper.java | 4 +- .../security/token/block/TestBlockToken.java | 10 +- .../shortcircuit/TestShortCircuitLocalRead.java | 4 +- 51 files changed, 3243 insertions(+), 3227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java new file mode 100644 index 0000000..aa3e8ba --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; + +/** + * A BlockReader is responsible for reading a single block + * from a single datanode. 
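+ *
+ * A rough usage sketch (the reader variable and the buffer size below are
+ * illustrative only; any concrete BlockReader implementation would do):
+ * <pre>
+ *   byte[] data = new byte[4096];
+ *   reader.readFully(data, 0, data.length);  // throws if EOF comes first
+ *   long skipped = reader.skip(512);         // advance past unwanted bytes
+ *   reader.close();
+ * </pre>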
+ */ +@InterfaceAudience.Private +public interface BlockReader extends ByteBufferReadable { + + + /* same interface as inputStream java.io.InputStream#read() + * used by DFSInputStream#read() + * This violates one rule when there is a checksum error: + * "Read should not modify user buffer before successful read" + * because it first reads the data to user buffer and then checks + * the checksum. + * Note: this must return -1 on EOF, even in the case of a 0-byte read. + * See HDFS-5762 for details. + */ + int read(byte[] buf, int off, int len) throws IOException; + + /** + * Skip the given number of bytes + */ + long skip(long n) throws IOException; + + /** + * Returns an estimate of the number of bytes that can be read + * (or skipped over) from this input stream without performing + * network I/O. + * This may return more than what is actually present in the block. + */ + int available() throws IOException; + + /** + * Close the block reader. + * + * @throws IOException + */ + void close() throws IOException; + + /** + * Read exactly the given amount of data, throwing an exception + * if EOF is reached before that amount + */ + void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException; + + /** + * Similar to {@link #readFully(byte[], int, int)} except that it will + * not throw an exception on EOF. However, it differs from the simple + * {@link #read(byte[], int, int)} call in that it is guaranteed to + * read the data if it is available. In other words, if this call + * does not throw an exception, then either the buffer has been + * filled or the next call will return EOF. + */ + int readAll(byte[] buf, int offset, int len) throws IOException; + + /** + * @return true only if this is a local read. + */ + boolean isLocal(); + + /** + * @return true only if this is a short-circuit read. + * All short-circuit reads are also local. + */ + boolean isShortCircuit(); + + /** + * Get a ClientMmap object for this BlockReader. + * + * @param opts The read options to use. + * @return The ClientMmap object, or null if mmap is not + * supported. + */ + ClientMmap getClientMmap(EnumSet opts); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java new file mode 100644 index 0000000..2a0e21b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -0,0 +1,743 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DirectBufferPool; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BlockReaderLocal enables local short circuited reads. If the DFS client is on + * the same machine as the datanode, then the client can read files directly + * from the local file system rather than going through the datanode for better + * performance.
+ * {@link BlockReaderLocal} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
+ * </ul>
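+ *
+ * A rough sketch of how a reader might be assembled with the Builder below
+ * (the scConf, replica, path, and block variables are illustrative only):
+ * <pre>
+ *   BlockReaderLocal reader = new BlockReaderLocal.Builder(scConf)
+ *       .setShortCircuitReplica(replica)
+ *       .setFilename(path)
+ *       .setBlock(block)
+ *       .setStartOffset(0)
+ *       .setVerifyChecksum(true)
+ *       .build();
+ * </pre>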
+ */ +@InterfaceAudience.Private +class BlockReaderLocal implements BlockReader { + static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class); + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + + public static class Builder { + private final int bufferSize; + private boolean verifyChecksum; + private int maxReadahead; + private String filename; + private ShortCircuitReplica replica; + private long dataPos; + private ExtendedBlock block; + private StorageType storageType; + + public Builder(ShortCircuitConf conf) { + this.maxReadahead = Integer.MAX_VALUE; + this.verifyChecksum = !conf.isSkipShortCircuitChecksums(); + this.bufferSize = conf.getShortCircuitBufferSize(); + } + + public Builder setVerifyChecksum(boolean verifyChecksum) { + this.verifyChecksum = verifyChecksum; + return this; + } + + public Builder setCachingStrategy(CachingStrategy cachingStrategy) { + long readahead = cachingStrategy.getReadahead() != null ? + cachingStrategy.getReadahead() : + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; + this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead); + return this; + } + + public Builder setFilename(String filename) { + this.filename = filename; + return this; + } + + public Builder setShortCircuitReplica(ShortCircuitReplica replica) { + this.replica = replica; + return this; + } + + public Builder setStartOffset(long startOffset) { + this.dataPos = Math.max(0, startOffset); + return this; + } + + public Builder setBlock(ExtendedBlock block) { + this.block = block; + return this; + } + + public Builder setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + + public BlockReaderLocal build() { + Preconditions.checkNotNull(replica); + return new BlockReaderLocal(this); + } + } + + private boolean closed = false; + + /** + * Pair of streams for this block. + */ + private final ShortCircuitReplica replica; + + /** + * The data FileChannel. + */ + private final FileChannel dataIn; + + /** + * The next place we'll read from in the block data FileChannel. + * + * If data is buffered in dataBuf, this offset will be larger than the + * offset of the next byte which a read() operation will give us. + */ + private long dataPos; + + /** + * The Checksum FileChannel. + */ + private final FileChannel checksumIn; + + /** + * Checksum type and size. + */ + private final DataChecksum checksum; + + /** + * If false, we will always skip the checksum. + */ + private final boolean verifyChecksum; + + /** + * Name of the block, for logging purposes. + */ + private final String filename; + + /** + * Block ID and Block Pool ID. + */ + private final ExtendedBlock block; + + /** + * Cache of Checksum#bytesPerChecksum. + */ + private final int bytesPerChecksum; + + /** + * Cache of Checksum#checksumSize. + */ + private final int checksumSize; + + /** + * Maximum number of chunks to allocate. + * + * This is used to allocate dataBuf and checksumBuf, in the event that + * we need them. + */ + private final int maxAllocatedChunks; + + /** + * True if zero readahead was requested. + */ + private final boolean zeroReadaheadRequested; + + /** + * Maximum amount of readahead we'll do. This will always be at least the, + * size of a single chunk, even if {@link #zeroReadaheadRequested} is true. + * The reason is because we need to do a certain amount of buffering in order + * to do checksumming. + * + * This determines how many bytes we'll use out of dataBuf and checksumBuf. 
+ * Why do we allocate buffers, and then (potentially) only use part of them? + * The rationale is that allocating a lot of buffers of different sizes would + * make it very difficult for the DirectBufferPool to re-use buffers. + */ + private final int maxReadaheadLength; + + /** + * Buffers data starting at the current dataPos and extending on + * for dataBuf.limit(). + * + * This may be null if we don't need it. + */ + private ByteBuffer dataBuf; + + /** + * Buffers checksums starting at the current checksumPos and extending on + * for checksumBuf.limit(). + * + * This may be null if we don't need it. + */ + private ByteBuffer checksumBuf; + + /** + * StorageType of replica on DataNode. + */ + private StorageType storageType; + + private BlockReaderLocal(Builder builder) { + this.replica = builder.replica; + this.dataIn = replica.getDataStream().getChannel(); + this.dataPos = builder.dataPos; + this.checksumIn = replica.getMetaStream().getChannel(); + BlockMetadataHeader header = builder.replica.getMetaHeader(); + this.checksum = header.getChecksum(); + this.verifyChecksum = builder.verifyChecksum && + (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL); + this.filename = builder.filename; + this.block = builder.block; + this.bytesPerChecksum = checksum.getBytesPerChecksum(); + this.checksumSize = checksum.getChecksumSize(); + + this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 : + ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum); + // Calculate the effective maximum readahead. + // We can't do more readahead than there is space in the buffer. + int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 : + ((Math.min(builder.bufferSize, builder.maxReadahead) + + bytesPerChecksum - 1) / bytesPerChecksum); + if (maxReadaheadChunks == 0) { + this.zeroReadaheadRequested = true; + maxReadaheadChunks = 1; + } else { + this.zeroReadaheadRequested = false; + } + this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; + this.storageType = builder.storageType; + } + + private synchronized void createDataBufIfNeeded() { + if (dataBuf == null) { + dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum); + dataBuf.position(0); + dataBuf.limit(0); + } + } + + private synchronized void freeDataBufIfExists() { + if (dataBuf != null) { + // When disposing of a dataBuf, we have to move our stored file index + // backwards. + dataPos -= dataBuf.remaining(); + dataBuf.clear(); + bufferPool.returnBuffer(dataBuf); + dataBuf = null; + } + } + + private synchronized void createChecksumBufIfNeeded() { + if (checksumBuf == null) { + checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize); + checksumBuf.position(0); + checksumBuf.limit(0); + } + } + + private synchronized void freeChecksumBufIfExists() { + if (checksumBuf != null) { + checksumBuf.clear(); + bufferPool.returnBuffer(checksumBuf); + checksumBuf = null; + } + } + + private synchronized int drainDataBuf(ByteBuffer buf) { + if (dataBuf == null) return -1; + int oldLimit = dataBuf.limit(); + int nRead = Math.min(dataBuf.remaining(), buf.remaining()); + if (nRead == 0) { + return (dataBuf.remaining() == 0) ? -1 : 0; + } + try { + dataBuf.limit(dataBuf.position() + nRead); + buf.put(dataBuf); + } finally { + dataBuf.limit(oldLimit); + } + return nRead; + } + + /** + * Read from the block file into a buffer. + * + * This function overwrites checksumBuf. It will increment dataPos. + * + * @param buf The buffer to read into. May be dataBuf. 
+ * The position and limit of this buffer should be set to + * multiples of the checksum size. + * @param canSkipChecksum True if we can skip checksumming. + * + * @return Total bytes read. 0 on EOF. + */ + private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) + throws IOException { + TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" + + block.getBlockId() + ")", Sampler.NEVER); + try { + int total = 0; + long startDataPos = dataPos; + int startBufPos = buf.position(); + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead < 0) { + break; + } + dataPos += nRead; + total += nRead; + } + if (canSkipChecksum) { + freeChecksumBufIfExists(); + return total; + } + if (total > 0) { + try { + buf.limit(buf.position()); + buf.position(startBufPos); + createChecksumBufIfNeeded(); + int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuf.clear(); + checksumBuf.limit(checksumsNeeded * checksumSize); + long checksumPos = BlockMetadataHeader.getHeaderSize() + + ((startDataPos / bytesPerChecksum) * checksumSize); + while (checksumBuf.hasRemaining()) { + int nRead = checksumIn.read(checksumBuf, checksumPos); + if (nRead < 0) { + throw new IOException("Got unexpected checksum file EOF at " + + checksumPos + ", block file position " + startDataPos + " for " + + "block " + block + " of file " + filename); + } + checksumPos += nRead; + } + checksumBuf.flip(); + + checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); + } finally { + buf.position(buf.limit()); + } + } + return total; + } finally { + scope.close(); + } + } + + private boolean createNoChecksumContext() { + if (verifyChecksum) { + if (storageType != null && storageType.isTransient()) { + // Checksums are not stored for replicas on transient storage. We do not + // anchor, because we do not intend for client activity to block eviction + // from transient storage on the DataNode side. + return true; + } else { + return replica.addNoChecksumAnchor(); + } + } else { + return true; + } + } + + private void releaseNoChecksumContext() { + if (verifyChecksum) { + if (storageType == null || !storageType.isTransient()) { + replica.removeNoChecksumAnchor(); + } + } + } + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + boolean canSkipChecksum = createNoChecksumContext(); + try { + String traceString = null; + if (LOG.isTraceEnabled()) { + traceString = new StringBuilder(). + append("read("). + append("buf.remaining=").append(buf.remaining()). + append(", block=").append(block). + append(", filename=").append(filename). + append(", canSkipChecksum=").append(canSkipChecksum). 
+ append(")").toString(); + LOG.info(traceString + ": starting"); + } + int nRead; + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(buf); + } else { + nRead = readWithBounceBuffer(buf, canSkipChecksum); + } + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.info(traceString + ": I/O error", e); + } + throw e; + } + if (LOG.isTraceEnabled()) { + LOG.info(traceString + ": returning " + nRead); + } + return nRead; + } finally { + if (canSkipChecksum) releaseNoChecksumContext(); + } + } + + private synchronized int readWithoutBounceBuffer(ByteBuffer buf) + throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int total = 0; + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead <= 0) break; + dataPos += nRead; + total += nRead; + } + return (total == 0 && (dataPos == dataIn.size())) ? -1 : total; + } + + /** + * Fill the data buffer. If necessary, validate the data against the + * checksums. + * + * We always want the offsets of the data contained in dataBuf to be + * aligned to the chunk boundary. If we are validating checksums, we + * accomplish this by seeking backwards in the file until we're on a + * chunk boundary. (This is necessary because we can't checksum a + * partial chunk.) If we are not validating checksums, we simply only + * fill the latter part of dataBuf. + * + * @param canSkipChecksum true if we can skip checksumming. + * @return true if we hit EOF. + * @throws IOException + */ + private synchronized boolean fillDataBuf(boolean canSkipChecksum) + throws IOException { + createDataBufIfNeeded(); + final int slop = (int)(dataPos % bytesPerChecksum); + final long oldDataPos = dataPos; + dataBuf.limit(maxReadaheadLength); + if (canSkipChecksum) { + dataBuf.position(slop); + fillBuffer(dataBuf, canSkipChecksum); + } else { + dataPos -= slop; + dataBuf.position(0); + fillBuffer(dataBuf, canSkipChecksum); + } + dataBuf.limit(dataBuf.position()); + dataBuf.position(Math.min(dataBuf.position(), slop)); + if (LOG.isTraceEnabled()) { + LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " + + "buffer from offset " + oldDataPos + " of " + block); + } + return dataBuf.limit() != maxReadaheadLength; + } + + /** + * Read using the bounce buffer. + * + * A 'direct' read actually has three phases. The first drains any + * remaining bytes from the slow read buffer. After this the read is + * guaranteed to be on a checksum chunk boundary. If there are still bytes + * to read, the fast direct path is used for as many remaining bytes as + * possible, up to a multiple of the checksum chunk size. Finally, any + * 'odd' bytes remaining at the end of the read cause another slow read to + * be issued, which involves an extra copy. + * + * Every 'slow' read tries to fill the slow read buffer in one go for + * efficiency's sake. As described above, all non-checksum-chunk-aligned + * reads will be served from the slower read path. + * + * @param buf The buffer to read into. + * @param canSkipChecksum True if we can skip checksums. 
+ */ + private synchronized int readWithBounceBuffer(ByteBuffer buf, + boolean canSkipChecksum) throws IOException { + int total = 0; + int bb = drainDataBuf(buf); // drain bounce buffer if possible + if (bb >= 0) { + total += bb; + if (buf.remaining() == 0) return total; + } + boolean eof = true, done = false; + do { + if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength) + && ((dataPos % bytesPerChecksum) == 0)) { + // Fast lane: try to read directly into user-supplied buffer, bypassing + // bounce buffer. + int oldLimit = buf.limit(); + int nRead; + try { + buf.limit(buf.position() + maxReadaheadLength); + nRead = fillBuffer(buf, canSkipChecksum); + } finally { + buf.limit(oldLimit); + } + if (nRead < maxReadaheadLength) { + done = true; + } + if (nRead > 0) { + eof = false; + } + total += nRead; + } else { + // Slow lane: refill bounce buffer. + if (fillDataBuf(canSkipChecksum)) { + done = true; + } + bb = drainDataBuf(buf); // drain bounce buffer if possible + if (bb >= 0) { + eof = false; + total += bb; + } + } + } while ((!done) && (buf.remaining() > 0)); + return (eof && total == 0) ? -1 : total; + } + + @Override + public synchronized int read(byte[] arr, int off, int len) + throws IOException { + boolean canSkipChecksum = createNoChecksumContext(); + int nRead; + try { + String traceString = null; + if (LOG.isTraceEnabled()) { + traceString = new StringBuilder(). + append("read(arr.length=").append(arr.length). + append(", off=").append(off). + append(", len=").append(len). + append(", filename=").append(filename). + append(", block=").append(block). + append(", canSkipChecksum=").append(canSkipChecksum). + append(")").toString(); + LOG.trace(traceString + ": starting"); + } + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(arr, off, len); + } else { + nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum); + } + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.trace(traceString + ": I/O error", e); + } + throw e; + } + if (LOG.isTraceEnabled()) { + LOG.trace(traceString + ": returning " + nRead); + } + } finally { + if (canSkipChecksum) releaseNoChecksumContext(); + } + return nRead; + } + + private synchronized int readWithoutBounceBuffer(byte arr[], int off, + int len) throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos); + if (nRead > 0) { + dataPos += nRead; + } else if ((nRead == 0) && (dataPos == dataIn.size())) { + return -1; + } + return nRead; + } + + private synchronized int readWithBounceBuffer(byte arr[], int off, int len, + boolean canSkipChecksum) throws IOException { + createDataBufIfNeeded(); + if (!dataBuf.hasRemaining()) { + dataBuf.position(0); + dataBuf.limit(maxReadaheadLength); + fillDataBuf(canSkipChecksum); + } + if (dataBuf.remaining() == 0) return -1; + int toRead = Math.min(dataBuf.remaining(), len); + dataBuf.get(arr, off, toRead); + return toRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + int discardedFromBuf = 0; + long remaining = n; + if ((dataBuf != null) && dataBuf.hasRemaining()) { + discardedFromBuf = (int)Math.min(dataBuf.remaining(), n); + dataBuf.position(dataBuf.position() + discardedFromBuf); + remaining -= discardedFromBuf; + } + if (LOG.isTraceEnabled()) { + LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + + filename + "): discarded " + discardedFromBuf + " bytes from " + + "dataBuf and advanced dataPos by " + remaining); + } + 
dataPos += remaining; + return n; + } + + @Override + public int available() throws IOException { + // We never do network I/O in BlockReaderLocal. + return Integer.MAX_VALUE; + } + + @Override + public synchronized void close() throws IOException { + if (closed) return; + closed = true; + if (LOG.isTraceEnabled()) { + LOG.trace("close(filename=" + filename + ", block=" + block + ")"); + } + replica.unref(); + freeDataBufIfExists(); + freeChecksumBufIfExists(); + } + + @Override + public synchronized void readFully(byte[] arr, int off, int len) + throws IOException { + BlockReaderUtil.readFully(this, arr, off, len); + } + + @Override + public synchronized int readAll(byte[] buf, int off, int len) + throws IOException { + return BlockReaderUtil.readAll(this, buf, off, len); + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public boolean isShortCircuit() { + return true; + } + + /** + * Get or create a memory map for this replica. + * + * There are two kinds of ClientMmap objects we could fetch here: one that + * will always read pre-checksummed data, and one that may read data that + * hasn't been checksummed. + * + * If we fetch the former, "safe" kind of ClientMmap, we have to increment + * the anchor count on the shared memory slot. This will tell the DataNode + * not to munlock the block until this ClientMmap is closed. + * If we fetch the latter, we don't bother with anchoring. + * + * @param opts The options to use, such as SKIP_CHECKSUMS. + * + * @return null on failure; the ClientMmap otherwise. + */ + @Override + public ClientMmap getClientMmap(EnumSet opts) { + boolean anchor = verifyChecksum && + (opts.contains(ReadOption.SKIP_CHECKSUMS) == false); + if (anchor) { + if (!createNoChecksumContext()) { + if (LOG.isTraceEnabled()) { + LOG.trace("can't get an mmap for " + block + " of " + filename + + " since SKIP_CHECKSUMS was not given, " + + "we aren't skipping checksums, and the block is not mlocked."); + } + return null; + } + } + ClientMmap clientMmap = null; + try { + clientMmap = replica.getOrCreateClientMmap(anchor); + } finally { + if ((clientMmap == null) && anchor) { + releaseNoChecksumContext(); + } + } + return clientMmap; + } + + @VisibleForTesting + boolean getVerifyChecksum() { + return this.verifyChecksum; + } + + @VisibleForTesting + int getMaxReadaheadLength() { + return this.maxReadaheadLength; + } + + /** + * Make the replica anchorable. Normally this can only be done by the + * DataNode. This method is only for testing. + */ + @VisibleForTesting + void forceAnchorable() { + replica.getSlot().makeAnchorable(); + } + + /** + * Make the replica unanchorable. Normally this can only be done by the + * DataNode. This method is only for testing. 
+ */ + @VisibleForTesting + void forceUnanchorable() { + replica.getSlot().makeUnanchorable(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java new file mode 100644 index 0000000..eea3f06 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -0,0 +1,738 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; +import org.apache.hadoop.hdfs.util.IOUtilsClient; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DirectBufferPool; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on + * the same machine as the datanode, then the client can read files directly + * from the local file system rather than going through the datanode for better + * performance.
+ * + * This is the legacy implementation based on HDFS-2246, which requires + * permissions on the datanode to be set so that clients can directly access the + * blocks. The new implementation based on HDFS-347 should be preferred on UNIX + * systems where the required native code has been implemented.
+ *
+ * {@link BlockReaderLocalLegacy} works as follows:
+ * <ul>
+ * <li>The client performing short circuit reads must be configured at the
+ * datanode.</li>
+ * <li>The client gets the path to the file where block is stored using
+ * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
+ * RPC call</li>
+ * <li>Client uses kerberos authentication to connect to the datanode over RPC,
+ * if security is enabled.</li>
+ * </ul>
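+ *
+ * A rough sketch of obtaining a legacy reader through the static factory
+ * defined below (the argument names are illustrative only):
+ * <pre>
+ *   BlockReaderLocalLegacy reader = BlockReaderLocalLegacy.newBlockReader(
+ *       dfsClientConf, ugi, conf, src, block, accessToken, datanode,
+ *       startOffset, length, storageType);
+ * </pre>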
+ */ +@InterfaceAudience.Private +class BlockReaderLocalLegacy implements BlockReader { + private static final Logger LOG = LoggerFactory.getLogger( + BlockReaderLocalLegacy.class); + + //Stores the cache and proxy for a local datanode. + private static class LocalDatanodeInfo { + private ClientDatanodeProtocol proxy = null; + private final Map cache; + + LocalDatanodeInfo() { + final int cacheSize = 10000; + final float hashTableLoadFactor = 0.75f; + int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1; + cache = Collections + .synchronizedMap(new LinkedHashMap( + hashTableCapacity, hashTableLoadFactor, true) { + private static final long serialVersionUID = 1; + + @Override + protected boolean removeEldestEntry( + Map.Entry eldest) { + return size() > cacheSize; + } + }); + } + + private synchronized ClientDatanodeProtocol getDatanodeProxy( + UserGroupInformation ugi, final DatanodeInfo node, + final Configuration conf, final int socketTimeout, + final boolean connectToDnViaHostname) throws IOException { + if (proxy == null) { + try { + proxy = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public ClientDatanodeProtocol run() throws Exception { + return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf, + socketTimeout, connectToDnViaHostname); + } + }); + } catch (InterruptedException e) { + LOG.warn("encountered exception ", e); + } + } + return proxy; + } + + private synchronized void resetDatanodeProxy() { + if (null != proxy) { + RPC.stopProxy(proxy); + proxy = null; + } + } + + private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { + return cache.get(b); + } + + private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) { + cache.put(b, info); + } + + private void removeBlockLocalPathInfo(ExtendedBlock b) { + cache.remove(b); + } + } + + // Multiple datanodes could be running on the local machine. Store proxies in + // a map keyed by the ipc port of the datanode. + private static final Map localDatanodeInfoMap = new HashMap(); + + private final FileInputStream dataIn; // reader for the data file + private final FileInputStream checksumIn; // reader for the checksum file + + /** + * Offset from the most recent chunk boundary at which the next read should + * take place. Is only set to non-zero at construction time, and is + * decremented (usually to 0) by subsequent reads. This avoids having to do a + * checksum read at construction to position the read cursor correctly. + */ + private int offsetFromChunkBoundary; + + private byte[] skipBuf = null; + + /** + * Used for checksummed reads that need to be staged before copying to their + * output buffer because they are either a) smaller than the checksum chunk + * size or b) issued by the slower read(byte[]...) path + */ + private ByteBuffer slowReadBuff = null; + private ByteBuffer checksumBuff = null; + private DataChecksum checksum; + private final boolean verifyChecksum; + + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + + private final int bytesPerChecksum; + private final int checksumSize; + + /** offset in block where reader wants to actually read */ + private long startOffset; + private final String filename; + private long blockId; + + /** + * The only way this object can be instantiated. 
+ */ + static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf, + UserGroupInformation userGroupInformation, + Configuration configuration, String file, ExtendedBlock blk, + Token token, DatanodeInfo node, + long startOffset, long length, StorageType storageType) + throws IOException { + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node + .getIpcPort()); + // check the cache first + BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk); + if (pathinfo == null) { + if (userGroupInformation == null) { + userGroupInformation = UserGroupInformation.getCurrentUser(); + } + pathinfo = getBlockPathInfo(userGroupInformation, blk, node, + configuration, conf.getSocketTimeout(), token, + conf.isConnectToDnViaHostname(), storageType); + } + + // check to see if the file exists. It may so happen that the + // HDFS file has been deleted and this block-lookup is occurring + // on behalf of a new HDFS file. This time, the block file could + // be residing in a different portion of the fs.data.dir directory. + // In this case, we remove this entry from the cache. The next + // call to this method will re-populate the cache. + FileInputStream dataIn = null; + FileInputStream checksumIn = null; + BlockReaderLocalLegacy localBlockReader = null; + final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums() + || storageType.isTransient(); + try { + // get a local file system + File blkfile = new File(pathinfo.getBlockPath()); + dataIn = new FileInputStream(blkfile); + + if (LOG.isDebugEnabled()) { + LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size " + + blkfile.length() + " startOffset " + startOffset + " length " + + length + " short circuit checksum " + !skipChecksumCheck); + } + + if (!skipChecksumCheck) { + // get the metadata file + File metafile = new File(pathinfo.getMetaPath()); + checksumIn = new FileInputStream(metafile); + + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( + new DataInputStream(checksumIn), blk); + long firstChunkOffset = startOffset + - (startOffset % checksum.getBytesPerChecksum()); + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token, + startOffset, length, pathinfo, checksum, true, dataIn, + firstChunkOffset, checksumIn); + } else { + localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token, + startOffset, length, pathinfo, dataIn); + } + } catch (IOException e) { + // remove from cache + localDatanodeInfo.removeBlockLocalPathInfo(blk); + LOG.warn("BlockReaderLocalLegacy: Removing " + blk + + " from cache because local file " + pathinfo.getBlockPath() + + " could not be opened."); + throw e; + } finally { + if (localBlockReader == null) { + if (dataIn != null) { + dataIn.close(); + } + if (checksumIn != null) { + checksumIn.close(); + } + } + } + return localBlockReader; + } + + private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) { + LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port); + if (ldInfo == null) { + ldInfo = new LocalDatanodeInfo(); + localDatanodeInfoMap.put(port, ldInfo); + } + return ldInfo; + } + + private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi, + ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout, + Token token, boolean connectToDnViaHostname, + StorageType storageType) throws IOException { + LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort()); + BlockLocalPathInfo 
pathinfo = null; + ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node, + conf, timeout, connectToDnViaHostname); + try { + // make RPC to local datanode to find local pathnames of blocks + pathinfo = proxy.getBlockLocalPathInfo(blk, token); + // We cannot cache the path information for a replica on transient storage. + // If the replica gets evicted, then it moves to a different path. Then, + // our next attempt to read from the cached path would fail to find the + // file. Additionally, the failure would cause us to disable legacy + // short-circuit read for all subsequent use in the ClientContext. Unlike + // the newer short-circuit read implementation, we have no communication + // channel for the DataNode to notify the client that the path has been + // invalidated. Therefore, our only option is to skip caching. + if (pathinfo != null && !storageType.isTransient()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cached location of block " + blk + " as " + pathinfo); + } + localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); + } + } catch (IOException e) { + localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error + throw e; + } + return pathinfo; + } + + private static int getSlowReadBufferNumChunks(int bufferSizeBytes, + int bytesPerChecksum) { + if (bufferSizeBytes < bytesPerChecksum) { + throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " + + "buffer size (" + bufferSizeBytes + ") is not large enough to hold " + + "a single chunk (" + bytesPerChecksum + "). Please configure " + + HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY + + " appropriately"); + } + + // Round down to nearest chunk size + return bufferSizeBytes / bytesPerChecksum; + } + + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, + ExtendedBlock block, Token token, long startOffset, + long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) + throws IOException { + this(conf, hdfsfile, block, token, startOffset, length, pathinfo, + DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false, + dataIn, startOffset, null); + } + + private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile, + ExtendedBlock block, Token token, long startOffset, + long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, + boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, + FileInputStream checksumIn) throws IOException { + this.filename = hdfsfile; + this.checksum = checksum; + this.verifyChecksum = verifyChecksum; + this.startOffset = Math.max(startOffset, 0); + this.blockId = block.getBlockId(); + + bytesPerChecksum = this.checksum.getBytesPerChecksum(); + checksumSize = this.checksum.getChecksumSize(); + + this.dataIn = dataIn; + this.checksumIn = checksumIn; + this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); + + final int chunksPerChecksumRead = getSlowReadBufferNumChunks( + conf.getShortCircuitBufferSize(), bytesPerChecksum); + slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); + checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); + // Initially the buffers have nothing to read. 
+ slowReadBuff.flip(); + checksumBuff.flip(); + boolean success = false; + try { + // Skip both input streams to beginning of the chunk containing startOffset + IOUtils.skipFully(dataIn, firstChunkOffset); + if (checksumIn != null) { + long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize; + IOUtils.skipFully(checksumIn, checkSumOffset); + } + success = true; + } finally { + if (!success) { + bufferPool.returnBuffer(slowReadBuff); + bufferPool.returnBuffer(checksumBuff); + } + } + } + + /** + * Reads bytes into a buffer until EOF or the buffer's limit is reached + */ + private int fillBuffer(FileInputStream stream, ByteBuffer buf) + throws IOException { + TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" + + blockId + ")", Sampler.NEVER); + try { + int bytesRead = stream.getChannel().read(buf); + if (bytesRead < 0) { + //EOF + return bytesRead; + } + while (buf.remaining() > 0) { + int n = stream.getChannel().read(buf); + if (n < 0) { + //EOF + return bytesRead; + } + bytesRead += n; + } + return bytesRead; + } finally { + scope.close(); + } + } + + /** + * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into + * another. + */ + private void writeSlice(ByteBuffer from, ByteBuffer to, int length) { + int oldLimit = from.limit(); + from.limit(from.position() + length); + try { + to.put(from); + } finally { + from.limit(oldLimit); + } + } + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + int nRead = 0; + if (verifyChecksum) { + // A 'direct' read actually has three phases. The first drains any + // remaining bytes from the slow read buffer. After this the read is + // guaranteed to be on a checksum chunk boundary. If there are still bytes + // to read, the fast direct path is used for as many remaining bytes as + // possible, up to a multiple of the checksum chunk size. Finally, any + // 'odd' bytes remaining at the end of the read cause another slow read to + // be issued, which involves an extra copy. + + // Every 'slow' read tries to fill the slow read buffer in one go for + // efficiency's sake. As described above, all non-checksum-chunk-aligned + // reads will be served from the slower read path. + + if (slowReadBuff.hasRemaining()) { + // There are remaining bytes from a small read available. This usually + // means this read is unaligned, which falls back to the slow path. + int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + + if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) { + // Since we have drained the 'small read' buffer, we are guaranteed to + // be chunk-aligned + int len = buf.remaining() - (buf.remaining() % bytesPerChecksum); + + // There's only enough checksum buffer space available to checksum one + // entire slow read buffer. This saves keeping the number of checksum + // chunks around. 
+ len = Math.min(len, slowReadBuff.capacity()); + int oldlimit = buf.limit(); + buf.limit(buf.position() + len); + int readResult = 0; + try { + readResult = doByteBufferRead(buf); + } finally { + buf.limit(oldlimit); + } + if (readResult == -1) { + return nRead; + } else { + nRead += readResult; + buf.position(buf.position() + readResult); + } + } + + // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read + // until chunk boundary + if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) { + int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary); + int readResult = fillSlowReadBuffer(toRead); + if (readResult == -1) { + return nRead; + } else { + int fromSlowReadBuff = Math.min(readResult, buf.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + } + } else { + // Non-checksummed reads are much easier; we can just fill the buffer directly. + nRead = doByteBufferRead(buf); + if (nRead > 0) { + buf.position(buf.position() + nRead); + } + } + return nRead; + } + + /** + * Tries to read as many bytes as possible into supplied buffer, checksumming + * each chunk if needed. + * + * Preconditions: + *
+ * <ul>
+ * <li>
+ * If checksumming is enabled, buf.remaining must be a multiple of
+ * bytesPerChecksum. Note that this is not a requirement for clients of
+ * read(ByteBuffer) - in the case of non-checksum-sized read requests,
+ * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+ * method.
+ * </li>
+ * </ul>
+ * Postconditions:
+ * <ul>
+ * <li>buf.limit and buf.mark are unchanged.</li>
+ * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+ * requested bytes can be read straight from the buffer</li>
+ * </ul>
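+ *
+ * A small worked example (the numbers are illustrative only): with
+ * bytesPerChecksum = 512 and offsetFromChunkBoundary = 100, a fill that
+ * reads one whole 512-byte chunk returns 512 - 100 = 412 usable bytes,
+ * and buf.position is advanced by min(100, 512) = 100 from its old value.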
+ * + * @param buf + * byte buffer to write bytes to. If checksums are not required, buf + * can have any number of bytes remaining, otherwise there must be a + * multiple of the checksum chunk size remaining. + * @return max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0) + * that is, the the number of useful bytes (up to the amount + * requested) readable from the buffer by the client. + */ + private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException { + if (verifyChecksum) { + assert buf.remaining() % bytesPerChecksum == 0; + } + int dataRead = -1; + + int oldpos = buf.position(); + // Read as much as we can into the buffer. + dataRead = fillBuffer(dataIn, buf); + + if (dataRead == -1) { + return -1; + } + + if (verifyChecksum) { + ByteBuffer toChecksum = buf.duplicate(); + toChecksum.position(oldpos); + toChecksum.limit(oldpos + dataRead); + + checksumBuff.clear(); + // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum ); + int numChunks = + (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuff.limit(checksumSize * numChunks); + + fillBuffer(checksumIn, checksumBuff); + checksumBuff.flip(); + + checksum.verifyChunkedSums(toChecksum, checksumBuff, filename, + this.startOffset); + } + + if (dataRead >= 0) { + buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead)); + } + + if (dataRead < offsetFromChunkBoundary) { + // yikes, didn't even get enough bytes to honour offset. This can happen + // even if we are verifying checksums if we are at EOF. + offsetFromChunkBoundary -= dataRead; + dataRead = 0; + } else { + dataRead -= offsetFromChunkBoundary; + offsetFromChunkBoundary = 0; + } + + return dataRead; + } + + /** + * Ensures that up to len bytes are available and checksummed in the slow read + * buffer. The number of bytes available to read is returned. If the buffer is + * not already empty, the number of remaining bytes is returned and no actual + * read happens. + * + * @param len + * the maximum number of bytes to make available. After len bytes + * are read, the underlying bytestream must be at a checksum + * boundary, or EOF. That is, (len + currentPosition) % + * bytesPerChecksum == 0. + * @return the number of bytes available to read, or -1 if EOF. + */ + private synchronized int fillSlowReadBuffer(int len) throws IOException { + int nRead = -1; + if (slowReadBuff.hasRemaining()) { + // Already got data, good to go. + nRead = Math.min(len, slowReadBuff.remaining()); + } else { + // Round a complete read of len bytes (plus any implicit offset) to the + // next chunk boundary, since we try and read in multiples of a chunk + int nextChunk = len + offsetFromChunkBoundary + + (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum)); + int limit = Math.min(nextChunk, slowReadBuff.capacity()); + assert limit % bytesPerChecksum == 0; + + slowReadBuff.clear(); + slowReadBuff.limit(limit); + + nRead = doByteBufferRead(slowReadBuff); + + if (nRead > 0) { + // So that next time we call slowReadBuff.hasRemaining(), we don't get a + // false positive. 
+ slowReadBuff.limit(nRead + slowReadBuff.position()); + } + } + return nRead; + } + + @Override + public synchronized int read(byte[] buf, int off, int len) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read off " + off + " len " + len); + } + if (!verifyChecksum) { + return dataIn.read(buf, off, len); + } + + int nRead = fillSlowReadBuffer(slowReadBuff.capacity()); + + if (nRead > 0) { + // Possible that buffer is filled with a larger read than we need, since + // we tried to read as much as possible at once + nRead = Math.min(len, nRead); + slowReadBuff.get(buf, off, nRead); + } + + return nRead; + } + + @Override + public synchronized long skip(long n) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("skip " + n); + } + if (n <= 0) { + return 0; + } + if (!verifyChecksum) { + return dataIn.skip(n); + } + + // caller made sure newPosition is not beyond EOF. + int remaining = slowReadBuff.remaining(); + int position = slowReadBuff.position(); + int newPosition = position + (int)n; + + // if the new offset is already read into dataBuff, just reposition + if (n <= remaining) { + assert offsetFromChunkBoundary == 0; + slowReadBuff.position(newPosition); + return n; + } + + // for small gap, read through to keep the data/checksum in sync + if (n - remaining <= bytesPerChecksum) { + slowReadBuff.position(position + remaining); + if (skipBuf == null) { + skipBuf = new byte[bytesPerChecksum]; + } + int ret = read(skipBuf, 0, (int)(n - remaining)); + return (remaining + ret); + } + + // optimize for big gap: discard the current buffer, skip to + // the beginning of the appropriate checksum chunk and then + // read to the middle of that chunk to be in sync with checksums. + + // We can't use this.offsetFromChunkBoundary because we need to know how + // many bytes of the offset were really read. Calling read(..) with a + // positive this.offsetFromChunkBoundary causes that many bytes to get + // silently skipped. + int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum; + long toskip = n - remaining - myOffsetFromChunkBoundary; + + slowReadBuff.position(slowReadBuff.limit()); + checksumBuff.position(checksumBuff.limit()); + + IOUtils.skipFully(dataIn, toskip); + long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize; + IOUtils.skipFully(checksumIn, checkSumOffset); + + // read into the middle of the chunk + if (skipBuf == null) { + skipBuf = new byte[bytesPerChecksum]; + } + assert skipBuf.length == bytesPerChecksum; + assert myOffsetFromChunkBoundary < bytesPerChecksum; + + int ret = read(skipBuf, 0, myOffsetFromChunkBoundary); + + if (ret == -1) { // EOS + return (toskip + remaining); + } else { + return (toskip + remaining + ret); + } + } + + @Override + public synchronized void close() throws IOException { + IOUtilsClient.cleanup(LOG, dataIn, checksumIn); + if (slowReadBuff != null) { + bufferPool.returnBuffer(slowReadBuff); + slowReadBuff = null; + } + if (checksumBuff != null) { + bufferPool.returnBuffer(checksumBuff); + checksumBuff = null; + } + startOffset = -1; + checksum = null; + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public void readFully(byte[] buf, int off, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, off, len); + } + + @Override + public int available() throws IOException { + // We never do network I/O in BlockReaderLocalLegacy. 
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isLocal() {
+    return true;
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return true;
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
new file mode 100644
index 0000000..dbc528e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * For sharing between the local and remote block reader implementations.
+ */
+@InterfaceAudience.Private
+class BlockReaderUtil {
+
+  /* See {@link BlockReader#readAll(byte[], int, int)} */
+  public static int readAll(BlockReader reader,
+      byte[] buf, int offset, int len) throws IOException {
+    int n = 0;
+    for (;;) {
+      int nread = reader.read(buf, offset + n, len - n);
+      if (nread <= 0)
+        return (n == 0) ? nread : n;
+      n += nread;
+      if (n >= len)
+        return n;
+    }
+  }
+
+  /* See {@link BlockReader#readFully(byte[], int, int)} */
+  public static void readFully(BlockReader reader,
+      byte[] buf, int off, int len) throws IOException {
+    int toRead = len;
+    while (toRead > 0) {
+      int ret = reader.read(buf, off, toRead);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
new file mode 100644
index 0000000..3836979
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ClientContext contains context information for a client.
+ *
+ * This allows us to share caches such as the socket cache across
+ * DFSClient instances.
+ */
+@InterfaceAudience.Private
+public class ClientContext {
+  private static final Logger LOG = LoggerFactory.getLogger(ClientContext.class);
+
+  /**
+   * Global map of context names to cached contexts.
+   */
+  private final static HashMap<String, ClientContext> CACHES =
+      new HashMap<String, ClientContext>();
+
+  /**
+   * Name of context.
+   */
+  private final String name;
+
+  /**
+   * String representation of the configuration.
+   */
+  private final String confString;
+
+  /**
+   * Caches short-circuit file descriptors, mmap regions.
+   */
+  private final ShortCircuitCache shortCircuitCache;
+
+  /**
+   * Caches TCP and UNIX domain sockets for reuse.
+   */
+  private final PeerCache peerCache;
+
+  /**
+   * Stores information about socket paths.
+   */
+  private final DomainSocketFactory domainSocketFactory;
+
+  /**
+   * Caches KeyProviders for the DFSClient.
+   */
+  private final KeyProviderCache keyProviderCache;
+
+  /**
+   * True if we should use the legacy BlockReaderLocal.
+   */
+  private final boolean useLegacyBlockReaderLocal;
+
+  /**
+   * True if the legacy BlockReaderLocal is disabled.
+   *
+   * The legacy block reader local gets disabled completely whenever there is an
+   * error or miscommunication.  The new block reader local code handles this
+   * case more gracefully inside DomainSocketFactory.
+   */
+  private volatile boolean disableLegacyBlockReaderLocal = false;
+
+  /** Creating byte[] for {@link DFSOutputStream}. */
+  private final ByteArrayManager byteArrayManager;
+
+  /**
+   * Whether or not we complained about a DFSClient fetching a ClientContext
+   * that didn't match its config values yet.
+ */ + private boolean printedConfWarning = false; + + private ClientContext(String name, DfsClientConf conf) { + final ShortCircuitConf scConf = conf.getShortCircuitConf(); + + this.name = name; + this.confString = scConf.confAsString(); + this.shortCircuitCache = ShortCircuitCache.fromConf(scConf); + this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), + scConf.getSocketCacheExpiry()); + this.keyProviderCache = new KeyProviderCache( + scConf.getKeyProviderCacheExpiryMs()); + this.useLegacyBlockReaderLocal = scConf.isUseLegacyBlockReaderLocal(); + this.domainSocketFactory = new DomainSocketFactory(scConf); + + this.byteArrayManager = ByteArrayManager.newInstance( + conf.getWriteByteArrayManagerConf()); + } + + public static ClientContext get(String name, DfsClientConf conf) { + ClientContext context; + synchronized(ClientContext.class) { + context = CACHES.get(name); + if (context == null) { + context = new ClientContext(name, conf); + CACHES.put(name, context); + } else { + context.printConfWarningIfNeeded(conf); + } + } + return context; + } + + /** + * Get a client context, from a Configuration object. + * + * This method is less efficient than the version which takes a DFSClient#Conf + * object, and should be mostly used by tests. + */ + @VisibleForTesting + public static ClientContext getFromConf(Configuration conf) { + return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, + HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), + new DfsClientConf(conf)); + } + + private void printConfWarningIfNeeded(DfsClientConf conf) { + String existing = this.getConfString(); + String requested = conf.getShortCircuitConf().confAsString(); + if (!existing.equals(requested)) { + if (!printedConfWarning) { + printedConfWarning = true; + LOG.warn("Existing client context '" + name + "' does not match " + + "requested configuration. 
Existing: " + existing + + ", Requested: " + requested); + } + } + } + + public String getConfString() { + return confString; + } + + public ShortCircuitCache getShortCircuitCache() { + return shortCircuitCache; + } + + public PeerCache getPeerCache() { + return peerCache; + } + + public KeyProviderCache getKeyProviderCache() { + return keyProviderCache; + } + + public boolean getUseLegacyBlockReaderLocal() { + return useLegacyBlockReaderLocal; + } + + public boolean getDisableLegacyBlockReaderLocal() { + return disableLegacyBlockReaderLocal; + } + + public void setDisableLegacyBlockReaderLocal() { + disableLegacyBlockReaderLocal = true; + } + + public DomainSocketFactory getDomainSocketFactory() { + return domainSocketFactory; + } + + public ByteArrayManager getByteArrayManager() { + return byteArrayManager; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 3d0acb0..a89f556 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -22,22 +22,32 @@ import com.google.common.collect.Maps; import com.google.common.primitives.SignedBytes; import org.apache.commons.io.Charsets; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.SocketFactory; +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Collections; @@ -455,4 +465,62 @@ public class DFSUtilClient { localAddrMap.put(addr.getHostAddress(), local); return local; } + + /** Create a {@link ClientDatanodeProtocol} proxy */ + public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( + DatanodeID datanodeid, Configuration conf, int socketTimeout, + boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException { + return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout, + connectToDnViaHostname, locatedBlock); + } + + /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */ + public static ClientDatanodeProtocol 
createClientDatanodeProtocolProxy( + DatanodeID datanodeid, Configuration conf, int socketTimeout, + boolean connectToDnViaHostname) throws IOException { + return new ClientDatanodeProtocolTranslatorPB( + datanodeid, conf, socketTimeout, connectToDnViaHostname); + } + + /** Create a {@link ClientDatanodeProtocol} proxy */ + public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory) throws IOException { + return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory); + } + + /** + * Creates a new KeyProvider from the given Configuration. + * + * @param conf Configuration + * @return new KeyProvider, or null if no provider was found. + * @throws IOException if the KeyProvider is improperly specified in + * the Configuration + */ + public static KeyProvider createKeyProvider( + final Configuration conf) throws IOException { + final String providerUriStr = + conf.getTrimmed(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, ""); + // No provider set in conf + if (providerUriStr.isEmpty()) { + return null; + } + final URI providerUri; + try { + providerUri = new URI(providerUriStr); + } catch (URISyntaxException e) { + throw new IOException(e); + } + KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf); + if (keyProvider == null) { + throw new IOException("Could not instantiate KeyProvider from " + + HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '" + + providerUriStr + "'"); + } + if (keyProvider.isTransient()) { + throw new IOException("KeyProvider " + keyProvider.toString() + + " was found but it is a transient provider."); + } + return keyProvider; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java new file mode 100644 index 0000000..e135d8e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+
+/**
+ * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
+ * replicas.
+ */
+@InterfaceAudience.Private
+public final class ExternalBlockReader implements BlockReader {
+  private final ReplicaAccessor accessor;
+  private final long visibleLength;
+  private long pos;
+
+  ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
+      long startOffset) {
+    this.accessor = accessor;
+    this.visibleLength = visibleLength;
+    this.pos = startOffset;
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    int nread = accessor.read(pos, buf, off, len);
+    // Don't advance the position on error or EOF (nread < 0).
+    if (nread > 0) {
+      pos += nread;
+    }
+    return nread;
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    int nread = accessor.read(pos, buf);
+    if (nread > 0) {
+      pos += nread;
+    }
+    return nread;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    // You cannot skip backwards
+    if (n <= 0) {
+      return 0;
+    }
+    // You can't skip past the end of the replica.
+    long oldPos = pos;
+    pos += n;
+    if (pos > visibleLength) {
+      pos = visibleLength;
+    }
+    return pos - oldPos;
+  }
+
+  @Override
+  public int available() throws IOException {
+    // We return the number of bytes that we haven't read yet from the
+    // replica, based on our current position.  Some of the other block
+    // readers return a shorter length than that.  The only advantage to
+    // returning a shorter length is that the DFSInputStream will
+    // trash your block reader and create a new one if someone tries to
+    // seek() beyond the available() region.
+    long diff = visibleLength - pos;
+    if (diff > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    } else {
+      return (int)diff;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    accessor.close();
+  }
+
+  @Override
+  public void readFully(byte[] buf, int offset, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, offset, len);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public boolean isLocal() {
+    return accessor.isLocal();
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return accessor.isShortCircuit();
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    // For now, pluggable ReplicaAccessors do not support zero-copy.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
new file mode 100644
index 0000000..05492e0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class KeyProviderCache {
+
+  public static final Logger LOG = LoggerFactory.getLogger(KeyProviderCache.class);
+
+  private final Cache<URI, KeyProvider> cache;
+
+  public KeyProviderCache(long expiryMs) {
+    cache = CacheBuilder.newBuilder()
+        .expireAfterAccess(expiryMs, TimeUnit.MILLISECONDS)
+        .removalListener(new RemovalListener<URI, KeyProvider>() {
+          @Override
+          public void onRemoval(
+              RemovalNotification<URI, KeyProvider> notification) {
+            try {
+              notification.getValue().close();
+            } catch (Throwable e) {
+              LOG.error(
+                  "Error closing KeyProvider with uri ["
+                      + notification.getKey() + "]", e);
+            }
+          }
+        })
+        .build();
+  }
+
+  public KeyProvider get(final Configuration conf) {
+    URI kpURI = createKeyProviderURI(conf);
+    if (kpURI == null) {
+      return null;
+    }
+    try {
+      return cache.get(kpURI, new Callable<KeyProvider>() {
+        @Override
+        public KeyProvider call() throws Exception {
+          return DFSUtilClient.createKeyProvider(conf);
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Could not create KeyProvider for DFSClient !!", e.getCause());
+      return null;
+    }
+  }
+
+  private URI createKeyProviderURI(Configuration conf) {
+    final String providerUriStr =
+        conf.getTrimmed(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
+    // No provider set in conf
+    if (providerUriStr.isEmpty()) {
+      LOG.error("Could not find uri with key ["
+          + HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI
+          + "] to create a keyProvider !!");
+      return null;
+    }
+    final URI providerUri;
+    try {
+      providerUri = new URI(providerUriStr);
+    } catch (URISyntaxException e) {
+      LOG.error("KeyProvider URI string is invalid [" + providerUriStr
+          + "]!!", e.getCause());
+      return null;
+    }
+    return providerUri;
+  }
+
+  @VisibleForTesting
+  public void setKeyProvider(Configuration conf, KeyProvider keyProvider)
+      throws IOException {
+    URI uri = createKeyProviderURI(conf);
+    cache.put(uri, keyProvider);
+  }
+}
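[Editor's note: for readers tracing how these client-side pieces fit together, the fragment below is an illustrative sketch only, not part of this patch. It uses only methods that appear in the diff above; the Configuration is whatever the caller already has.]

    // Illustrative sketch, not part of this commit.
    Configuration conf = new Configuration();
    // getFromConf() keys the context on HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
    // so DFSClient instances configured with the same context name share caches.
    ClientContext ctx = ClientContext.getFromConf(conf);
    PeerCache peers = ctx.getPeerCache();                  // pooled TCP/domain sockets
    KeyProviderCache kpCache = ctx.getKeyProviderCache();
    KeyProvider provider = kpCache.get(conf);              // null if no provider is configured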