hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject [5/6] hadoop git commit: HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma
Date Wed, 25 May 2016 19:26:57 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
deleted file mode 100644
index 85f925f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.io.IOException;
-
-/**
- * For sharing between the local and remote block reader implementations.
- */
-@InterfaceAudience.Private
-class BlockReaderUtil {
-
-  /* See {@link BlockReader#readAll(byte[], int, int)} */
-  public static int readAll(BlockReader reader,
-      byte[] buf, int offset, int len) throws IOException {
-    int n = 0;
-    for (;;) {
-      int nread = reader.read(buf, offset + n, len - n);
-      if (nread <= 0)
-        return (n == 0) ? nread : n;
-      n += nread;
-      if (n >= len)
-        return n;
-    }
-  }
-
-  /* See {@link BlockReader#readFully(byte[], int, int)} */
-  public static void readFully(BlockReader reader,
-      byte[] buf, int off, int len) throws IOException {
-    int toRead = len;
-    while (toRead > 0) {
-      int ret = reader.read(buf, off, toRead);
-      if (ret < 0) {
-        throw new IOException("Premature EOF from inputStream");
-      }
-      toRead -= ret;
-      off += ret;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6bef2bf..23cae0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
deleted file mode 100644
index fae2cc0..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-
-/**
- * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
- * replicas.
- */
-@InterfaceAudience.Private
-public final class ExternalBlockReader implements BlockReader {
-  private final ReplicaAccessor accessor;
-  private final long visibleLength;
-  private long pos;
-
-  ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
-                      long startOffset) {
-    this.accessor = accessor;
-    this.visibleLength = visibleLength;
-    this.pos = startOffset;
-  }
-
-  @Override
-  public int read(byte[] buf, int off, int len) throws IOException {
-    int nread = accessor.read(pos, buf, off, len);
-    if (nread < 0) {
-      return nread;
-    }
-    pos += nread;
-    return nread;
-  }
-
-  @Override
-  public int read(ByteBuffer buf) throws IOException {
-    int nread = accessor.read(pos, buf);
-    if (nread < 0) {
-      return nread;
-    }
-    pos += nread;
-    return nread;
-  }
-
-  @Override
-  public long skip(long n) throws IOException {
-    // You cannot skip backwards
-    if (n <= 0) {
-      return 0;
-    }
-    // You can't skip past the last offset that we want to read with this
-    // block reader.
-    long oldPos = pos;
-    pos += n;
-    if (pos > visibleLength) {
-      pos = visibleLength;
-    }
-    return pos - oldPos;
-  }
-
-  @Override
-  public int available() {
-    // We return the amount of bytes between the current offset and the visible
-    // length.  Some of the other block readers return a shorter length than
-    // that.  The only advantage to returning a shorter length is that the
-    // DFSInputStream will trash your block reader and create a new one if
-    // someone tries to seek() beyond the available() region.
-    long diff = visibleLength - pos;
-    if (diff > Integer.MAX_VALUE) {
-      return Integer.MAX_VALUE;
-    } else {
-      return (int)diff;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    accessor.close();
-  }
-
-  @Override
-  public void readFully(byte[] buf, int offset, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, offset, len);
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public boolean isLocal() {
-    return accessor.isLocal();
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return accessor.isShortCircuit();
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    // For now, pluggable ReplicaAccessors do not support zero-copy.
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
deleted file mode 100644
index 028c964..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.core.TraceScope;
-import org.apache.htrace.core.Tracer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * @deprecated this is an old implementation that is being left around
- * in case any issues spring up with the new {@link RemoteBlockReader2}
- * implementation.
- * It will be removed in the next release.
- */
-@InterfaceAudience.Private
-@Deprecated
-public class RemoteBlockReader extends FSInputChecker implements BlockReader {
-  static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
-
-  private final Peer peer;
-  private final DatanodeID datanodeID;
-  private final DataInputStream in;
-  private DataChecksum checksum;
-
-  /** offset in block of the last chunk received */
-  private long lastChunkOffset = -1;
-  private long lastChunkLen = -1;
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-
-  private final long blockId;
-
-  /** offset in block of of first chunk - may be less than startOffset
-   if startOffset is not chunk-aligned */
-  private final long firstChunkOffset;
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private final long bytesNeededToFinish;
-
-  /**
-   * True if we are reading from a local DataNode.
-   */
-  private final boolean isLocal;
-
-  private boolean eos = false;
-  private boolean sentStatusCode = false;
-
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
-
-  private final PeerCache peerCache;
-
-  private final Tracer tracer;
-
-  /* FSInputChecker interface */
-
-  /* same interface as inputStream java.io.InputStream#read()
-   * used by DFSInputStream#read()
-   * This violates one rule when there is a checksum error:
-   * "Read should not modify user buffer before successful read"
-   * because it first reads the data to user buffer and then checks
-   * the checksum.
-   */
-  @Override
-  public synchronized int read(byte[] buf, int off, int len)
-      throws IOException {
-
-    // This has to be set here, *before* the skip, since we can
-    // hit EOS during the skip, in the case that our entire read
-    // is smaller than the checksum chunk.
-    boolean eosBefore = eos;
-
-    //for the first read, skip the extra bytes at the front.
-    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
-      // Skip these bytes. But don't call this.skip()!
-      int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( super.readAndDiscard(toSkip) != toSkip ) {
-        // should never happen
-        throw new IOException("Could not skip required number of bytes");
-      }
-    }
-
-    int nRead = super.read(buf, off, len);
-
-    // if eos was set in the previous read, send a status code to the DN
-    if (eos && !eosBefore && nRead >= 0) {
-      if (needChecksum()) {
-        sendReadResult(peer, Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(peer, Status.SUCCESS);
-      }
-    }
-    return nRead;
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */
-    long nSkipped = 0;
-    while (nSkipped < n) {
-      int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
-      int ret = readAndDiscard(toSkip);
-      if (ret <= 0) {
-        return nSkipped;
-      }
-      nSkipped += ret;
-    }
-    return nSkipped;
-  }
-
-  @Override
-  public int read() throws IOException {
-    throw new IOException("read() is not expected to be invoked. " +
-        "Use read(buf, off, len) instead.");
-  }
-
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    /* Checksum errors are handled outside the BlockReader.
-     * DFSInputStream does not always call 'seekToNewSource'. In the
-     * case of pread(), it just tries a different replica without seeking.
-     */
-    return false;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    throw new IOException("Seek() is not supported in BlockInputChecker");
-  }
-
-  @Override
-  protected long getChunkPosition(long pos) {
-    throw new RuntimeException("getChunkPosition() is not supported, " +
-        "since seek is not required");
-  }
-
-  /**
-   * Makes sure that checksumBytes has enough capacity
-   * and limit is set to the number of checksum bytes needed
-   * to be read.
-   */
-  private void adjustChecksumBytes(int dataLen) {
-    int requiredSize =
-        ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
-    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
-      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
-    } else {
-      checksumBytes.clear();
-    }
-    checksumBytes.limit(requiredSize);
-  }
-
-  @Override
-  protected synchronized int readChunk(long pos, byte[] buf, int offset,
-      int len, byte[] checksumBuf)
-      throws IOException {
-    try (TraceScope ignored = tracer.newScope(
-        "RemoteBlockReader#readChunk(" + blockId + ")")) {
-      return readChunkImpl(pos, buf, offset, len, checksumBuf);
-    }
-  }
-
-  private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
-      int len, byte[] checksumBuf)
-      throws IOException {
-    // Read one chunk.
-    if (eos) {
-      // Already hit EOF
-      return -1;
-    }
-
-    // Read one DATA_CHUNK.
-    long chunkOffset = lastChunkOffset;
-    if ( lastChunkLen > 0 ) {
-      chunkOffset += lastChunkLen;
-    }
-
-    // pos is relative to the start of the first chunk of the read.
-    // chunkOffset is relative to the start of the block.
-    // This makes sure that the read passed from FSInputChecker is the
-    // for the same chunk we expect to be reading from the DN.
-    if ( (pos + firstChunkOffset) != chunkOffset ) {
-      throw new IOException("Mismatch in pos : " + pos + " + " +
-          firstChunkOffset + " != " + chunkOffset);
-    }
-
-    // Read next packet if the previous packet has been read completely.
-    if (dataLeft <= 0) {
-      //Read packet headers.
-      PacketHeader header = new PacketHeader();
-      header.readFields(in);
-
-      LOG.debug("DFSClient readChunk got header {}", header);
-
-      // Sanity check the lengths
-      if (!header.sanityCheck(lastSeqNo)) {
-        throw new IOException("BlockReader: error in packet header " +
-            header);
-      }
-
-      lastSeqNo = header.getSeqno();
-      dataLeft = header.getDataLen();
-      adjustChecksumBytes(header.getDataLen());
-      if (header.getDataLen() > 0) {
-        IOUtils.readFully(in, checksumBytes.array(), 0,
-            checksumBytes.limit());
-      }
-    }
-
-    // Sanity checks
-    assert len >= bytesPerChecksum;
-    assert checksum != null;
-    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
-    int checksumsToRead, bytesToRead;
-
-    if (checksumSize > 0) {
-
-      // How many chunks left in our packet - this is a ceiling
-      // since we may have a partial chunk at the end of the file
-      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
-      // How many chunks we can fit in databuffer
-      //  - note this is a floor since we always read full chunks
-      int chunksCanFit = Math.min(len / bytesPerChecksum,
-          checksumBuf.length / checksumSize);
-
-      // How many chunks should we read
-      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
-      // How many bytes should we actually read
-      bytesToRead = Math.min(
-          checksumsToRead * bytesPerChecksum, // full chunks
-          dataLeft); // in case we have a partial
-    } else {
-      // no checksum
-      bytesToRead = Math.min(dataLeft, len);
-      checksumsToRead = 0;
-    }
-
-    if ( bytesToRead > 0 ) {
-      // Assert we have enough space
-      assert bytesToRead <= len;
-      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
-      assert checksumBuf.length >= checksumSize * checksumsToRead;
-      IOUtils.readFully(in, buf, offset, bytesToRead);
-      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
-    }
-
-    dataLeft -= bytesToRead;
-    assert dataLeft >= 0;
-
-    lastChunkOffset = chunkOffset;
-    lastChunkLen = bytesToRead;
-
-    // If there's no data left in the current packet after satisfying
-    // this read, and we have satisfied the client read, we expect
-    // an empty packet header from the DN to signify this.
-    // Note that pos + bytesToRead may in fact be greater since the
-    // DN finishes off the entire last chunk.
-    if (dataLeft == 0 &&
-        pos + bytesToRead >= bytesNeededToFinish) {
-
-      // Read header
-      PacketHeader hdr = new PacketHeader();
-      hdr.readFields(in);
-
-      if (!hdr.isLastPacketInBlock() ||
-          hdr.getDataLen() != 0) {
-        throw new IOException("Expected empty end-of-read packet! Header: " +
-            hdr);
-      }
-
-      eos = true;
-    }
-
-    if ( bytesToRead == 0 ) {
-      return -1;
-    }
-
-    return bytesToRead;
-  }
-
-  private RemoteBlockReader(String file, String bpid, long blockId,
-      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) {
-    // Path is used only for printing block and file information in debug
-    super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
-            ":" + bpid + ":of:"+ file)/*too non path-like?*/,
-        1, verifyChecksum,
-        checksum.getChecksumSize() > 0? checksum : null,
-        checksum.getBytesPerChecksum(),
-        checksum.getChecksumSize());
-
-    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
-        createSocketAddr(datanodeID.getXferAddr()));
-
-    this.peer = peer;
-    this.datanodeID = datanodeID;
-    this.in = in;
-    this.checksum = checksum;
-    this.startOffset = Math.max( startOffset, 0 );
-    this.blockId = blockId;
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
-    this.firstChunkOffset = firstChunkOffset;
-    lastChunkOffset = firstChunkOffset;
-    lastChunkLen = -1;
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-    this.peerCache = peerCache;
-    this.tracer = tracer;
-  }
-
-  /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @return New BlockReader instance, or null on error.
-   */
-  public static RemoteBlockReader newBlockReader(String file,
-      ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len,
-      int bufferSize, boolean verifyChecksum,
-      String clientName, Peer peer,
-      DatanodeID datanodeID,
-      PeerCache peerCache,
-      CachingStrategy cachingStrategy,
-      Tracer tracer)
-      throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out =
-        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum, cachingStrategy);
-
-    //
-    // Get bytes in block, set streams
-    //
-
-    DataInputStream in = new DataInputStream(
-        new BufferedInputStream(peer.getInputStream(), bufferSize));
-
-    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, peer, block, file);
-    ReadOpChecksumInfoProto checksumInfo =
-        status.getReadOpChecksumInfo();
-    DataChecksum checksum = DataTransferProtoUtil.fromProto(
-        checksumInfo.getChecksum());
-    //Warning when we get CHECKSUM_NULL?
-
-    // Read the first chunk offset.
-    long firstChunkOffset = checksumInfo.getChunkOffset();
-
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-          firstChunkOffset + ") startOffset is " +
-          startOffset + " for file " + file);
-    }
-
-    return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
-        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
-        peer, datanodeID, peerCache, tracer);
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null & sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-
-  @Override
-  public void readFully(byte[] buf, int readOffset, int amtToRead)
-      throws IOException {
-    IOUtils.readFully(this, buf, readOffset, amtToRead);
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return readFully(this, buf, offset, len);
-  }
-
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Peer peer, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-          peer.getRemoteAddressString() + ": " + e.getMessage());
-    }
-  }
-
-  @Override
-  public int read(ByteBuffer buf) throws IOException {
-    throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
-  }
-
-  @Override
-  public int available() {
-    // An optimistic estimate of how much data is available
-    // to us without doing network I/O.
-    return RemoteBlockReader2.TCP_WINDOW_SIZE;
-  }
-
-  @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return false;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
deleted file mode 100644
index c15bd1b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.EnumSet;
-import java.util.UUID;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.core.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.htrace.core.Tracer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- *
- * This is a new implementation introduced in Hadoop 0.23 which
- * is more efficient and simpler than the older BlockReader
- * implementation. It should be renamed to RemoteBlockReader
- * once we are confident in it.
- */
-@InterfaceAudience.Private
-public class RemoteBlockReader2  implements BlockReader {
-
-  static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class);
-  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
-
-  final private Peer peer;
-  final private DatanodeID datanodeID;
-  final private PeerCache peerCache;
-  final private long blockId;
-  private final ReadableByteChannel in;
-
-  private DataChecksum checksum;
-  private final PacketReceiver packetReceiver = new PacketReceiver(true);
-
-  private ByteBuffer curDataSlice = null;
-
-  /** offset in block of the last chunk received */
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-  private final String filename;
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private long bytesNeededToFinish;
-
-  /**
-   * True if we are reading from a local DataNode.
-   */
-  private final boolean isLocal;
-
-  private final boolean verifyChecksum;
-
-  private boolean sentStatusCode = false;
-
-  private final Tracer tracer;
-
-  @VisibleForTesting
-  public Peer getPeer() {
-    return peer;
-  }
-
-  @Override
-  public synchronized int read(byte[] buf, int off, int len)
-      throws IOException {
-    UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null);
-    LOG.trace("Starting read #{} file {} from datanode {}",
-        randomId, filename, datanodeID.getHostName());
-
-    if (curDataSlice == null ||
-        curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
-      try (TraceScope ignored = tracer.newScope(
-          "RemoteBlockReader2#readNextPacket(" + blockId + ")")) {
-        readNextPacket();
-      }
-    }
-
-    LOG.trace("Finishing read #{}", randomId);
-
-    if (curDataSlice.remaining() == 0) {
-      // we're at EOF now
-      return -1;
-    }
-
-    int nRead = Math.min(curDataSlice.remaining(), len);
-    curDataSlice.get(buf, off, nRead);
-
-    return nRead;
-  }
-
-
-  @Override
-  public synchronized int read(ByteBuffer buf) throws IOException {
-    if (curDataSlice == null ||
-        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
-      try (TraceScope ignored = tracer.newScope(
-          "RemoteBlockReader2#readNextPacket(" + blockId + ")")) {
-        readNextPacket();
-      }
-    }
-    if (curDataSlice.remaining() == 0) {
-      // we're at EOF now
-      return -1;
-    }
-
-    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
-    ByteBuffer writeSlice = curDataSlice.duplicate();
-    writeSlice.limit(writeSlice.position() + nRead);
-    buf.put(writeSlice);
-    curDataSlice.position(writeSlice.position());
-
-    return nRead;
-  }
-
-  private void readNextPacket() throws IOException {
-    //Read packet headers.
-    packetReceiver.receiveNextPacket(in);
-
-    PacketHeader curHeader = packetReceiver.getHeader();
-    curDataSlice = packetReceiver.getDataSlice();
-    assert curDataSlice.capacity() == curHeader.getDataLen();
-
-    LOG.trace("DFSClient readNextPacket got header {}", curHeader);
-
-    // Sanity check the lengths
-    if (!curHeader.sanityCheck(lastSeqNo)) {
-      throw new IOException("BlockReader: error in packet header " +
-          curHeader);
-    }
-
-    if (curHeader.getDataLen() > 0) {
-      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
-      int checksumsLen = chunks * checksumSize;
-
-      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
-          "checksum slice capacity=" +
-              packetReceiver.getChecksumSlice().capacity() +
-              " checksumsLen=" + checksumsLen;
-
-      lastSeqNo = curHeader.getSeqno();
-      if (verifyChecksum && curDataSlice.remaining() > 0) {
-        // N.B.: the checksum error offset reported here is actually
-        // relative to the start of the block, not the start of the file.
-        // This is slightly misleading, but preserves the behavior from
-        // the older BlockReader.
-        checksum.verifyChunkedSums(curDataSlice,
-            packetReceiver.getChecksumSlice(),
-            filename, curHeader.getOffsetInBlock());
-      }
-      bytesNeededToFinish -= curHeader.getDataLen();
-    }
-
-    // First packet will include some data prior to the first byte
-    // the user requested. Skip it.
-    if (curHeader.getOffsetInBlock() < startOffset) {
-      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
-      curDataSlice.position(newPos);
-    }
-
-    // If we've now satisfied the whole client read, read one last packet
-    // header, which should be empty
-    if (bytesNeededToFinish <= 0) {
-      readTrailingEmptyPacket();
-      if (verifyChecksum) {
-        sendReadResult(Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(Status.SUCCESS);
-      }
-    }
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in majority of the cases?. This one throws. */
-    long skipped = 0;
-    while (skipped < n) {
-      long needToSkip = n - skipped;
-      if (curDataSlice == null ||
-          curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
-        readNextPacket();
-      }
-      if (curDataSlice.remaining() == 0) {
-        // we're at EOF now
-        break;
-      }
-
-      int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
-      curDataSlice.position(curDataSlice.position() + skip);
-      skipped += skip;
-    }
-    return skipped;
-  }
-
-  private void readTrailingEmptyPacket() throws IOException {
-    LOG.trace("Reading empty packet at end of read");
-
-    packetReceiver.receiveNextPacket(in);
-
-    PacketHeader trailer = packetReceiver.getHeader();
-    if (!trailer.isLastPacketInBlock() ||
-        trailer.getDataLen() != 0) {
-      throw new IOException("Expected empty end-of-read packet! Header: " +
-          trailer);
-    }
-  }
-
-  protected RemoteBlockReader2(String file, long blockId,
-      DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) {
-    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
-        createSocketAddr(datanodeID.getXferAddr()));
-    // Path is used only for printing block and file information in debug
-    this.peer = peer;
-    this.datanodeID = datanodeID;
-    this.in = peer.getInputStreamChannel();
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max( startOffset, 0 );
-    this.filename = file;
-    this.peerCache = peerCache;
-    this.blockId = blockId;
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-    this.tracer = tracer;
-  }
-
-
-  @Override
-  public synchronized void close() throws IOException {
-    packetReceiver.close();
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null && sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-          peer.getRemoteAddressString() + ": " + e.getMessage());
-    }
-  }
-
-  /**
-   * Serialize the actual read result on the wire.
-   */
-  static void writeReadResult(OutputStream out, Status statusCode)
-      throws IOException {
-
-    ClientReadStatusProto.newBuilder()
-        .setStatus(statusCode)
-        .build()
-        .writeDelimitedTo(out);
-
-    out.flush();
-  }
-
-  /**
-   * File name to print when accessing a block directly (from servlets)
-   * @param s Address of the block location
-   * @param poolId Block pool ID of the block
-   * @param blockId Block ID of the block
-   * @return string that has a file name for debug purposes
-   */
-  public static String getFileName(final InetSocketAddress s,
-      final String poolId, final long blockId) {
-    return s.toString() + ":" + poolId + ":" + blockId;
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public void readFully(byte[] buf, int off, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, off, len);
-  }
-
-  /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @param peer  The Peer to use
-   * @param datanodeID  The DatanodeID this peer is connected to
-   * @return New BlockReader instance, or null on error.
-   */
-  public static BlockReader newBlockReader(String file,
-      ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len,
-      boolean verifyChecksum,
-      String clientName,
-      Peer peer, DatanodeID datanodeID,
-      PeerCache peerCache,
-      CachingStrategy cachingStrategy,
-      Tracer tracer) throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        peer.getOutputStream()));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum, cachingStrategy);
-
-    //
-    // Get bytes in block
-    //
-    DataInputStream in = new DataInputStream(peer.getInputStream());
-
-    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    checkSuccess(status, peer, block, file);
-    ReadOpChecksumInfoProto checksumInfo =
-        status.getReadOpChecksumInfo();
-    DataChecksum checksum = DataTransferProtoUtil.fromProto(
-        checksumInfo.getChecksum());
-    //Warning when we get CHECKSUM_NULL?
-
-    // Read the first chunk offset.
-    long firstChunkOffset = checksumInfo.getChunkOffset();
-
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-          firstChunkOffset + ") startOffset is " +
-          startOffset + " for file " + file);
-    }
-
-    return new RemoteBlockReader2(file, block.getBlockId(), checksum,
-        verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
-        peerCache, tracer);
-  }
-
-  static void checkSuccess(
-      BlockOpResponseProto status, Peer peer,
-      ExtendedBlock block, String file)
-      throws IOException {
-    String logInfo = "for OP_READ_BLOCK"
-        + ", self=" + peer.getLocalAddressString()
-        + ", remote=" + peer.getRemoteAddressString()
-        + ", for file " + file
-        + ", for pool " + block.getBlockPoolId()
-        + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
-    DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
-  }
-
-  @Override
-  public int available() {
-    // An optimistic estimate of how much data is available
-    // to us without doing network I/O.
-    return TCP_WINDOW_SIZE;
-  }
-
-  @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return false;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
new file mode 100644
index 0000000..5486295
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
@@ -0,0 +1,875 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.ReplicaAccessor;
+import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.PerformanceAdvisory;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.htrace.core.Tracer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to create BlockReader implementations.
+ */
+@InterfaceAudience.Private
+public class BlockReaderFactory implements ShortCircuitReplicaCreator {
+  static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
+
+  public static class FailureInjector {
+    public void injectRequestFileDescriptorsFailure() throws IOException {
+      // do nothing
+    }
+    public boolean getSupportsReceiptVerification() {
+      return true;
+    }
+  }
+
+  @VisibleForTesting
+  static ShortCircuitReplicaCreator
+      createShortCircuitReplicaInfoCallback = null;
+
+  private final DfsClientConf conf;
+
+  /**
+   * Injects failures into specific operations during unit tests.
+   */
+  private static FailureInjector failureInjector = new FailureInjector();
+
+  /**
+   * The file name, for logging and debugging purposes.
+   */
+  private String fileName;
+
+  /**
+   * The block ID and block pool ID to use.
+   */
+  private ExtendedBlock block;
+
+  /**
+   * The block token to use for security purposes.
+   */
+  private Token<BlockTokenIdentifier> token;
+
+  /**
+   * The offset within the block to start reading at.
+   */
+  private long startOffset;
+
+  /**
+   * If false, we won't try to verify the block checksum.
+   */
+  private boolean verifyChecksum;
+
+  /**
+   * The name of this client.
+   */
+  private String clientName;
+
+  /**
+   * The DataNode we're talking to.
+   */
+  private DatanodeInfo datanode;
+
+  /**
+   * StorageType of replica on DataNode.
+   */
+  private StorageType storageType;
+
+  /**
+   * If false, we won't try short-circuit local reads.
+   */
+  private boolean allowShortCircuitLocalReads;
+
+  /**
+   * The ClientContext to use for things like the PeerCache.
+   */
+  private ClientContext clientContext;
+
+  /**
+   * Number of bytes to read. Must be set to a non-negative value.
+   */
+  private long length = -1;
+
+  /**
+   * Caching strategy to use when reading the block.
+   */
+  private CachingStrategy cachingStrategy;
+
+  /**
+   * Socket address to use to connect to peer.
+   */
+  private InetSocketAddress inetSocketAddress;
+
+  /**
+   * Remote peer factory to use to create a peer, if needed.
+   */
+  private RemotePeerFactory remotePeerFactory;
+
+  /**
+   * UserGroupInformation to use for legacy block reader local objects,
+   * if needed.
+   */
+  private UserGroupInformation userGroupInformation;
+
+  /**
+   * Configuration to use for legacy block reader local objects, if needed.
+   */
+  private Configuration configuration;
+
+  /**
+   * The HTrace tracer to use.
+   */
+  private Tracer tracer;
+
+  /**
+   * Information about the domain socket path we should use to connect to the
+   * local peer-- or null if we haven't examined the local domain socket.
+   */
+  private DomainSocketFactory.PathInfo pathInfo;
+
+  /**
+   * The remaining number of times that we'll try to pull a socket out of the
+   * cache.
+   */
+  private int remainingCacheTries;
+
+  public BlockReaderFactory(DfsClientConf conf) {
+    this.conf = conf;
+    this.remainingCacheTries = conf.getNumCachedConnRetry();
+  }
+
+  public BlockReaderFactory setFileName(String fileName) {
+    this.fileName = fileName;
+    return this;
+  }
+
+  public BlockReaderFactory setBlock(ExtendedBlock block) {
+    this.block = block;
+    return this;
+  }
+
+  public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
+    this.token = token;
+    return this;
+  }
+
+  public BlockReaderFactory setStartOffset(long startOffset) {
+    this.startOffset = startOffset;
+    return this;
+  }
+
+  public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
+    this.verifyChecksum = verifyChecksum;
+    return this;
+  }
+
+  public BlockReaderFactory setClientName(String clientName) {
+    this.clientName = clientName;
+    return this;
+  }
+
+  public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
+    this.datanode = datanode;
+    return this;
+  }
+
+  public BlockReaderFactory setStorageType(StorageType storageType) {
+    this.storageType = storageType;
+    return this;
+  }
+
+  public BlockReaderFactory setAllowShortCircuitLocalReads(
+      boolean allowShortCircuitLocalReads) {
+    this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
+    return this;
+  }
+
+  public BlockReaderFactory setClientCacheContext(
+      ClientContext clientContext) {
+    this.clientContext = clientContext;
+    return this;
+  }
+
+  public BlockReaderFactory setLength(long length) {
+    this.length = length;
+    return this;
+  }
+
+  public BlockReaderFactory setCachingStrategy(
+      CachingStrategy cachingStrategy) {
+    this.cachingStrategy = cachingStrategy;
+    return this;
+  }
+
+  public BlockReaderFactory setInetSocketAddress (
+      InetSocketAddress inetSocketAddress) {
+    this.inetSocketAddress = inetSocketAddress;
+    return this;
+  }
+
+  public BlockReaderFactory setUserGroupInformation(
+      UserGroupInformation userGroupInformation) {
+    this.userGroupInformation = userGroupInformation;
+    return this;
+  }
+
+  public BlockReaderFactory setRemotePeerFactory(
+      RemotePeerFactory remotePeerFactory) {
+    this.remotePeerFactory = remotePeerFactory;
+    return this;
+  }
+
+  public BlockReaderFactory setConfiguration(
+      Configuration configuration) {
+    this.configuration = configuration;
+    return this;
+  }
+
+  public BlockReaderFactory setTracer(Tracer tracer) {
+    this.tracer = tracer;
+    return this;
+  }
+
+  @VisibleForTesting
+  public static void setFailureInjectorForTesting(FailureInjector injector) {
+    failureInjector = injector;
+  }
+
+  /**
+   * Build a BlockReader with the given options.
+   *
+   * This function will do the best it can to create a block reader that meets
+   * all of our requirements.  We prefer short-circuit block readers
+   * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
+   * former avoid the overhead of socket communication.  If short-circuit is
+   * unavailable, our next fallback is data transfer over UNIX domain sockets,
+   * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
+   * work, we will try to create a remote block reader that operates over TCP
+   * sockets.
+   *
+   * There are a few caches that are important here.
+   *
+   * The ShortCircuitCache stores file descriptor objects which have been passed
+   * from the DataNode.
+   *
+   * The DomainSocketFactory stores information about UNIX domain socket paths
+   * that we not been able to use in the past, so that we don't waste time
+   * retrying them over and over.  (Like all the caches, it does have a timeout,
+   * though.)
+   *
+   * The PeerCache stores peers that we have used in the past.  If we can reuse
+   * one of these peers, we avoid the overhead of re-opening a socket.  However,
+   * if the socket has been timed out on the remote end, our attempt to reuse
+   * the socket may end with an IOException.  For that reason, we limit our
+   * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
+   * that, we create new sockets.  This avoids the problem where a thread tries
+   * to talk to a peer that it hasn't talked to in a while, and has to clean out
+   * every entry in a socket cache full of stale entries.
+   *
+   * @return The new BlockReader.  We will not return null.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  public BlockReader build() throws IOException {
+    Preconditions.checkNotNull(configuration);
+    Preconditions
+        .checkState(length >= 0, "Length must be set to a non-negative value");
+    BlockReader reader = tryToCreateExternalBlockReader();
+    if (reader != null) {
+      return reader;
+    }
+    final ShortCircuitConf scConf = conf.getShortCircuitConf();
+    if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
+      if (clientContext.getUseLegacyBlockReaderLocal()) {
+        reader = getLegacyBlockReaderLocal();
+        if (reader != null) {
+          LOG.trace("{}: returning new legacy block reader local.", this);
+          return reader;
+        }
+      } else {
+        reader = getBlockReaderLocal();
+        if (reader != null) {
+          LOG.trace("{}: returning new block reader local.", this);
+          return reader;
+        }
+      }
+    }
+    if (scConf.isDomainSocketDataTraffic()) {
+      reader = getRemoteBlockReaderFromDomain();
+      if (reader != null) {
+        LOG.trace("{}: returning new remote block reader using UNIX domain "
+            + "socket on {}", this, pathInfo.getPath());
+        return reader;
+      }
+    }
+    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
+        "TCP reads were disabled for testing, but we failed to " +
+        "do a non-TCP read.");
+    return getRemoteBlockReaderFromTcp();
+  }
+
+  private BlockReader tryToCreateExternalBlockReader() {
+    List<Class<? extends ReplicaAccessorBuilder>> clses =
+        conf.getReplicaAccessorBuilderClasses();
+    for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
+      try {
+        ByteArrayDataOutput bado = ByteStreams.newDataOutput();
+        token.write(bado);
+        byte tokenBytes[] = bado.toByteArray();
+
+        Constructor<? extends ReplicaAccessorBuilder> ctor =
+            cls.getConstructor();
+        ReplicaAccessorBuilder builder = ctor.newInstance();
+        long visibleLength = startOffset + length;
+        ReplicaAccessor accessor = builder.
+            setAllowShortCircuitReads(allowShortCircuitLocalReads).
+            setBlock(block.getBlockId(), block.getBlockPoolId()).
+            setGenerationStamp(block.getGenerationStamp()).
+            setBlockAccessToken(tokenBytes).
+            setClientName(clientName).
+            setConfiguration(configuration).
+            setFileName(fileName).
+            setVerifyChecksum(verifyChecksum).
+            setVisibleLength(visibleLength).
+            build();
+        if (accessor == null) {
+          LOG.trace("{}: No ReplicaAccessor created by {}",
+              this, cls.getName());
+        } else {
+          return new ExternalBlockReader(accessor, visibleLength, startOffset);
+        }
+      } catch (Throwable t) {
+        LOG.warn("Failed to construct new object of type " +
+            cls.getName(), t);
+      }
+    }
+    return null;
+  }
+
+
+  /**
+   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+   * This block reader implements the path-based style of local reads
+   * first introduced in HDFS-2246.
+   */
+  private BlockReader getLegacyBlockReaderLocal() throws IOException {
+    LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this);
+    if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
+      LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address"
+          + "{} is not local", this, inetSocketAddress);
+      return null;
+    }
+    if (clientContext.getDisableLegacyBlockReaderLocal()) {
+      PerformanceAdvisory.LOG.debug("{}: can't construct " +
+          "BlockReaderLocalLegacy because " +
+          "disableLegacyBlockReaderLocal is set.", this);
+      return null;
+    }
+    IOException ioe;
+    try {
+      return BlockReaderLocalLegacy.newBlockReader(conf,
+          userGroupInformation, configuration, fileName, block, token,
+          datanode, startOffset, length, storageType, tracer);
+    } catch (RemoteException remoteException) {
+      ioe = remoteException.unwrapRemoteException(
+                InvalidToken.class, AccessControlException.class);
+    } catch (IOException e) {
+      ioe = e;
+    }
+    if ((!(ioe instanceof AccessControlException)) &&
+        isSecurityException(ioe)) {
+      // Handle security exceptions.
+      // We do not handle AccessControlException here, since
+      // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
+      // that the user is not in dfs.block.local-path-access.user, a condition
+      // which requires us to disable legacy SCR.
+      throw ioe;
+    }
+    LOG.warn(this + ": error creating legacy BlockReaderLocal.  " +
+        "Disabling legacy local reads.", ioe);
+    clientContext.setDisableLegacyBlockReaderLocal();
+    return null;
+  }
+
+  private BlockReader getBlockReaderLocal() throws InvalidToken {
+    LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
+        + " reads.", this);
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory()
+          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+    }
+    if (!pathInfo.getPathState().getUsableForShortCircuit()) {
+      PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
+              "giving up on BlockReaderLocal.", this, pathInfo);
+      return null;
+    }
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
+        block.getBlockPoolId());
+    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
+    InvalidToken exc = info.getInvalidTokenException();
+    if (exc != null) {
+      LOG.trace("{}: got InvalidToken exception while trying to construct "
+          + "BlockReaderLocal via {}", this, pathInfo.getPath());
+      throw exc;
+    }
+    if (info.getReplica() == null) {
+      PerformanceAdvisory.LOG.debug("{}: failed to get " +
+          "ShortCircuitReplica. Cannot construct " +
+          "BlockReaderLocal via {}", this, pathInfo.getPath());
+      return null;
+    }
+    return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
+        setFilename(fileName).
+        setBlock(block).
+        setStartOffset(startOffset).
+        setShortCircuitReplica(info.getReplica()).
+        setVerifyChecksum(verifyChecksum).
+        setCachingStrategy(cachingStrategy).
+        setStorageType(storageType).
+        setTracer(tracer).
+        build();
+  }
+
+  /**
+   * Fetch a pair of short-circuit block descriptors from a local DataNode.
+   *
+   * @return    Null if we could not communicate with the datanode,
+   *            a new ShortCircuitReplicaInfo object otherwise.
+   *            ShortCircuitReplicaInfo objects may contain either an
+   *            InvalidToken exception, or a ShortCircuitReplica object ready to
+   *            use.
+   */
+  @Override
+  public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+    if (createShortCircuitReplicaInfoCallback != null) {
+      ShortCircuitReplicaInfo info =
+          createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
+      if (info != null) return info;
+    }
+    LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this);
+    BlockReaderPeer curPeer;
+    while (true) {
+      curPeer = nextDomainPeer();
+      if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      Slot slot = null;
+      ShortCircuitCache cache = clientContext.getShortCircuitCache();
+      try {
+        MutableBoolean usedPeer = new MutableBoolean(false);
+        slot = cache.allocShmSlot(datanode, peer, usedPeer,
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
+            clientName);
+        if (usedPeer.booleanValue()) {
+          LOG.trace("{}: allocShmSlot used up our previous socket {}.  "
+              + "Allocating a new one...", this, peer.getDomainSocket());
+          curPeer = nextDomainPeer();
+          if (curPeer == null) break;
+          peer = (DomainPeer)curPeer.peer;
+        }
+        ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
+        clientContext.getPeerCache().put(datanode, peer);
+        return info;
+      } catch (IOException e) {
+        if (slot != null) {
+          cache.freeSlot(slot);
+        }
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached socket.
+          // These are considered less serious, because the socket may be stale.
+          LOG.debug("{}: closing stale domain peer {}", this, peer, e);
+          IOUtilsClient.cleanup(LOG, peer);
+        } else {
+          // Handle an I/O error we got when using a newly created socket.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn(this + ": I/O error requesting file descriptors.  " +
+              "Disabling domain socket " + peer.getDomainSocket(), e);
+          IOUtilsClient.cleanup(LOG, peer);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Request file descriptors from a DomainPeer.
+   *
+   * @param peer   The peer to use for communication.
+   * @param slot   If non-null, the shared memory slot to associate with the
+   *               new ShortCircuitReplica.
+   *
+   * @return  A ShortCircuitReplica object if we could communicate with the
+   *          datanode; null, otherwise.
+   * @throws  IOException If we encountered an I/O exception while communicating
+   *          with the datanode.
+   */
+  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
+          Slot slot) throws IOException {
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+    SlotId slotId = slot == null ? null : slot.getSlotId();
+    new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
+        failureInjector.getSupportsReceiptVerification());
+    DataInputStream in = new DataInputStream(peer.getInputStream());
+    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+        PBHelperClient.vintPrefixed(in));
+    DomainSocket sock = peer.getDomainSocket();
+    failureInjector.injectRequestFileDescriptorsFailure();
+    switch (resp.getStatus()) {
+    case SUCCESS:
+      byte buf[] = new byte[1];
+      FileInputStream[] fis = new FileInputStream[2];
+      sock.recvFileInputStreams(fis, buf, 0, buf.length);
+      ShortCircuitReplica replica = null;
+      try {
+        ExtendedBlockId key =
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+        if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
+          LOG.trace("Sending receipt verification byte for slot {}", slot);
+          sock.getOutputStream().write(0);
+        }
+        replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
+            Time.monotonicNow(), slot);
+        return new ShortCircuitReplicaInfo(replica);
+      } catch (IOException e) {
+        // This indicates an error reading from disk, or a format error.  Since
+        // it's not a socket communication problem, we return null rather than
+        // throwing an exception.
+        LOG.warn(this + ": error creating ShortCircuitReplica.", e);
+        return null;
+      } finally {
+        if (replica == null) {
+          IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
+        }
+      }
+    case ERROR_UNSUPPORTED:
+      if (!resp.hasShortCircuitAccessVersion()) {
+        LOG.warn("short-circuit read access is disabled for " +
+            "DataNode " + datanode + ".  reason: " + resp.getMessage());
+        clientContext.getDomainSocketFactory()
+            .disableShortCircuitForPath(pathInfo.getPath());
+      } else {
+        LOG.warn("short-circuit read access for the file " +
+            fileName + " is disabled for DataNode " + datanode +
+            ".  reason: " + resp.getMessage());
+      }
+      return null;
+    case ERROR_ACCESS_TOKEN:
+      String msg = "access control error while " +
+          "attempting to set up short-circuit access to " +
+          fileName + resp.getMessage();
+      LOG.debug("{}:{}", this, msg);
+      return new ShortCircuitReplicaInfo(new InvalidToken(msg));
+    default:
+      LOG.warn(this + ": unknown response code " + resp.getStatus() +
+          " while attempting to set up short-circuit access. " +
+          resp.getMessage());
+      clientContext.getDomainSocketFactory()
+          .disableShortCircuitForPath(pathInfo.getPath());
+      return null;
+    }
+  }
+
+  /**
+   * Get a BlockReaderRemote that communicates over a UNIX domain socket.
+   *
+   * @return The new BlockReader, or null if we failed to create the block
+   * reader.
+   *
+   * @throws InvalidToken    If the block token was invalid.
+   * Potentially other security-related execptions.
+   */
+  private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory()
+          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
+    }
+    if (!pathInfo.getPathState().getUsableForDataTransfer()) {
+      PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
+          "remote block reader because the UNIX domain socket at {}" +
+           " is not usable.", this, pathInfo);
+      return null;
+    }
+    LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
+        + "socket at {}", this, pathInfo.getPath());
+
+    while (true) {
+      BlockReaderPeer curPeer = nextDomainPeer();
+      if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      BlockReader blockReader = null;
+      try {
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        IOUtilsClient.cleanup(LOG, peer);
+        if (isSecurityException(ioe)) {
+          LOG.trace("{}: got security exception while constructing a remote "
+                  + " block reader from the unix domain socket at {}",
+              this, pathInfo.getPath(), ioe);
+          throw ioe;
+        }
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious because the underlying socket may be stale.
+          LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
+        } else {
+          // Handle an I/O error we got when using a newly created domain peer.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn("I/O error constructing remote block reader.  Disabling " +
+              "domain socket " + peer.getDomainSocket(), ioe);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtilsClient.cleanup(LOG, peer);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a BlockReaderRemote that communicates over a TCP socket.
+   *
+   * @return The new BlockReader.  We will not return null, but instead throw
+   *         an exception if this fails.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
+    LOG.trace("{}: trying to create a remote block reader from a TCP socket",
+        this);
+    BlockReader blockReader = null;
+    while (true) {
+      BlockReaderPeer curPeer = null;
+      Peer peer = null;
+      try {
+        curPeer = nextTcpPeer();
+        if (curPeer.fromCache) remainingCacheTries--;
+        peer = curPeer.peer;
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        if (isSecurityException(ioe)) {
+          LOG.trace("{}: got security exception while constructing a remote "
+              + "block reader from {}", this, peer, ioe);
+          throw ioe;
+        }
+        if ((curPeer != null) && curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious, because the underlying socket may be
+          // stale.
+          LOG.debug("Closed potentially stale remote peer {}", peer, ioe);
+        } else {
+          // Handle an I/O error we got when using a newly created peer.
+          LOG.warn("I/O error constructing remote block reader.", ioe);
+          throw ioe;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtilsClient.cleanup(LOG, peer);
+        }
+      }
+    }
+  }
+
+  public static class BlockReaderPeer {
+    final Peer peer;
+    final boolean fromCache;
+
+    BlockReaderPeer(Peer peer, boolean fromCache) {
+      this.peer = peer;
+      this.fromCache = fromCache;
+    }
+  }
+
+  /**
+   * Get the next DomainPeer-- either from the cache or by creating it.
+   *
+   * @return the next DomainPeer, or null if we could not construct one.
+   */
+  private BlockReaderPeer nextDomainPeer() {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, true);
+      if (peer != null) {
+        LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    DomainSocket sock = clientContext.getDomainSocketFactory().
+        createSocket(pathInfo, conf.getSocketTimeout());
+    if (sock == null) return null;
+    return new BlockReaderPeer(new DomainPeer(sock), false);
+  }
+
+  /**
+   * Get the next TCP-based peer-- either from the cache or by creating it.
+   *
+   * @return the next Peer, or null if we could not construct one.
+   *
+   * @throws IOException  If there was an error while constructing the peer
+   *                      (such as an InvalidEncryptionKeyException)
+   */
+  private BlockReaderPeer nextTcpPeer() throws IOException {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, false);
+      if (peer != null) {
+        LOG.trace("nextTcpPeer: reusing existing peer {}", peer);
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    try {
+      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
+          datanode);
+      LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
+      return new BlockReaderPeer(peer, false);
+    } catch (IOException e) {
+      LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
+          + "{}", datanode);
+      throw e;
+    }
+  }
+
+  /**
+   * Determine if an exception is security-related.
+   *
+   * We need to handle these exceptions differently than other IOExceptions.
+   * They don't indicate a communication problem.  Instead, they mean that there
+   * is some action the client needs to take, such as refetching block tokens,
+   * renewing encryption keys, etc.
+   *
+   * @param ioe    The exception
+   * @return       True only if the exception is security-related.
+   */
+  private static boolean isSecurityException(IOException ioe) {
+    return (ioe instanceof InvalidToken) ||
+            (ioe instanceof InvalidEncryptionKeyException) ||
+            (ioe instanceof InvalidBlockTokenException) ||
+            (ioe instanceof AccessControlException);
+  }
+
+  @SuppressWarnings("deprecation")
+  private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+    if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
+      return BlockReaderRemote.newBlockReader(fileName,
+          block, token, startOffset, length, conf.getIoBufferSize(),
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy, tracer);
+    } else {
+      return BlockReaderRemote2.newBlockReader(
+          fileName, block, token, startOffset, length,
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy, tracer);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
+  }
+
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message