hadoop-common-commits mailing list archives

From a..@apache.org
Subject [09/50] [abbrv] hadoop git commit: HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma
Date Fri, 29 Apr 2016 20:20:05 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java
new file mode 100644
index 0000000..22d4e23
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.PeerCache;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @deprecated this is an old implementation that is being left around
+ * in case any issues spring up with the new {@link BlockReaderRemote2}
+ * implementation.
+ * It will be removed in the next release.
+ */
+@InterfaceAudience.Private
+@Deprecated
+public class BlockReaderRemote extends FSInputChecker implements BlockReader {
+  static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
+
+  private final Peer peer;
+  private final DatanodeID datanodeID;
+  private final DataInputStream in;
+  private DataChecksum checksum;
+
+  /** offset in block of the last chunk received */
+  private long lastChunkOffset = -1;
+  private long lastChunkLen = -1;
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+
+  private final long blockId;
+
+  /** offset in block of the first chunk - may be less than startOffset
+   if startOffset is not chunk-aligned */
+  private final long firstChunkOffset;
+
+  private final int bytesPerChecksum;
+  private final int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private final long bytesNeededToFinish;
+
+  private boolean eos = false;
+  private boolean sentStatusCode = false;
+
+  ByteBuffer checksumBytes = null;
+  /** Amount of unread data in the current received packet */
+  int dataLeft = 0;
+
+  private final PeerCache peerCache;
+
+  private final Tracer tracer;
+
+  private final int networkDistance;
+
+  /* FSInputChecker interface */
+
+  /* Same interface as java.io.InputStream#read(),
+   * used by DFSInputStream#read().
+   * This violates one rule when there is a checksum error:
+   * "Read should not modify user buffer before successful read"
+   * because it first reads the data to user buffer and then checks
+   * the checksum.
+   */
+  @Override
+  public synchronized int read(byte[] buf, int off, int len)
+      throws IOException {
+
+    // This has to be set here, *before* the skip, since we can
+    // hit EOS during the skip, in the case that our entire read
+    // is smaller than the checksum chunk.
+    boolean eosBefore = eos;
+
+    //for the first read, skip the extra bytes at the front.
+    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+      // Skip these bytes. But don't call this.skip()!
+      int toSkip = (int)(startOffset - firstChunkOffset);
+      if ( super.readAndDiscard(toSkip) != toSkip ) {
+        // should never happen
+        throw new IOException("Could not skip required number of bytes");
+      }
+    }
+
+    int nRead = super.read(buf, off, len);
+
+    // if eos was set in the previous read, send a status code to the DN
+    if (eos && !eosBefore && nRead >= 0) {
+      if (needChecksum()) {
+        sendReadResult(peer, Status.CHECKSUM_OK);
+      } else {
+        sendReadResult(peer, Status.SUCCESS);
+      }
+    }
+    return nRead;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in the majority of cases? This one throws. */
+    long nSkipped = 0;
+    while (nSkipped < n) {
+      int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
+      int ret = readAndDiscard(toSkip);
+      if (ret <= 0) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
+  }
+
+  @Override
+  public int read() throws IOException {
+    throw new IOException("read() is not expected to be invoked. " +
+        "Use read(buf, off, len) instead.");
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    /* Checksum errors are handled outside the BlockReader.
+     * DFSInputStream does not always call 'seekToNewSource'. In the
+     * case of pread(), it just tries a different replica without seeking.
+     */
+    return false;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throw new IOException("Seek() is not supported in BlockInputChecker");
+  }
+
+  @Override
+  protected long getChunkPosition(long pos) {
+    throw new RuntimeException("getChunkPosition() is not supported, " +
+        "since seek is not required");
+  }
+
+  /**
+   * Makes sure that checksumBytes has enough capacity
+   * and limit is set to the number of checksum bytes needed
+   * to be read.
+   */
+  private void adjustChecksumBytes(int dataLen) {
+    int requiredSize =
+        ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
+    } else {
+      checksumBytes.clear();
+    }
+    checksumBytes.limit(requiredSize);
+  }
+
+  @Override
+  protected synchronized int readChunk(long pos, byte[] buf, int offset,
+      int len, byte[] checksumBuf)
+      throws IOException {
+    try (TraceScope ignored = tracer.newScope(
+        "BlockReaderRemote#readChunk(" + blockId + ")")) {
+      return readChunkImpl(pos, buf, offset, len, checksumBuf);
+    }
+  }
+
+  private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
+      int len, byte[] checksumBuf)
+      throws IOException {
+    // Read one chunk.
+    if (eos) {
+      // Already hit EOF
+      return -1;
+    }
+
+    // Read one DATA_CHUNK.
+    long chunkOffset = lastChunkOffset;
+    if ( lastChunkLen > 0 ) {
+      chunkOffset += lastChunkLen;
+    }
+
+    // pos is relative to the start of the first chunk of the read.
+    // chunkOffset is relative to the start of the block.
+    // This makes sure that the read passed from FSInputChecker is
+    // for the same chunk we expect to be reading from the DN.
+    if ( (pos + firstChunkOffset) != chunkOffset ) {
+      throw new IOException("Mismatch in pos : " + pos + " + " +
+          firstChunkOffset + " != " + chunkOffset);
+    }
+
+    // Read next packet if the previous packet has been read completely.
+    if (dataLeft <= 0) {
+      //Read packet headers.
+      PacketHeader header = new PacketHeader();
+      header.readFields(in);
+
+      LOG.debug("DFSClient readChunk got header {}", header);
+
+      // Sanity check the lengths
+      if (!header.sanityCheck(lastSeqNo)) {
+        throw new IOException("BlockReader: error in packet header " +
+            header);
+      }
+
+      lastSeqNo = header.getSeqno();
+      dataLeft = header.getDataLen();
+      adjustChecksumBytes(header.getDataLen());
+      if (header.getDataLen() > 0) {
+        IOUtils.readFully(in, checksumBytes.array(), 0,
+            checksumBytes.limit());
+      }
+    }
+
+    // Sanity checks
+    assert len >= bytesPerChecksum;
+    assert checksum != null;
+    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+    int checksumsToRead, bytesToRead;
+
+    if (checksumSize > 0) {
+
+      // How many chunks left in our packet - this is a ceiling
+      // since we may have a partial chunk at the end of the file
+      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+      // How many chunks we can fit in databuffer
+      //  - note this is a floor since we always read full chunks
+      int chunksCanFit = Math.min(len / bytesPerChecksum,
+          checksumBuf.length / checksumSize);
+
+      // How many chunks should we read
+      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+      // How many bytes should we actually read
+      bytesToRead = Math.min(
+          checksumsToRead * bytesPerChecksum, // full chunks
+          dataLeft); // in case we have a partial
+    } else {
+      // no checksum
+      bytesToRead = Math.min(dataLeft, len);
+      checksumsToRead = 0;
+    }
+
+    if ( bytesToRead > 0 ) {
+      // Assert we have enough space
+      assert bytesToRead <= len;
+      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+      assert checksumBuf.length >= checksumSize * checksumsToRead;
+      IOUtils.readFully(in, buf, offset, bytesToRead);
+      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+    }
+
+    dataLeft -= bytesToRead;
+    assert dataLeft >= 0;
+
+    lastChunkOffset = chunkOffset;
+    lastChunkLen = bytesToRead;
+
+    // If there's no data left in the current packet after satisfying
+    // this read, and we have satisfied the client read, we expect
+    // an empty packet header from the DN to signify this.
+    // Note that pos + bytesToRead may in fact be greater since the
+    // DN finishes off the entire last chunk.
+    if (dataLeft == 0 &&
+        pos + bytesToRead >= bytesNeededToFinish) {
+
+      // Read header
+      PacketHeader hdr = new PacketHeader();
+      hdr.readFields(in);
+
+      if (!hdr.isLastPacketInBlock() ||
+          hdr.getDataLen() != 0) {
+        throw new IOException("Expected empty end-of-read packet! Header: " +
+            hdr);
+      }
+
+      eos = true;
+    }
+
+    if ( bytesToRead == 0 ) {
+      return -1;
+    }
+
+    return bytesToRead;
+  }
+
+  private BlockReaderRemote(String file, String bpid, long blockId,
+      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
+      int networkDistance) {
+    // Path is used only for printing block and file information in debug
+    super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
+            ":" + bpid + ":of:"+ file)/*too non path-like?*/,
+        1, verifyChecksum,
+        checksum.getChecksumSize() > 0? checksum : null,
+        checksum.getBytesPerChecksum(),
+        checksum.getChecksumSize());
+
+    this.peer = peer;
+    this.datanodeID = datanodeID;
+    this.in = in;
+    this.checksum = checksum;
+    this.startOffset = Math.max( startOffset, 0 );
+    this.blockId = blockId;
+
+    // The total number of bytes that we need to transfer from the DN is
+    // the amount that the user wants (bytesToRead), plus the padding at
+    // the beginning in order to chunk-align. Note that the DN may elect
+    // to send more than this amount if the read starts/ends mid-chunk.
+    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
+    this.firstChunkOffset = firstChunkOffset;
+    lastChunkOffset = firstChunkOffset;
+    lastChunkLen = -1;
+
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+    this.peerCache = peerCache;
+    this.tracer = tracer;
+    this.networkDistance = networkDistance;
+  }
+
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance, or null on error.
+   */
+  public static BlockReaderRemote newBlockReader(String file,
+      ExtendedBlock block,
+      Token<BlockTokenIdentifier> blockToken,
+      long startOffset, long len,
+      int bufferSize, boolean verifyChecksum,
+      String clientName, Peer peer,
+      DatanodeID datanodeID,
+      PeerCache peerCache,
+      CachingStrategy cachingStrategy,
+      Tracer tracer, int networkDistance)
+      throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    final DataOutputStream out =
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+        verifyChecksum, cachingStrategy);
+
+    //
+    // Get bytes in block, set streams
+    //
+
+    DataInputStream in = new DataInputStream(
+        new BufferedInputStream(peer.getInputStream(), bufferSize));
+
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        PBHelperClient.vintPrefixed(in));
+    BlockReaderRemote2.checkSuccess(status, peer, block, file);
+    ReadOpChecksumInfoProto checksumInfo =
+        status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
+    //Warning when we get CHECKSUM_NULL?
+
+    // Read the first chunk offset.
+    long firstChunkOffset = checksumInfo.getChunkOffset();
+
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+          firstChunkOffset + ") startOffset is " +
+          startOffset + " for file " + file);
+    }
+
+    return new BlockReaderRemote(file, block.getBlockPoolId(), block.getBlockId(),
+        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
+        peer, datanodeID, peerCache, tracer, networkDistance);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    startOffset = -1;
+    checksum = null;
+    if (peerCache != null & sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+
+  @Override
+  public void readFully(byte[] buf, int readOffset, int amtToRead)
+      throws IOException {
+    IOUtils.readFully(this, buf, readOffset, amtToRead);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return readFully(this, buf, offset, len);
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Peer peer, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
+    try {
+      BlockReaderRemote2.writeReadResult(peer.getOutputStream(), statusCode);
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+          peer.getRemoteAddressString() + ": " + e.getMessage());
+    }
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    throw new UnsupportedOperationException("readDirect unsupported in BlockReaderRemote");
+  }
+
+  @Override
+  public int available() {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return BlockReaderRemote2.TCP_WINDOW_SIZE;
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return false;
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    return null;
+  }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
+
+  @Override
+  public int getNetworkDistance() {
+    return networkDistance;
+  }
+}
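
The bytesNeededToFinish and adjustChecksumBytes comments above describe the chunk-alignment scheme: a read always starts at a chunk boundary at or before the requested offset, and the checksum buffer for a packet holds one checksum per (possibly partial) chunk. A minimal, self-contained sketch of that arithmetic, with invented values that are not taken from the patch:

// Worked example of BlockReaderRemote's chunk-alignment arithmetic.
// All concrete values below are made up for illustration.
public class ChunkAlignmentExample {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;   // e.g. dfs.bytes-per-checksum
    int checksumSize = 4;         // CRC32/CRC32C checksums are 4 bytes each
    long startOffset = 1300;      // offset the caller asked to read from
    long bytesToRead = 1000;      // number of bytes the caller asked for

    // The DN starts sending from the chunk boundary at or before startOffset.
    long firstChunkOffset = (startOffset / bytesPerChecksum) * bytesPerChecksum; // 1024

    // Total bytes the reader must pull from the DN: the requested bytes plus
    // the front padding needed to begin on a chunk boundary
    // (this mirrors the bytesNeededToFinish computation in the constructor).
    long bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);   // 1276

    // Checksum buffer size for a packet carrying dataLen bytes, as computed in
    // adjustChecksumBytes(): ceil(dataLen / bytesPerChecksum) * checksumSize.
    int dataLen = 1276;
    int requiredChecksumBytes =
        ((dataLen + bytesPerChecksum - 1) / bytesPerChecksum) * checksumSize;    // 3 chunks * 4 = 12

    System.out.println("firstChunkOffset=" + firstChunkOffset
        + " bytesNeededToFinish=" + bytesNeededToFinish
        + " requiredChecksumBytes=" + requiredChecksumBytes);
  }
}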

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java
new file mode 100644
index 0000000..ebdc3fd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderRemote2.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.EnumSet;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.PeerCache;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.htrace.core.TraceScope;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.htrace.core.Tracer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a wrapper around the connection to a datanode
+ * and understands checksums, offsets, etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ *
+ * This is a new implementation introduced in Hadoop 0.23 which
+ * is more efficient and simpler than the older BlockReader
+ * implementation. It should be renamed to BlockReaderRemote
+ * once we are confident in it.
+ */
+@InterfaceAudience.Private
+public class BlockReaderRemote2 implements BlockReader {
+
+  static final Logger LOG = LoggerFactory.getLogger(BlockReaderRemote2.class);
+  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
+
+  final private Peer peer;
+  final private DatanodeID datanodeID;
+  final private PeerCache peerCache;
+  final private long blockId;
+  private final ReadableByteChannel in;
+
+  private DataChecksum checksum;
+  private final PacketReceiver packetReceiver = new PacketReceiver(true);
+
+  private ByteBuffer curDataSlice = null;
+
+  /** sequence number of the last packet received */
+  private long lastSeqNo = -1;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+  private final String filename;
+
+  private final int bytesPerChecksum;
+  private final int checksumSize;
+
+  /**
+   * The total number of bytes we need to transfer from the DN.
+   * This is the amount that the user has requested plus some padding
+   * at the beginning so that the read can begin on a chunk boundary.
+   */
+  private long bytesNeededToFinish;
+
+  private final boolean verifyChecksum;
+
+  private boolean sentStatusCode = false;
+
+  private final Tracer tracer;
+
+  private final int networkDistance;
+
+  @VisibleForTesting
+  public Peer getPeer() {
+    return peer;
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len)
+      throws IOException {
+    UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null);
+    LOG.trace("Starting read #{} file {} from datanode {}",
+        randomId, filename, datanodeID.getHostName());
+
+    if (curDataSlice == null ||
+        curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+      try (TraceScope ignored = tracer.newScope(
+          "BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
+        readNextPacket();
+      }
+    }
+
+    LOG.trace("Finishing read #{}", randomId);
+
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
+    }
+
+    int nRead = Math.min(curDataSlice.remaining(), len);
+    curDataSlice.get(buf, off, nRead);
+
+    return nRead;
+  }
+
+
+  @Override
+  public synchronized int read(ByteBuffer buf) throws IOException {
+    if (curDataSlice == null ||
+        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+      try (TraceScope ignored = tracer.newScope(
+          "BlockReaderRemote2#readNextPacket(" + blockId + ")")) {
+        readNextPacket();
+      }
+    }
+    if (curDataSlice.remaining() == 0) {
+      // we're at EOF now
+      return -1;
+    }
+
+    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
+    ByteBuffer writeSlice = curDataSlice.duplicate();
+    writeSlice.limit(writeSlice.position() + nRead);
+    buf.put(writeSlice);
+    curDataSlice.position(writeSlice.position());
+
+    return nRead;
+  }
+
+  private void readNextPacket() throws IOException {
+    //Read packet headers.
+    packetReceiver.receiveNextPacket(in);
+
+    PacketHeader curHeader = packetReceiver.getHeader();
+    curDataSlice = packetReceiver.getDataSlice();
+    assert curDataSlice.capacity() == curHeader.getDataLen();
+
+    LOG.trace("DFSClient readNextPacket got header {}", curHeader);
+
+    // Sanity check the lengths
+    if (!curHeader.sanityCheck(lastSeqNo)) {
+      throw new IOException("BlockReader: error in packet header " +
+          curHeader);
+    }
+
+    if (curHeader.getDataLen() > 0) {
+      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
+      int checksumsLen = chunks * checksumSize;
+
+      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+          "checksum slice capacity=" +
+              packetReceiver.getChecksumSlice().capacity() +
+              " checksumsLen=" + checksumsLen;
+
+      lastSeqNo = curHeader.getSeqno();
+      if (verifyChecksum && curDataSlice.remaining() > 0) {
+        // N.B.: the checksum error offset reported here is actually
+        // relative to the start of the block, not the start of the file.
+        // This is slightly misleading, but preserves the behavior from
+        // the older BlockReader.
+        checksum.verifyChunkedSums(curDataSlice,
+            packetReceiver.getChecksumSlice(),
+            filename, curHeader.getOffsetInBlock());
+      }
+      bytesNeededToFinish -= curHeader.getDataLen();
+    }
+
+    // First packet will include some data prior to the first byte
+    // the user requested. Skip it.
+    if (curHeader.getOffsetInBlock() < startOffset) {
+      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
+      curDataSlice.position(newPos);
+    }
+
+    // If we've now satisfied the whole client read, read one last packet
+    // header, which should be empty
+    if (bytesNeededToFinish <= 0) {
+      readTrailingEmptyPacket();
+      if (verifyChecksum) {
+        sendReadResult(Status.CHECKSUM_OK);
+      } else {
+        sendReadResult(Status.SUCCESS);
+      }
+    }
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    /* How can we make sure we don't throw a ChecksumException, at least
+     * in the majority of cases? This one throws. */
+    long skipped = 0;
+    while (skipped < n) {
+      long needToSkip = n - skipped;
+      if (curDataSlice == null ||
+          curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+        readNextPacket();
+      }
+      if (curDataSlice.remaining() == 0) {
+        // we're at EOF now
+        break;
+      }
+
+      int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
+      curDataSlice.position(curDataSlice.position() + skip);
+      skipped += skip;
+    }
+    return skipped;
+  }
+
+  private void readTrailingEmptyPacket() throws IOException {
+    LOG.trace("Reading empty packet at end of read");
+
+    packetReceiver.receiveNextPacket(in);
+
+    PacketHeader trailer = packetReceiver.getHeader();
+    if (!trailer.isLastPacketInBlock() ||
+        trailer.getDataLen() != 0) {
+      throw new IOException("Expected empty end-of-read packet! Header: " +
+          trailer);
+    }
+  }
+
+  protected BlockReaderRemote2(String file, long blockId,
+      DataChecksum checksum, boolean verifyChecksum,
+      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
+      int networkDistance) {
+    // Path is used only for printing block and file information in debug
+    this.peer = peer;
+    this.datanodeID = datanodeID;
+    this.in = peer.getInputStreamChannel();
+    this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
+    this.startOffset = Math.max( startOffset, 0 );
+    this.filename = file;
+    this.peerCache = peerCache;
+    this.blockId = blockId;
+
+    // The total number of bytes that we need to transfer from the DN is
+    // the amount that the user wants (bytesToRead), plus the padding at
+    // the beginning in order to chunk-align. Note that the DN may elect
+    // to send more than this amount if the read starts/ends mid-chunk.
+    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+    this.tracer = tracer;
+    this.networkDistance = networkDistance;
+  }
+
+
+  @Override
+  public synchronized void close() throws IOException {
+    packetReceiver.close();
+    startOffset = -1;
+    checksum = null;
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
+    }
+
+    // in will be closed when its Socket is closed.
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
+    try {
+      writeReadResult(peer.getOutputStream(), statusCode);
+      sentStatusCode = true;
+    } catch (IOException e) {
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+          peer.getRemoteAddressString() + ": " + e.getMessage());
+    }
+  }
+
+  /**
+   * Serialize the actual read result on the wire.
+   */
+  static void writeReadResult(OutputStream out, Status statusCode)
+      throws IOException {
+
+    ClientReadStatusProto.newBuilder()
+        .setStatus(statusCode)
+        .build()
+        .writeDelimitedTo(out);
+
+    out.flush();
+  }
+
+  /**
+   * File name to print when accessing a block directly (from servlets)
+   * @param s Address of the block location
+   * @param poolId Block pool ID of the block
+   * @param blockId Block ID of the block
+   * @return string that has a file name for debug purposes
+   */
+  public static String getFileName(final InetSocketAddress s,
+      final String poolId, final long blockId) {
+    return s.toString() + ":" + poolId + ":" + blockId;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
+  }
+
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @param peer  The Peer to use
+   * @param datanodeID  The DatanodeID this peer is connected to
+   * @return New BlockReader instance, or null on error.
+   */
+  public static BlockReader newBlockReader(String file,
+      ExtendedBlock block,
+      Token<BlockTokenIdentifier> blockToken,
+      long startOffset, long len,
+      boolean verifyChecksum,
+      String clientName,
+      Peer peer, DatanodeID datanodeID,
+      PeerCache peerCache,
+      CachingStrategy cachingStrategy,
+      Tracer tracer,
+      int networkDistance) throws IOException {
+    // in and out will be closed when sock is closed (by the caller)
+    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+        peer.getOutputStream()));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+        verifyChecksum, cachingStrategy);
+
+    //
+    // Get bytes in block
+    //
+    DataInputStream in = new DataInputStream(peer.getInputStream());
+
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        PBHelperClient.vintPrefixed(in));
+    checkSuccess(status, peer, block, file);
+    ReadOpChecksumInfoProto checksumInfo =
+        status.getReadOpChecksumInfo();
+    DataChecksum checksum = DataTransferProtoUtil.fromProto(
+        checksumInfo.getChecksum());
+    //Warning when we get CHECKSUM_NULL?
+
+    // Read the first chunk offset.
+    long firstChunkOffset = checksumInfo.getChunkOffset();
+
+    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+      throw new IOException("BlockReader: error in first chunk offset (" +
+          firstChunkOffset + ") startOffset is " +
+          startOffset + " for file " + file);
+    }
+
+    return new BlockReaderRemote2(file, block.getBlockId(), checksum,
+        verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
+        peerCache, tracer, networkDistance);
+  }
+
+  static void checkSuccess(
+      BlockOpResponseProto status, Peer peer,
+      ExtendedBlock block, String file)
+      throws IOException {
+    String logInfo = "for OP_READ_BLOCK"
+        + ", self=" + peer.getLocalAddressString()
+        + ", remote=" + peer.getRemoteAddressString()
+        + ", for file " + file
+        + ", for pool " + block.getBlockPoolId()
+        + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
+    DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
+  }
+
+  @Override
+  public int available() {
+    // An optimistic estimate of how much data is available
+    // to us without doing network I/O.
+    return TCP_WINDOW_SIZE;
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return false;
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    return null;
+  }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
+
+  @Override
+  public int getNetworkDistance() {
+    return networkDistance;
+  }
+}
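
BlockReaderRemote2.newBlockReader above sends the OP_READ_BLOCK request, checks the response, and validates the first chunk offset before constructing the reader. A hedged usage sketch, assuming the caller has already obtained a connected Peer, the block, its token, and the DN identity elsewhere (those steps are outside this patch; the helper class and client name below are hypothetical):

import java.io.IOException;

import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote2;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token;
import org.apache.htrace.core.Tracer;

public class BlockReadSketch {
  /** Reads [startOffset, startOffset + len) of a block into a new byte array. */
  static byte[] readBlockRange(String file, ExtendedBlock block,
      Token<BlockTokenIdentifier> token, long startOffset, int len,
      Peer peer, DatanodeID dnId, PeerCache peerCache, Tracer tracer)
      throws IOException {
    BlockReader reader = BlockReaderRemote2.newBlockReader(
        file, block, token, startOffset, len,
        true,                                 // verifyChecksum
        "block-read-sketch",                  // clientName (placeholder)
        peer, dnId, peerCache,
        CachingStrategy.newDefaultStrategy(),
        tracer,
        0);                                   // networkDistance
    try {
      byte[] buf = new byte[len];
      reader.readFully(buf, 0, len);          // throws on premature EOF
      return buf;
    } finally {
      // close() hands the peer back to the cache only if the final status
      // code was sent; otherwise the connection is torn down.
      reader.close();
    }
  }
}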

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java
new file mode 100644
index 0000000..57fbe47
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.BlockReader;
+
+import java.io.IOException;
+
+/**
+ * For sharing between the local and remote block reader implementations.
+ */
+@InterfaceAudience.Private
+class BlockReaderUtil {
+
+  /* See {@link BlockReader#readAll(byte[], int, int)} */
+  public static int readAll(BlockReader reader,
+      byte[] buf, int offset, int len) throws IOException {
+    int n = 0;
+    for (;;) {
+      int nread = reader.read(buf, offset + n, len - n);
+      if (nread <= 0)
+        return (n == 0) ? nread : n;
+      n += nread;
+      if (n >= len)
+        return n;
+    }
+  }
+
+  /* See {@link BlockReader#readFully(byte[], int, int)} */
+  public static void readFully(BlockReader reader,
+      byte[] buf, int off, int len) throws IOException {
+    int toRead = len;
+    while (toRead > 0) {
+      int ret = reader.read(buf, off, toRead);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
+}
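
The two helpers above differ only in how a short read is treated: readAll returns whatever it managed to read (possibly fewer than len bytes at EOF), while readFully turns a premature EOF into an IOException. A self-contained sketch of the same two contracts over a plain java.io.InputStream (illustration only, not part of the patch):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadContractExample {
  // Mirrors BlockReaderUtil.readAll: returns a short count (or -1) at EOF.
  static int readAll(InputStream in, byte[] buf, int off, int len) throws IOException {
    int n = 0;
    while (n < len) {
      int nread = in.read(buf, off + n, len - n);
      if (nread <= 0) {
        return (n == 0) ? nread : n;
      }
      n += nread;
    }
    return n;
  }

  // Mirrors BlockReaderUtil.readFully: a premature EOF is an error.
  static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
    while (len > 0) {
      int ret = in.read(buf, off, len);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream");
      }
      off += ret;
      len -= ret;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "hello".getBytes();
    byte[] buf = new byte[8];

    // Only 5 bytes exist, so readAll returns 5 instead of the requested 8.
    int got = readAll(new ByteArrayInputStream(data), buf, 0, 8);
    System.out.println("readAll got " + got);

    // readFully with the same request fails because the stream ends early.
    try {
      readFully(new ByteArrayInputStream(data), buf, 0, 8);
    } catch (IOException e) {
      System.out.println("readFully: " + e.getMessage());
    }
  }
}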

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java
new file mode 100644
index 0000000..3d6d9a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.ReplicaAccessor;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
+ * replicas.
+ */
+@InterfaceAudience.Private
+public final class ExternalBlockReader implements BlockReader {
+  private final ReplicaAccessor accessor;
+  private final long visibleLength;
+  private long pos;
+
+  ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
+                      long startOffset) {
+    this.accessor = accessor;
+    this.visibleLength = visibleLength;
+    this.pos = startOffset;
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    int nread = accessor.read(pos, buf, off, len);
+    if (nread < 0) {
+      return nread;
+    }
+    pos += nread;
+    return nread;
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    int nread = accessor.read(pos, buf);
+    if (nread < 0) {
+      return nread;
+    }
+    pos += nread;
+    return nread;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    // You cannot skip backwards
+    if (n <= 0) {
+      return 0;
+    }
+    // You can't skip past the last offset that we want to read with this
+    // block reader.
+    long oldPos = pos;
+    pos += n;
+    if (pos > visibleLength) {
+      pos = visibleLength;
+    }
+    return pos - oldPos;
+  }
+
+  @Override
+  public int available() {
+    // We return the number of bytes between the current offset and the visible
+    // length.  Some of the other block readers return a shorter length than
+    // that.  The only advantage to returning a shorter length is that the
+    // DFSInputStream will trash your block reader and create a new one if
+    // someone tries to seek() beyond the available() region.
+    long diff = visibleLength - pos;
+    if (diff > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    } else {
+      return (int)diff;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    accessor.close();
+  }
+
+  @Override
+  public void readFully(byte[] buf, int offset, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, offset, len);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return accessor.isShortCircuit();
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    // For now, pluggable ReplicaAccessors do not support zero-copy.
+    return null;
+  }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return null;
+  }
+
+  @Override
+  public int getNetworkDistance() {
+    return accessor.getNetworkDistance();
+  }
+}
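
ExternalBlockReader keeps a single position cursor: skip() advances it but clamps at the visible length, and available() reports the distance to that length, capped at Integer.MAX_VALUE. A tiny sketch of that bookkeeping, with invented values:

// Illustration of ExternalBlockReader's position arithmetic (values made up).
public class ReplicaPositionExample {
  public static void main(String[] args) {
    long visibleLength = 10_000;
    long pos = 9_500;

    // skip(2_000): cannot move past visibleLength, so only 500 bytes are skipped.
    long n = 2_000;
    long oldPos = pos;
    pos = Math.min(pos + n, visibleLength);
    long skipped = pos - oldPos;

    // available(): bytes between the current offset and the visible length.
    long diff = visibleLength - pos;
    int available = (diff > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) diff;

    System.out.println("skipped=" + skipped + " available=" + available); // 500, 0
  }
}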

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 3ae8b59..426fb72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -133,7 +133,7 @@
 
      <!-- Don't complain about LocalDatanodeInfo's anonymous class -->
      <Match>
-       <Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />
+       <Class name="org.apache.hadoop.hdfs.client.impl.BlockReaderLocal$LocalDatanodeInfo$1" />
        <Bug pattern="SE_BAD_FIELD_INNER_CLASS" />
      </Match>
      <!-- Only one method increments numFailedVolumes and it is synchronized -->

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 7f71bf7..0379946 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
-import org.apache.hadoop.hdfs.RemoteBlockReader2;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote2;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -112,7 +112,7 @@ class StripedBlockReader {
          *
          * TODO: add proper tracer
          */
-      return RemoteBlockReader2.newBlockReader(
+      return BlockReaderRemote2.newBlockReader(
           "dummy", block, blockToken, offsetInBlock,
           block.getNumBytes() - offsetInBlock, true,
           "", newConnectedPeer(block, dnAddr, blockToken, source), source,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 80f510c..2451df3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.BlockReader;
-import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
index a1af1fc..32a34e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
@@ -38,7 +38,7 @@ import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
deleted file mode 100644
index 1ca1ca5..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsTracer;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
-import org.apache.hadoop.hdfs.server.namenode.CacheManager;
-import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-
-/**
- * A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
- */
-public class BlockReaderTestUtil {
-  /**
-   * Returns true if we should run tests that generate large files (> 1GB)
-   */
-  static public boolean shouldTestLargeFiles() {
-    String property = System.getProperty("hdfs.test.large.files");
-    if (property == null) return false;
-    if (property.isEmpty()) return true;
-    return Boolean.parseBoolean(property);
-  }
-
-  private HdfsConfiguration conf = null;
-  private MiniDFSCluster cluster = null;
-
-  /**
-   * Setup the cluster
-   */
-  public BlockReaderTestUtil(int replicationFactor) throws Exception {
-    this(replicationFactor, new HdfsConfiguration());
-  }
-
-  public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception {
-    this.conf = config;
-    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);
-    cluster = new MiniDFSCluster.Builder(conf).format(true).build();
-    cluster.waitActive();
-  }
-
-  /**
-   * Shutdown cluster
-   */
-  public void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  public MiniDFSCluster getCluster() {
-    return cluster;
-  }
-
-  public HdfsConfiguration getConf() {
-    return conf;
-  }
-
-  /**
-   * Create a file of the given size filled with random data.
-   * @return  File data.
-   */
-  public byte[] writeFile(Path filepath, int sizeKB)
-      throws IOException {
-    FileSystem fs = cluster.getFileSystem();
-
-    // Write a file with the specified amount of data
-    DataOutputStream os = fs.create(filepath);
-    byte data[] = new byte[1024 * sizeKB];
-    new Random().nextBytes(data);
-    os.write(data);
-    os.close();
-    return data;
-  }
-
-  /**
-   * Get the list of Blocks for a file.
-   */
-  public List<LocatedBlock> getFileBlocks(Path filepath, int sizeKB)
-      throws IOException {
-    // Return the blocks we just wrote
-    DFSClient dfsclient = getDFSClient();
-    return dfsclient.getNamenode().getBlockLocations(
-      filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks();
-  }
-
-  /**
-   * Get the DFSClient.
-   */
-  public DFSClient getDFSClient() throws IOException {
-    InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort());
-    return new DFSClient(nnAddr, conf);
-  }
-
-  /**
-   * Exercise the BlockReader and read length bytes.
-   *
-   * It does not verify the bytes read.
-   */
-  public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof)
-      throws IOException {
-    byte buf[] = new byte[1024];
-    int nRead = 0;
-    while (nRead < length) {
-      DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
-      int n = reader.read(buf, 0, buf.length);
-      assertTrue(n > 0);
-      nRead += n;
-    }
-
-    if (expectEof) {
-      DFSClient.LOG.info("Done reading, expect EOF for next read.");
-      assertEquals(-1, reader.read(buf, 0, buf.length));
-    }
-  }
-
-  /**
-   * Get a BlockReader for the given block.
-   */
-  public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
-      throws IOException {
-    return getBlockReader(cluster.getFileSystem(), testBlock, offset, lenToRead);
-  }
-
-  /**
-   * Get a BlockReader for the given block.
-   */
-  public static BlockReader getBlockReader(final DistributedFileSystem fs,
-      LocatedBlock testBlock, int offset, long lenToRead) throws IOException {
-    InetSocketAddress targetAddr = null;
-    ExtendedBlock block = testBlock.getBlock();
-    DatanodeInfo[] nodes = testBlock.getLocations();
-    targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
-
-    return new BlockReaderFactory(fs.getClient().getConf()).
-      setInetSocketAddress(targetAddr).
-      setBlock(block).
-      setFileName(targetAddr.toString()+ ":" + block.getBlockId()).
-      setBlockToken(testBlock.getBlockToken()).
-      setStartOffset(offset).
-      setLength(lenToRead).
-      setVerifyChecksum(true).
-      setClientName("BlockReaderTestUtil").
-      setDatanodeInfo(nodes[0]).
-      setClientCacheContext(ClientContext.getFromConf(fs.getConf())).
-      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
-      setConfiguration(fs.getConf()).
-      setAllowShortCircuitLocalReads(true).
-      setTracer(FsTracer.get(fs.getConf())).
-      setRemotePeerFactory(new RemotePeerFactory() {
-        @Override
-        public Peer newConnectedPeer(InetSocketAddress addr,
-            Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
-            throws IOException {
-          Peer peer = null;
-          Socket sock = NetUtils.
-              getDefaultSocketFactory(fs.getConf()).createSocket();
-          try {
-            sock.connect(addr, HdfsConstants.READ_TIMEOUT);
-            sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
-            peer = DFSUtilClient.peerFromSocket(sock);
-          } finally {
-            if (peer == null) {
-              IOUtils.closeQuietly(sock);
-            }
-          }
-          return peer;
-        }
-      }).
-      build();
-  }
-
-  /**
-   * Get a DataNode that serves our testBlock.
-   */
-  public DataNode getDataNode(LocatedBlock testBlock) {
-    DatanodeInfo[] nodes = testBlock.getLocations();
-    int ipcport = nodes[0].getIpcPort();
-    return cluster.getDataNode(ipcport);
-  }
-  
-  public static void enableHdfsCachingTracing() {
-    LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(CacheManager.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(FsDatasetCache.class.getName()).setLevel(
-        Level.TRACE);
-  }
-
-  public static void enableBlockReaderFactoryTracing() {
-    LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(ShortCircuitCache.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(BlockReaderLocal.class.getName()).setLevel(
-        Level.TRACE);
-  }
-
-  public static void enableShortCircuitShmTracing() {
-    LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(DataNode.class.getName()).setLevel(
-        Level.TRACE);
-  }
-}
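
For reference, a minimal sketch of how a test typically drives this utility after the move, assuming the class keeps the same constructor and helper methods in its new org.apache.hadoop.hdfs.client.impl package (the class name BlockReaderTestUtilSketch and the file path "/sketch_file" are invented for illustration; this snippet is not part of the patch):

// Illustrative only: exercises the helpers shown in the hunk above.
// Assumes BlockReaderTestUtil keeps the same API after the package move.
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

public class BlockReaderTestUtilSketch {
  public static void main(String[] args) throws Exception {
    // Spin up a one-datanode MiniDFSCluster via the utility.
    BlockReaderTestUtil util =
        new BlockReaderTestUtil(1, new HdfsConfiguration());
    try {
      Path file = new Path("/sketch_file");
      byte[] data = util.writeFile(file, 16);           // 16 KB of random data
      List<LocatedBlock> blocks = util.getFileBlocks(file, 16);
      BlockReader reader =
          util.getBlockReader(blocks.get(0), 0, data.length);
      util.readAndCheckEOS(reader, data.length, true);  // read it all, expect EOF
      reader.close();
    } finally {
      util.shutdown();
    }
  }
}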

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index ccc94a4..3c58133 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
deleted file mode 100644
index 3d916a7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-abstract public class TestBlockReaderBase {
-  private BlockReaderTestUtil util;
-  private byte[] blockData;
-  private BlockReader reader;
-
-  /**
-   * If you override this, make sure the length of the returned array is
-   * less than the block size.
-   */
-  byte [] getBlockData() {
-    int length = 1 << 22;
-    byte[] data = new byte[length];
-    for (int i = 0; i < length; i++) {
-      data[i] = (byte) (i % 133);
-    }
-    return data;
-  }
-
-  private BlockReader getBlockReader(LocatedBlock block) throws Exception {
-    return util.getBlockReader(block, 0, blockData.length);
-  }
-
-  abstract HdfsConfiguration createConf();
-
-  @Before
-  public void setup() throws Exception {
-    util = new BlockReaderTestUtil(1, createConf());
-    blockData = getBlockData();
-    DistributedFileSystem fs = util.getCluster().getFileSystem();
-    Path testfile = new Path("/testfile");
-    FSDataOutputStream fout = fs.create(testfile);
-    fout.write(blockData);
-    fout.close();
-    LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0);
-    reader = getBlockReader(blk);
-  }
-
-  @After
-  public void shutdown() throws Exception {
-    util.shutdown();
-  }
-
-  @Test(timeout=60000)
-  public void testSkip() throws IOException {
-    Random random = new Random();
-    byte [] buf = new byte[1];
-    for (int pos = 0; pos < blockData.length;) {
-      long skip = random.nextInt(100) + 1;
-      long skipped = reader.skip(skip);
-      if (pos + skip >= blockData.length) {
-        assertEquals(blockData.length, pos + skipped);
-        break;
-      } else {
-        assertEquals(skip, skipped);
-        pos += skipped;
-        assertEquals(1, reader.read(buf, 0, 1));
-
-        assertEquals(blockData[pos], buf[0]);
-        pos += 1;
-      }
-    }
-  }
-}
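
The base class above leaves two hooks for subclasses: the abstract createConf() and the overridable getBlockData(), whose returned array must stay smaller than the block size. A minimal sketch of a concrete subclass follows, under the assumption that it sits in the same org.apache.hadoop.hdfs package the base lived in before this move (createConf() is package-private); the class name is invented for illustration:

// Hypothetical concrete subclass (name invented); it must live in the same
// package as the abstract base because createConf() is package-private.
package org.apache.hadoop.hdfs;

public class TestBlockReaderSketch extends TestBlockReaderBase {
  @Override
  HdfsConfiguration createConf() {
    // Real subclasses return a configuration tuned for the reader under
    // test, e.g. enabling or disabling short-circuit local reads.
    return new HdfsConfiguration();
  }
}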

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
deleted file mode 100644
index a392c6c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
-import static org.hamcrest.CoreMatchers.equalTo;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.channels.ClosedByInterruptException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
-import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.TemporarySocketDirectory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-
-public class TestBlockReaderFactory {
-  static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class);
-
-  @Before
-  public void init() {
-    DomainSocket.disableBindPathValidation();
-    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
-  }
-
-  @After
-  public void cleanup() {
-    DFSInputStream.tcpReadsDisabledForTesting = false;
-    BlockReaderFactory.createShortCircuitReplicaInfoCallback = null;
-  }
-
-  public static Configuration createShortCircuitConf(String testName,
-      TemporarySocketDirectory sockDir) {
-    Configuration conf = new Configuration();
-    conf.set(DFS_CLIENT_CONTEXT, testName);
-    conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
-    conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
-        testName + "._PORT").getAbsolutePath());
-    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
-    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
-        false);
-    conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
-    return conf;
-  }
-
-  /**
-   * If we have a UNIX domain socket configured,
-   * and we have dfs.client.domain.socket.data.traffic set to true,
-   * and short-circuit access fails, we should still be able to pass
-   * data traffic over the UNIX domain socket.  Test this.
-   */
-  @Test(timeout=60000)
-  public void testFallbackFromShortCircuitToUnixDomainTraffic()
-      throws Exception {
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-
-    // The server is NOT configured with short-circuit local reads;
-    // the client is.  Both support UNIX domain reads.
-    Configuration clientConf = createShortCircuitConf(
-        "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir);
-    clientConf.set(DFS_CLIENT_CONTEXT,
-        "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
-    clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
-    Configuration serverConf = new Configuration(clientConf);
-    serverConf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
-
-    MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
-    cluster.waitActive();
-    FileSystem dfs = FileSystem.get(cluster.getURI(0), clientConf);
-    String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 8193;
-    final int SEED = 0xFADED;
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(contents, expected));
-    cluster.shutdown();
-    sockDir.close();
-  }
-  
-  /**
-   * Test the case where we have multiple threads waiting on the
-   * ShortCircuitCache delivering a certain ShortCircuitReplica.
-   *
-   * In this case, there should only be one call to
-   * createShortCircuitReplicaInfo.  This one replica should be shared
-   * by all threads.
-   */
-  @Test(timeout=60000)
-  public void testMultipleWaitersOnShortCircuitCache()
-      throws Exception {
-    final CountDownLatch latch = new CountDownLatch(1);
-    final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
-    final AtomicBoolean testFailed = new AtomicBoolean(false);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
-      new ShortCircuitCache.ShortCircuitReplicaCreator() {
-        @Override
-        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
-          Uninterruptibles.awaitUninterruptibly(latch);
-          if (!creationIsBlocked.compareAndSet(true, false)) {
-            Assert.fail("there were multiple calls to "
-                + "createShortCircuitReplicaInfo.  Only one was expected.");
-          }
-          return null;
-        }
-      };
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration conf = createShortCircuitConf(
-        "testMultipleWaitersOnShortCircuitCache", sockDir);
-    MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int SEED = 0xFADED;
-    final int NUM_THREADS = 10;
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    Runnable readerRunnable = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
-          Assert.assertFalse(creationIsBlocked.get());
-          byte expected[] = DFSTestUtil.
-              calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-          Assert.assertTrue(Arrays.equals(contents, expected));
-        } catch (Throwable e) {
-          LOG.error("readerRunnable error", e);
-          testFailed.set(true);
-        }
-      }
-    };
-    Thread threads[] = new Thread[NUM_THREADS];
-    for (int i = 0; i < NUM_THREADS; i++) {
-      threads[i] = new Thread(readerRunnable);
-      threads[i].start();
-    }
-    Thread.sleep(500);
-    latch.countDown();
-    for (int i = 0; i < NUM_THREADS; i++) {
-      Uninterruptibles.joinUninterruptibly(threads[i]);
-    }
-    cluster.shutdown();
-    sockDir.close();
-    Assert.assertFalse(testFailed.get());
-  }
-
-  /**
-   * Test the case where a short-circuit read fails at first and then
-   * succeeds on a later attempt.
-   * Any thread waiting on a cache load should receive the failure (if it
-   * occurs); however, the failure result should not be cached.  We want
-   * to be able to retry later and succeed.
-   */
-  @Test(timeout=60000)
-  public void testShortCircuitCacheTemporaryFailure()
-      throws Exception {
-    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
-    final AtomicBoolean replicaCreationShouldFail = new AtomicBoolean(true);
-    final AtomicBoolean testFailed = new AtomicBoolean(false);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
-      new ShortCircuitCache.ShortCircuitReplicaCreator() {
-        @Override
-        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
-          if (replicaCreationShouldFail.get()) {
-            // Insert a short delay to increase the chance that one client
-            // thread waits for the other client thread's failure via
-            // a condition variable.
-            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
-            return new ShortCircuitReplicaInfo();
-          }
-          return null;
-        }
-      };
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration conf = createShortCircuitConf(
-        "testShortCircuitCacheTemporaryFailure", sockDir);
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int NUM_THREADS = 2;
-    final int SEED = 0xFADED;
-    final CountDownLatch gotFailureLatch = new CountDownLatch(NUM_THREADS);
-    final CountDownLatch shouldRetryLatch = new CountDownLatch(1);
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    Runnable readerRunnable = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          // First time should fail.
-          List<LocatedBlock> locatedBlocks = 
-              cluster.getNameNode().getRpcServer().getBlockLocations(
-              TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
-          LocatedBlock lblock = locatedBlocks.get(0); // first block
-          BlockReader blockReader = null;
-          try {
-            blockReader = BlockReaderTestUtil.getBlockReader(
-                cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
-            Assert.fail("expected getBlockReader to fail the first time.");
-          } catch (Throwable t) { 
-            Assert.assertTrue("expected to see 'TCP reads were disabled " +
-                "for testing' in exception " + t, t.getMessage().contains(
-                "TCP reads were disabled for testing"));
-          } finally {
-            if (blockReader != null) blockReader.close(); // keep findbugs happy
-          }
-          gotFailureLatch.countDown();
-          shouldRetryLatch.await();
-
-          // Second time should succeed.
-          try {
-            blockReader = BlockReaderTestUtil.getBlockReader(
-                cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
-          } catch (Throwable t) { 
-            LOG.error("error trying to retrieve a block reader " +
-                "the second time.", t);
-            throw t;
-          } finally {
-            if (blockReader != null) blockReader.close();
-          }
-        } catch (Throwable t) {
-          LOG.error("getBlockReader failure", t);
-          testFailed.set(true);
-        }
-      }
-    };
-    Thread threads[] = new Thread[NUM_THREADS];
-    for (int i = 0; i < NUM_THREADS; i++) {
-      threads[i] = new Thread(readerRunnable);
-      threads[i].start();
-    }
-    gotFailureLatch.await();
-    replicaCreationShouldFail.set(false);
-    shouldRetryLatch.countDown();
-    for (int i = 0; i < NUM_THREADS; i++) {
-      Uninterruptibles.joinUninterruptibly(threads[i]);
-    }
-    cluster.shutdown();
-    sockDir.close();
-    Assert.assertFalse(testFailed.get());
-  }
-
-  /**
-   * Test that a client which supports short-circuit reads using
-   * shared memory can fall back to not using shared memory when
-   * the server doesn't support it.
-   */
-  @Test
-  public void testShortCircuitReadFromServerWithoutShm() throws Exception {
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration clientConf = createShortCircuitConf(
-        "testShortCircuitReadFromServerWithoutShm", sockDir);
-    Configuration serverConf = new Configuration(clientConf);
-    serverConf.setInt(
-        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
-    cluster.waitActive();
-    clientConf.set(DFS_CLIENT_CONTEXT,
-        "testShortCircuitReadFromServerWithoutShm_clientContext");
-    final DistributedFileSystem fs =
-        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int SEED = 0xFADEC;
-    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(contents, expected));
-    final ShortCircuitCache cache =
-        fs.dfs.getClientContext().getShortCircuitCache();
-    final DatanodeInfo datanode =
-        new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
-    cache.getDfsClientShmManager().visit(new Visitor() {
-      @Override
-      public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
-          throws IOException {
-        Assert.assertEquals(1,  info.size());
-        PerDatanodeVisitorInfo vinfo = info.get(datanode);
-        Assert.assertTrue(vinfo.disabled);
-        Assert.assertEquals(0, vinfo.full.size());
-        Assert.assertEquals(0, vinfo.notFull.size());
-      }
-    });
-    cluster.shutdown();
-    sockDir.close();
-  }
- 
-  /**
-   * Test that a client which does not support short-circuit reads using
-   * shared memory can talk with a server which supports it.
-   */
-  @Test
-  public void testShortCircuitReadFromClientWithoutShm() throws Exception {
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration clientConf = createShortCircuitConf(
-        "testShortCircuitReadWithoutShm", sockDir);
-    Configuration serverConf = new Configuration(clientConf);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
-    cluster.waitActive();
-    clientConf.setInt(
-        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
-    clientConf.set(DFS_CLIENT_CONTEXT,
-        "testShortCircuitReadFromClientWithoutShm_clientContext");
-    final DistributedFileSystem fs =
-        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int SEED = 0xFADEC;
-    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(contents, expected));
-    final ShortCircuitCache cache =
-        fs.dfs.getClientContext().getShortCircuitCache();
-    Assert.assertEquals(null, cache.getDfsClientShmManager());
-    cluster.shutdown();
-    sockDir.close();
-  }
-  
-  /**
-   * Test shutting down the ShortCircuitCache while there are things in it.
-   */
-  @Test
-  public void testShortCircuitCacheShutdown() throws Exception {
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration conf = createShortCircuitConf(
-        "testShortCircuitCacheShutdown", sockDir);
-    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
-    Configuration serverConf = new Configuration(conf);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
-    cluster.waitActive();
-    final DistributedFileSystem fs =
-        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int SEED = 0xFADEC;
-    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(contents, expected));
-    final ShortCircuitCache cache =
-        fs.dfs.getClientContext().getShortCircuitCache();
-    cache.close();
-    Assert.assertTrue(cache.getDfsClientShmManager().
-        getDomainSocketWatcher().isClosed());
-    cluster.shutdown();
-    sockDir.close();
-  }
-
-  /**
-   * When an InterruptedException is sent to a thread calling
-   * FileChannel#read, the FileChannel is immediately closed and the
-   * thread gets an exception.  This effectively means that we might have
-   * someone asynchronously calling close() on the file descriptors we use
-   * in BlockReaderLocal.  So when unreferencing a ShortCircuitReplica in
-   * ShortCircuitCache#unref, we should check if the FileChannel objects
-   * are still open.  If not, we should purge the replica to avoid giving
-   * it out to any future readers.
-   *
-   * This is a regression test for HDFS-6227: Short circuit read failed
-   * due to ClosedChannelException.
-   *
-   * Note that you may still get ClosedChannelException errors if two threads
-   * are reading from the same replica and an InterruptedException is delivered
-   * to one of them.
-   */
-  @Test(timeout=120000)
-  public void testPurgingClosedReplicas() throws Exception {
-    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
-    final AtomicInteger replicasCreated = new AtomicInteger(0);
-    final AtomicBoolean testFailed = new AtomicBoolean(false);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
-        new ShortCircuitCache.ShortCircuitReplicaCreator() {
-          @Override
-          public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
-            replicasCreated.incrementAndGet();
-            return null;
-          }
-        };
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration conf = createShortCircuitConf(
-        "testPurgingClosedReplicas", sockDir);
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4095;
-    final int SEED = 0xFADE0;
-    final DistributedFileSystem fs =
-        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
-    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-
-    final Semaphore sem = new Semaphore(0);
-    final List<LocatedBlock> locatedBlocks =
-        cluster.getNameNode().getRpcServer().getBlockLocations(
-            TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
-    final LocatedBlock lblock = locatedBlocks.get(0); // first block
-    final byte[] buf = new byte[TEST_FILE_LEN];
-    Runnable readerRunnable = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          while (true) {
-            BlockReader blockReader = null;
-            try {
-              blockReader = BlockReaderTestUtil.getBlockReader(
-                  cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
-              sem.release();
-              try {
-                blockReader.readAll(buf, 0, TEST_FILE_LEN);
-              } finally {
-                sem.acquireUninterruptibly();
-              }
-            } catch (ClosedByInterruptException e) {
-              LOG.info("got the expected ClosedByInterruptException", e);
-              sem.release();
-              break;
-            } finally {
-              if (blockReader != null) blockReader.close();
-            }
-            LOG.info("read another " + TEST_FILE_LEN + " bytes.");
-          }
-        } catch (Throwable t) {
-          LOG.error("getBlockReader failure", t);
-          testFailed.set(true);
-          sem.release();
-        }
-      }
-    };
-    Thread thread = new Thread(readerRunnable);
-    thread.start();
-
-    // While the thread is reading, send it interrupts.
-    // These should trigger a ClosedChannelException.
-    while (thread.isAlive()) {
-      sem.acquireUninterruptibly();
-      thread.interrupt();
-      sem.release();
-    }
-    Assert.assertFalse(testFailed.get());
-
-    // We should be able to read from the file without
-    // getting a ClosedChannelException.
-    BlockReader blockReader = null;
-    try {
-      blockReader = BlockReaderTestUtil.getBlockReader(
-          cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
-      blockReader.readFully(buf, 0, TEST_FILE_LEN);
-    } finally {
-      if (blockReader != null) blockReader.close();
-    }
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(buf, expected));
-
-    // Another ShortCircuitReplica object should have been created.
-    Assert.assertEquals(2, replicasCreated.get());
-
-    dfs.close();
-    cluster.shutdown();
-    sockDir.close();
-  }
-}
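
The javadoc on testPurgingClosedReplicas above describes the behaviour the test guards (HDFS-6227): when a ShortCircuitReplica is unreferenced, the cache must notice that its backing FileChannel objects were closed, for example by an interrupt delivered during FileChannel#read, and purge the replica instead of handing it to future readers. Below is a minimal sketch of that kind of validity check, using only java.nio.channels.FileChannel#isOpen; it is illustrative and not the actual ShortCircuitCache#unref code (the class and method names are invented):

// Illustrative only: mirrors the check the javadoc describes, not the real
// ShortCircuitCache implementation.
import java.nio.channels.FileChannel;

final class ReplicaValiditySketch {
  private ReplicaValiditySketch() {}

  // A replica is only safe to hand out again if every backing channel is
  // still open; a channel closed by a ClosedByInterruptException means the
  // cached replica must be purged rather than reused.
  static boolean backingChannelsOpen(FileChannel dataChannel,
                                     FileChannel metaChannel) {
    return dataChannel != null && dataChannel.isOpen()
        && (metaChannel == null || metaChannel.isOpen());
  }
}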

