hadoop-common-commits mailing list archives

From a..@apache.org
Subject [11/50] [abbrv] hadoop git commit: HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma
Date Fri, 29 Apr 2016 20:20:07 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
deleted file mode 100644
index 65a8373..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ /dev/null
@@ -1,744 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.hdfs.util.IOUtilsClient;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DirectBufferPool;
-import org.apache.htrace.core.TraceScope;
-import org.apache.htrace.core.Tracer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * BlockReaderLocalLegacy enables local short-circuited reads. If the DFS client
- * is on the same machine as the datanode, then the client can read files
- * directly from the local file system rather than going through the datanode
- * for better performance. <br>
- *
- * This is the legacy implementation based on HDFS-2246, which requires
- * permissions on the datanode to be set so that clients can directly access the
- * blocks. The new implementation based on HDFS-347 should be preferred on UNIX
- * systems where the required native code has been implemented.<br>
- *
- * {@link BlockReaderLocalLegacy} works as follows:
- * <ul>
- * <li>The client performing short-circuit reads must be configured on the
- * datanode.</li>
- * <li>The client gets the path to the file where the block is stored via the
- * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
- * RPC call.</li>
- * <li>If security is enabled, the client uses Kerberos authentication to
- * connect to the datanode over RPC.</li>
- * </ul>
- */
-@InterfaceAudience.Private
-class BlockReaderLocalLegacy implements BlockReader {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      BlockReaderLocalLegacy.class);
-
-  //Stores the cache and proxy for a local datanode.
-  private static class LocalDatanodeInfo {
-    private ClientDatanodeProtocol proxy = null;
-    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
-
-    LocalDatanodeInfo() {
-      final int cacheSize = 10000;
-      final float hashTableLoadFactor = 0.75f;
-      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor)
-          + 1;
-      cache = Collections
-          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
-              hashTableCapacity, hashTableLoadFactor, true) {
-            private static final long serialVersionUID = 1;
-
-            @Override
-            protected boolean removeEldestEntry(
-                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
-              return size() > cacheSize;
-            }
-          });
-    }
-
-    private synchronized ClientDatanodeProtocol getDatanodeProxy(
-        UserGroupInformation ugi, final DatanodeInfo node,
-        final Configuration conf, final int socketTimeout,
-        final boolean connectToDnViaHostname) throws IOException {
-      if (proxy == null) {
-        try {
-          proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf,
-                  socketTimeout, connectToDnViaHostname);
-            }
-          });
-        } catch (InterruptedException e) {
-          LOG.warn("encountered exception ", e);
-        }
-      }
-      return proxy;
-    }
-
-    private synchronized void resetDatanodeProxy() {
-      if (null != proxy) {
-        RPC.stopProxy(proxy);
-        proxy = null;
-      }
-    }
-
-    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
-      return cache.get(b);
-    }
-
-    private void setBlockLocalPathInfo(ExtendedBlock b,
-        BlockLocalPathInfo info) {
-      cache.put(b, info);
-    }
-
-    private void removeBlockLocalPathInfo(ExtendedBlock b) {
-      cache.remove(b);
-    }
-  }
-
-  // Multiple datanodes could be running on the local machine. Store proxies in
-  // a map keyed by the ipc port of the datanode.
-  private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap =
-      new HashMap<>();
-
-  private final FileInputStream dataIn; // reader for the data file
-  private final FileInputStream checksumIn;   // reader for the checksum file
-
-  /**
-   * Offset from the most recent chunk boundary at which the next read should
-   * take place. Is only set to non-zero at construction time, and is
-   * decremented (usually to 0) by subsequent reads. This avoids having to do a
-   * checksum read at construction to position the read cursor correctly.
-   */
-  private int offsetFromChunkBoundary;
-
-  private byte[] skipBuf = null;
-
-  /**
-   * Used for checksummed reads that need to be staged before copying to their
-   * output buffer because they are either a) smaller than the checksum chunk
-   * size or b) issued by the slower read(byte[]...) path
-   */
-  private ByteBuffer slowReadBuff = null;
-  private ByteBuffer checksumBuff = null;
-  private DataChecksum checksum;
-  private final boolean verifyChecksum;
-
-  private static final DirectBufferPool bufferPool = new DirectBufferPool();
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-  private final String filename;
-  private long blockId;
-  private final Tracer tracer;
-
-  /**
-   * The only way this object can be instantiated.
-   */
-  static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
-      UserGroupInformation userGroupInformation,
-      Configuration configuration, String file, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> token, DatanodeInfo node,
-      long startOffset, long length, StorageType storageType,
-      Tracer tracer) throws IOException {
-    final ShortCircuitConf scConf = conf.getShortCircuitConf();
-    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
-        .getIpcPort());
-    // check the cache first
-    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
-    if (pathinfo == null) {
-      if (userGroupInformation == null) {
-        userGroupInformation = UserGroupInformation.getCurrentUser();
-      }
-      pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
-          configuration, conf.getSocketTimeout(), token,
-          conf.isConnectToDnViaHostname(), storageType);
-    }
-
-    // check to see if the file exists. It may so happen that the
-    // HDFS file has been deleted and this block-lookup is occurring
-    // on behalf of a new HDFS file. This time, the block file could
- // be residing in a different portion of the dfs.data.dir directory.
-    // In this case, we remove this entry from the cache. The next
-    // call to this method will re-populate the cache.
-    FileInputStream dataIn = null;
-    FileInputStream checksumIn = null;
-    BlockReaderLocalLegacy localBlockReader = null;
-    final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums()
-        || storageType.isTransient();
-    try {
-      // get a local file system
-      File blkfile = new File(pathinfo.getBlockPath());
-      dataIn = new FileInputStream(blkfile);
-
-      LOG.debug("New BlockReaderLocalLegacy for file {} of size {} startOffset "
-              + "{} length {} short circuit checksum {}",
-          blkfile, blkfile.length(), startOffset, length, !skipChecksumCheck);
-
-      if (!skipChecksumCheck) {
-        // get the metadata file
-        File metafile = new File(pathinfo.getMetaPath());
-        checksumIn = new FileInputStream(metafile);
-
-        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
-            new DataInputStream(checksumIn), blk);
-        long firstChunkOffset = startOffset
-            - (startOffset % checksum.getBytesPerChecksum());
-        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
-            startOffset, checksum, true, dataIn, firstChunkOffset, checksumIn,
-            tracer);
-      } else {
-        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk,
-            startOffset, dataIn, tracer);
-      }
-    } catch (IOException e) {
-      // remove from cache
-      localDatanodeInfo.removeBlockLocalPathInfo(blk);
-      LOG.warn("BlockReaderLocalLegacy: Removing " + blk
-          + " from cache because local file " + pathinfo.getBlockPath()
-          + " could not be opened.");
-      throw e;
-    } finally {
-      if (localBlockReader == null) {
-        if (dataIn != null) {
-          dataIn.close();
-        }
-        if (checksumIn != null) {
-          checksumIn.close();
-        }
-      }
-    }
-    return localBlockReader;
-  }
-
-  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
-    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
-    if (ldInfo == null) {
-      ldInfo = new LocalDatanodeInfo();
-      localDatanodeInfoMap.put(port, ldInfo);
-    }
-    return ldInfo;
-  }
-
-  private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
-      ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
-      StorageType storageType) throws IOException {
-    LocalDatanodeInfo localDatanodeInfo =
-        getLocalDatanodeInfo(node.getIpcPort());
-    BlockLocalPathInfo pathinfo;
-    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
-        conf, timeout, connectToDnViaHostname);
-    try {
-      // make RPC to local datanode to find local pathnames of blocks
-      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
-      // We can't cache the path information for a replica on transient storage.
-      // If the replica gets evicted, then it moves to a different path.  Then,
-      // our next attempt to read from the cached path would fail to find the
-      // file.  Additionally, the failure would cause us to disable legacy
-      // short-circuit read for all subsequent use in the ClientContext.  Unlike
-      // the newer short-circuit read implementation, we have no communication
-      // channel for the DataNode to notify the client that the path has been
-      // invalidated.  Therefore, our only option is to skip caching.
-      if (pathinfo != null && !storageType.isTransient()) {
-        LOG.debug("Cached location of block {} as {}", blk, pathinfo);
-        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
-      }
-    } catch (IOException e) {
-      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
-      throw e;
-    }
-    return pathinfo;
-  }
-
-  private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
-      int bytesPerChecksum) {
-    if (bufferSizeBytes < bytesPerChecksum) {
-      throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
-          "buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
-          "a single chunk (" + bytesPerChecksum +  "). Please configure " +
-          HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY +
-          " appropriately");
-    }
-
-    // Round down to nearest chunk size
-    return bufferSizeBytes / bytesPerChecksum;
-  }
-
-  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
-      ExtendedBlock block, long startOffset, FileInputStream dataIn,
-      Tracer tracer) throws IOException {
-    this(conf, hdfsfile, block, startOffset,
-        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
-        dataIn, startOffset, null, tracer);
-  }
-
-  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
-      ExtendedBlock block, long startOffset, DataChecksum checksum,
-      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
-      FileInputStream checksumIn, Tracer tracer) throws IOException {
-    this.filename = hdfsfile;
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max(startOffset, 0);
-    this.blockId = block.getBlockId();
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-
-    this.dataIn = dataIn;
-    this.checksumIn = checksumIn;
-    this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
-
-    final int chunksPerChecksumRead = getSlowReadBufferNumChunks(
-        conf.getShortCircuitBufferSize(), bytesPerChecksum);
-    slowReadBuff = bufferPool.getBuffer(
-        bytesPerChecksum * chunksPerChecksumRead);
-    checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
-    // Initially the buffers have nothing to read.
-    slowReadBuff.flip();
-    checksumBuff.flip();
-    boolean success = false;
-    try {
-      // Skip both input streams to beginning of the chunk containing
-      // startOffset
-      IOUtils.skipFully(dataIn, firstChunkOffset);
-      if (checksumIn != null) {
-        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) *
-            checksumSize;
-        IOUtils.skipFully(checksumIn, checkSumOffset);
-      }
-      success = true;
-    } finally {
-      if (!success) {
-        bufferPool.returnBuffer(slowReadBuff);
-        bufferPool.returnBuffer(checksumBuff);
-      }
-    }
-    this.tracer = tracer;
-  }
-
-  /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached
-   */
-  private int fillBuffer(FileInputStream stream, ByteBuffer buf)
-      throws IOException {
-    try (TraceScope ignored = tracer.
-        newScope("BlockReaderLocalLegacy#fillBuffer(" + blockId + ")")) {
-      int bytesRead = stream.getChannel().read(buf);
-      if (bytesRead < 0) {
-        //EOF
-        return bytesRead;
-      }
-      while (buf.remaining() > 0) {
-        int n = stream.getChannel().read(buf);
-        if (n < 0) {
-          //EOF
-          return bytesRead;
-        }
-        bytesRead += n;
-      }
-      return bytesRead;
-    }
-  }
-
-  /**
-   * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
-   * another.
-   */
-  private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
-    int oldLimit = from.limit();
-    from.limit(from.position() + length);
-    try {
-      to.put(from);
-    } finally {
-      from.limit(oldLimit);
-    }
-  }
-
-  @Override
-  public synchronized int read(ByteBuffer buf) throws IOException {
-    int nRead = 0;
-    if (verifyChecksum) {
-      // A 'direct' read actually has three phases. The first drains any
-      // remaining bytes from the slow read buffer. After this the read is
-      // guaranteed to be on a checksum chunk boundary. If there are still bytes
-      // to read, the fast direct path is used for as many remaining bytes as
-      // possible, up to a multiple of the checksum chunk size. Finally, any
-      // 'odd' bytes remaining at the end of the read cause another slow read to
-      // be issued, which involves an extra copy.
-
-      // Every 'slow' read tries to fill the slow read buffer in one go for
-      // efficiency's sake. As described above, all non-checksum-chunk-aligned
-      // reads will be served from the slower read path.
-
-      if (slowReadBuff.hasRemaining()) {
-        // There are remaining bytes from a small read available. This usually
-        // means this read is unaligned, which falls back to the slow path.
-        int fromSlowReadBuff = Math.min(buf.remaining(),
-            slowReadBuff.remaining());
-        writeSlice(slowReadBuff, buf, fromSlowReadBuff);
-        nRead += fromSlowReadBuff;
-      }
-
-      if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
-        // Since we have drained the 'small read' buffer, we are guaranteed to
-        // be chunk-aligned
-        int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
-
-        // There's only enough checksum buffer space available to checksum one
-        // entire slow read buffer. This saves keeping the number of checksum
-        // chunks around.
-        len = Math.min(len, slowReadBuff.capacity());
-        int oldlimit = buf.limit();
-        buf.limit(buf.position() + len);
-        int readResult = 0;
-        try {
-          readResult = doByteBufferRead(buf);
-        } finally {
-          buf.limit(oldlimit);
-        }
-        if (readResult == -1) {
-          return nRead;
-        } else {
-          nRead += readResult;
-          buf.position(buf.position() + readResult);
-        }
-      }
-
-      // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
-      // until chunk boundary
-      if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) ||
-          offsetFromChunkBoundary > 0) {
-        int toRead = Math.min(buf.remaining(),
-            bytesPerChecksum - offsetFromChunkBoundary);
-        int readResult = fillSlowReadBuffer(toRead);
-        if (readResult == -1) {
-          return nRead;
-        } else {
-          int fromSlowReadBuff = Math.min(readResult, buf.remaining());
-          writeSlice(slowReadBuff, buf, fromSlowReadBuff);
-          nRead += fromSlowReadBuff;
-        }
-      }
-    } else {
-      // Non-checksummed reads are much easier; we can just fill the buffer
-      // directly.
-      nRead = doByteBufferRead(buf);
-      if (nRead > 0) {
-        buf.position(buf.position() + nRead);
-      }
-    }
-    return nRead;
-  }
-
-  /**
-   * Tries to read as many bytes as possible into the supplied buffer, checksumming
-   * each chunk if needed.
-   *
-   * <b>Preconditions:</b>
-   * <ul>
-   * <li>
-   * If checksumming is enabled, buf.remaining must be a multiple of
-   * bytesPerChecksum. Note that this is not a requirement for clients of
-   * read(ByteBuffer) - in the case of non-checksum-sized read requests,
-   * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
-   * method.
-   * </li>
-   * </ul>
-   * <b>Postconditions:</b>
-   * <ul>
-   * <li>buf.limit and buf.mark are unchanged.</li>
-   * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
-   * requested bytes can be read straight from the buffer</li>
-   * </ul>
-   *
-   * @param buf
-   *          byte buffer to write bytes to. If checksums are not required, buf
-   *          can have any number of bytes remaining, otherwise there must be a
-   *          multiple of the checksum chunk size remaining.
-   * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
-   *         that is, the number of useful bytes (up to the amount
-   *         requested) readable from the buffer by the client.
-   */
-  private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
-    if (verifyChecksum) {
-      assert buf.remaining() % bytesPerChecksum == 0;
-    }
-    int dataRead;
-
-    int oldpos = buf.position();
-    // Read as much as we can into the buffer.
-    dataRead = fillBuffer(dataIn, buf);
-
-    if (dataRead == -1) {
-      return -1;
-    }
-
-    if (verifyChecksum) {
-      ByteBuffer toChecksum = buf.duplicate();
-      toChecksum.position(oldpos);
-      toChecksum.limit(oldpos + dataRead);
-
-      checksumBuff.clear();
-      // Equivalent to
-      // (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
-      int numChunks =
-          (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
-      checksumBuff.limit(checksumSize * numChunks);
-
-      fillBuffer(checksumIn, checksumBuff);
-      checksumBuff.flip();
-
-      checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
-          this.startOffset);
-    }
-
-    if (dataRead >= 0) {
-      buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
-    }
-
-    if (dataRead < offsetFromChunkBoundary) {
-      // yikes, didn't even get enough bytes to honour offset. This can happen
-      // even if we are verifying checksums if we are at EOF.
-      offsetFromChunkBoundary -= dataRead;
-      dataRead = 0;
-    } else {
-      dataRead -= offsetFromChunkBoundary;
-      offsetFromChunkBoundary = 0;
-    }
-
-    return dataRead;
-  }
-
-  /**
-   * Ensures that up to len bytes are available and checksummed in the slow read
-   * buffer. The number of bytes available to read is returned. If the buffer is
-   * not already empty, the number of remaining bytes is returned and no actual
-   * read happens.
-   *
-   * @param len
-   *          the maximum number of bytes to make available. After len bytes
-   *          are read, the underlying bytestream <b>must</b> be at a checksum
-   *          boundary, or EOF. That is, (len + currentPosition) %
-   *          bytesPerChecksum == 0.
-   * @return the number of bytes available to read, or -1 if EOF.
-   */
-  private synchronized int fillSlowReadBuffer(int len) throws IOException {
-    int nRead;
-    if (slowReadBuff.hasRemaining()) {
-      // Already got data, good to go.
-      nRead = Math.min(len, slowReadBuff.remaining());
-    } else {
-      // Round a complete read of len bytes (plus any implicit offset) to the
-      // next chunk boundary, since we try to read in multiples of a chunk
-      int nextChunk = len + offsetFromChunkBoundary +
-          (bytesPerChecksum -
-              ((len + offsetFromChunkBoundary) % bytesPerChecksum));
-      int limit = Math.min(nextChunk, slowReadBuff.capacity());
-      assert limit % bytesPerChecksum == 0;
-
-      slowReadBuff.clear();
-      slowReadBuff.limit(limit);
-
-      nRead = doByteBufferRead(slowReadBuff);
-
-      if (nRead > 0) {
-        // So that next time we call slowReadBuff.hasRemaining(), we don't get a
-        // false positive.
-        slowReadBuff.limit(nRead + slowReadBuff.position());
-      }
-    }
-    return nRead;
-  }
-
-  @Override
-  public synchronized int read(byte[] buf, int off, int len)
-      throws IOException {
-    LOG.trace("read off {} len {}", off, len);
-    if (!verifyChecksum) {
-      return dataIn.read(buf, off, len);
-    }
-
-    int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
-
-    if (nRead > 0) {
-      // Possible that buffer is filled with a larger read than we need, since
-      // we tried to read as much as possible at once
-      nRead = Math.min(len, nRead);
-      slowReadBuff.get(buf, off, nRead);
-    }
-
-    return nRead;
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    LOG.debug("skip {}", n);
-    if (n <= 0) {
-      return 0;
-    }
-    if (!verifyChecksum) {
-      return dataIn.skip(n);
-    }
-
-    // caller made sure newPosition is not beyond EOF.
-    int remaining = slowReadBuff.remaining();
-    int position = slowReadBuff.position();
-    int newPosition = position + (int)n;
-
-    // if the new offset is already read into dataBuff, just reposition
-    if (n <= remaining) {
-      assert offsetFromChunkBoundary == 0;
-      slowReadBuff.position(newPosition);
-      return n;
-    }
-
-    // for a small gap, read through to keep the data/checksum in sync
-    if (n - remaining <= bytesPerChecksum) {
-      slowReadBuff.position(position + remaining);
-      if (skipBuf == null) {
-        skipBuf = new byte[bytesPerChecksum];
-      }
-      int ret = read(skipBuf, 0, (int)(n - remaining));
-      return (remaining + ret);
-    }
-
-    // optimize for a big gap: discard the current buffer, skip to
-    // the beginning of the appropriate checksum chunk and then
-    // read to the middle of that chunk to be in sync with checksums.
-
-    // We can't use this.offsetFromChunkBoundary because we need to know how
-    // many bytes of the offset were really read. Calling read(..) with a
-    // positive this.offsetFromChunkBoundary causes that many bytes to get
-    // silently skipped.
-    int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
-    long toskip = n - remaining - myOffsetFromChunkBoundary;
-
-    slowReadBuff.position(slowReadBuff.limit());
-    checksumBuff.position(checksumBuff.limit());
-
-    IOUtils.skipFully(dataIn, toskip);
-    long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
-    IOUtils.skipFully(checksumIn, checkSumOffset);
-
-    // read into the middle of the chunk
-    if (skipBuf == null) {
-      skipBuf = new byte[bytesPerChecksum];
-    }
-    assert skipBuf.length == bytesPerChecksum;
-    assert myOffsetFromChunkBoundary < bytesPerChecksum;
-
-    int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
-
-    if (ret == -1) {  // EOS
-      return (toskip + remaining);
-    } else {
-      return (toskip + remaining + ret);
-    }
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    IOUtilsClient.cleanup(LOG, dataIn, checksumIn);
-    if (slowReadBuff != null) {
-      bufferPool.returnBuffer(slowReadBuff);
-      slowReadBuff = null;
-    }
-    if (checksumBuff != null) {
-      bufferPool.returnBuffer(checksumBuff);
-      checksumBuff = null;
-    }
-    startOffset = -1;
-    checksum = null;
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public void readFully(byte[] buf, int off, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, off, len);
-  }
-
-  @Override
-  public int available() {
-    // We never do network I/O in BlockReaderLocalLegacy.
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return true;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-
-  @Override
-  public DataChecksum getDataChecksum() {
-    return checksum;
-  }
-
-  @Override
-  public int getNetworkDistance() {
-    return 0;
-  }
-}
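
The LocalDatanodeInfo cache in the file above is a textbook LRU: a LinkedHashMap constructed in access order whose removeEldestEntry hook evicts once the size limit is exceeded. A minimal standalone sketch of the same idiom, with an illustrative three-entry capacity standing in for the reader's 10000:

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class LruCacheSketch {
  public static void main(String[] args) {
    final int cacheSize = 3; // illustrative; LocalDatanodeInfo uses 10000
    final float loadFactor = 0.75f;
    // Size the hash table so it never rehashes before the LRU limit is hit.
    int capacity = (int) Math.ceil(cacheSize / loadFactor) + 1;
    // accessOrder=true makes get() move an entry to the tail, so the head
    // is always the least-recently-used entry.
    Map<String, String> cache = Collections.synchronizedMap(
        new LinkedHashMap<String, String>(capacity, loadFactor, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
            return size() > cacheSize; // evict the LRU entry past the limit
          }
        });
    cache.put("blk_1", "/data/blk_1");
    cache.put("blk_2", "/data/blk_2");
    cache.put("blk_3", "/data/blk_3");
    cache.get("blk_1");                // touch blk_1; blk_2 is now eldest
    cache.put("blk_4", "/data/blk_4"); // evicts blk_2
    System.out.println(cache.keySet()); // [blk_3, blk_1, blk_4]
  }
}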

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
deleted file mode 100644
index 85f925f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.io.IOException;
-
-/**
- * For sharing between the local and remote block reader implementations.
- */
-@InterfaceAudience.Private
-class BlockReaderUtil {
-
-  /* See {@link BlockReader#readAll(byte[], int, int)} */
-  public static int readAll(BlockReader reader,
-      byte[] buf, int offset, int len) throws IOException {
-    int n = 0;
-    for (;;) {
-      int nread = reader.read(buf, offset + n, len - n);
-      if (nread <= 0)
-        return (n == 0) ? nread : n;
-      n += nread;
-      if (n >= len)
-        return n;
-    }
-  }
-
-  /* See {@link BlockReader#readFully(byte[], int, int)} */
-  public static void readFully(BlockReader reader,
-      byte[] buf, int off, int len) throws IOException {
-    int toRead = len;
-    while (toRead > 0) {
-      int ret = reader.read(buf, off, toRead);
-      if (ret < 0) {
-        throw new IOException("Premature EOF from inputStream");
-      }
-      toRead -= ret;
-      off += ret;
-    }
-  }
-}
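
The two helpers above differ only in their EOF contract: readAll returns a short count (or -1 on immediate EOF), while readFully turns a premature EOF into an IOException. Both loop because a single InputStream.read may legally deliver fewer bytes than requested. A standalone sketch of that contract against a deliberately trickling stream (the stream class is purely illustrative):

import java.io.IOException;
import java.io.InputStream;

public class ReadLoopSketch {
  /** An InputStream that returns at most 2 bytes per read() call. */
  static class TricklingStream extends InputStream {
    private final byte[] data = "hello world".getBytes();
    private int pos = 0;

    @Override
    public int read() {
      return pos < data.length ? (data[pos++] & 0xff) : -1;
    }

    @Override
    public int read(byte[] buf, int off, int len) {
      if (pos >= data.length) {
        return -1;
      }
      int n = Math.min(Math.min(len, 2), data.length - pos);
      System.arraycopy(data, pos, buf, off, n);
      pos += n;
      return n;
    }
  }

  public static void main(String[] args) throws IOException {
    InputStream in = new TricklingStream();
    byte[] buf = new byte[11];
    int n = 0;
    // Same loop shape as readAll above: accumulate until full or EOF.
    while (n < buf.length) {
      int nread = in.read(buf, n, buf.length - n);
      if (nread <= 0) {
        break;
      }
      n += nread;
    }
    System.out.println(n + ": " + new String(buf, 0, n)); // 11: hello world
  }
}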

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 9e67ff2..2ed0abd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
deleted file mode 100644
index 707a56a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.util.DataChecksum;
-
-/**
- * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
- * replicas.
- */
-@InterfaceAudience.Private
-public final class ExternalBlockReader implements BlockReader {
-  private final ReplicaAccessor accessor;
-  private final long visibleLength;
-  private long pos;
-
-  ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
-                      long startOffset) {
-    this.accessor = accessor;
-    this.visibleLength = visibleLength;
-    this.pos = startOffset;
-  }
-
-  @Override
-  public int read(byte[] buf, int off, int len) throws IOException {
-    int nread = accessor.read(pos, buf, off, len);
-    if (nread < 0) {
-      return nread;
-    }
-    pos += nread;
-    return nread;
-  }
-
-  @Override
-  public int read(ByteBuffer buf) throws IOException {
-    int nread = accessor.read(pos, buf);
-    if (nread < 0) {
-      return nread;
-    }
-    pos += nread;
-    return nread;
-  }
-
-  @Override
-  public long skip(long n) throws IOException {
-    // You cannot skip backwards
-    if (n <= 0) {
-      return 0;
-    }
-    // You can't skip past the last offset that we want to read with this
-    // block reader.
-    long oldPos = pos;
-    pos += n;
-    if (pos > visibleLength) {
-      pos = visibleLength;
-    }
-    return pos - oldPos;
-  }
-
-  @Override
-  public int available() {
-    // We return the number of bytes between the current offset and the visible
-    // length.  Some of the other block readers return a shorter length than
-    // that.  The only advantage to returning a shorter length is that the
-    // DFSInputStream will trash your block reader and create a new one if
-    // someone tries to seek() beyond the available() region.
-    long diff = visibleLength - pos;
-    if (diff > Integer.MAX_VALUE) {
-      return Integer.MAX_VALUE;
-    } else {
-      return (int)diff;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    accessor.close();
-  }
-
-  @Override
-  public void readFully(byte[] buf, int offset, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, offset, len);
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return accessor.isShortCircuit();
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    // For now, pluggable ReplicaAccessors do not support zero-copy.
-    return null;
-  }
-
-  @Override
-  public DataChecksum getDataChecksum() {
-    return null;
-  }
-
-  @Override
-  public int getNetworkDistance() {
-    return accessor.getNetworkDistance();
-  }
-}
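
ExternalBlockReader keeps its own position and clamps every skip to the replica's visible length, refusing to move backwards. A minimal sketch of just that clamping arithmetic, detached from any ReplicaAccessor (the method and numbers are illustrative):

public class SkipClampSketch {
  // Mirrors ExternalBlockReader#skip: no backward skips, and the position
  // never advances past visibleLength. Returns the bytes actually skipped.
  static long skip(long pos, long visibleLength, long n) {
    if (n <= 0) {
      return 0;
    }
    long newPos = Math.min(pos + n, visibleLength);
    return newPos - pos;
  }

  public static void main(String[] args) {
    System.out.println(skip(10, 100, 50)); // 50: ordinary forward skip
    System.out.println(skip(90, 100, 50)); // 10: clamped at the visible length
    System.out.println(skip(90, 100, -5)); // 0: backward skips are refused
  }
}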

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
deleted file mode 100644
index 7e094f5..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.core.TraceScope;
-import org.apache.htrace.core.Tracer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * @deprecated this is an old implementation that is being left around
- * in case any issues spring up with the new {@link RemoteBlockReader2}
- * implementation.
- * It will be removed in the next release.
- */
-@InterfaceAudience.Private
-@Deprecated
-public class RemoteBlockReader extends FSInputChecker implements BlockReader {
-  static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
-
-  private final Peer peer;
-  private final DatanodeID datanodeID;
-  private final DataInputStream in;
-  private DataChecksum checksum;
-
-  /** offset in block of the last chunk received */
-  private long lastChunkOffset = -1;
-  private long lastChunkLen = -1;
-  private long lastSeqNo = -1;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-
-  private final long blockId;
-
-  /** offset in block of the first chunk - may be less than startOffset
-   if startOffset is not chunk-aligned */
-  private final long firstChunkOffset;
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private final long bytesNeededToFinish;
-
-  private boolean eos = false;
-  private boolean sentStatusCode = false;
-
-  ByteBuffer checksumBytes = null;
-  /** Amount of unread data in the current received packet */
-  int dataLeft = 0;
-
-  private final PeerCache peerCache;
-
-  private final Tracer tracer;
-
-  private final int networkDistance;
-
-  /* FSInputChecker interface */
-
-  /* Same interface as java.io.InputStream#read(),
-   * used by DFSInputStream#read().
-   * This violates one rule when there is a checksum error:
-   * "Read should not modify user buffer before successful read",
-   * because it first reads the data into the user buffer and then checks
-   * the checksum.
-   */
-  @Override
-  public synchronized int read(byte[] buf, int off, int len)
-      throws IOException {
-
-    // This has to be set here, *before* the skip, since we can
-    // hit EOS during the skip, in the case that our entire read
-    // is smaller than the checksum chunk.
-    boolean eosBefore = eos;
-
-    //for the first read, skip the extra bytes at the front.
-    if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
-      // Skip these bytes. But don't call this.skip()!
-      int toSkip = (int)(startOffset - firstChunkOffset);
-      if ( super.readAndDiscard(toSkip) != toSkip ) {
-        // should never happen
-        throw new IOException("Could not skip required number of bytes");
-      }
-    }
-
-    int nRead = super.read(buf, off, len);
-
-    // if eos was set in the previous read, send a status code to the DN
-    if (eos && !eosBefore && nRead >= 0) {
-      if (needChecksum()) {
-        sendReadResult(peer, Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(peer, Status.SUCCESS);
-      }
-    }
-    return nRead;
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in the majority of cases? This one throws. */
-    long nSkipped = 0;
-    while (nSkipped < n) {
-      int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
-      int ret = readAndDiscard(toSkip);
-      if (ret <= 0) {
-        return nSkipped;
-      }
-      nSkipped += ret;
-    }
-    return nSkipped;
-  }
-
-  @Override
-  public int read() throws IOException {
-    throw new IOException("read() is not expected to be invoked. " +
-        "Use read(buf, off, len) instead.");
-  }
-
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    /* Checksum errors are handled outside the BlockReader.
-     * DFSInputStream does not always call 'seekToNewSource'. In the
-     * case of pread(), it just tries a different replica without seeking.
-     */
-    return false;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    throw new IOException("Seek() is not supported in BlockInputChecker");
-  }
-
-  @Override
-  protected long getChunkPosition(long pos) {
-    throw new RuntimeException("getChunkPosition() is not supported, " +
-        "since seek is not required");
-  }
-
-  /**
-   * Makes sure that checksumBytes has enough capacity
-   * and its limit is set to the number of checksum bytes needed
-   * to be read.
-   */
-  private void adjustChecksumBytes(int dataLen) {
-    int requiredSize =
-        ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
-    if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
-      checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
-    } else {
-      checksumBytes.clear();
-    }
-    checksumBytes.limit(requiredSize);
-  }
-
-  @Override
-  protected synchronized int readChunk(long pos, byte[] buf, int offset,
-      int len, byte[] checksumBuf)
-      throws IOException {
-    try (TraceScope ignored = tracer.newScope(
-        "RemoteBlockReader#readChunk(" + blockId + ")")) {
-      return readChunkImpl(pos, buf, offset, len, checksumBuf);
-    }
-  }
-
-  private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
-      int len, byte[] checksumBuf)
-      throws IOException {
-    // Read one chunk.
-    if (eos) {
-      // Already hit EOF
-      return -1;
-    }
-
-    // Read one DATA_CHUNK.
-    long chunkOffset = lastChunkOffset;
-    if ( lastChunkLen > 0 ) {
-      chunkOffset += lastChunkLen;
-    }
-
-    // pos is relative to the start of the first chunk of the read.
-    // chunkOffset is relative to the start of the block.
-    // This makes sure that the read passed from FSInputChecker is
-    // for the same chunk we expect to be reading from the DN.
-    if ( (pos + firstChunkOffset) != chunkOffset ) {
-      throw new IOException("Mismatch in pos : " + pos + " + " +
-          firstChunkOffset + " != " + chunkOffset);
-    }
-
-    // Read next packet if the previous packet has been read completely.
-    if (dataLeft <= 0) {
-      //Read packet headers.
-      PacketHeader header = new PacketHeader();
-      header.readFields(in);
-
-      LOG.debug("DFSClient readChunk got header {}", header);
-
-      // Sanity check the lengths
-      if (!header.sanityCheck(lastSeqNo)) {
-        throw new IOException("BlockReader: error in packet header " +
-            header);
-      }
-
-      lastSeqNo = header.getSeqno();
-      dataLeft = header.getDataLen();
-      adjustChecksumBytes(header.getDataLen());
-      if (header.getDataLen() > 0) {
-        IOUtils.readFully(in, checksumBytes.array(), 0,
-            checksumBytes.limit());
-      }
-    }
-
-    // Sanity checks
-    assert len >= bytesPerChecksum;
-    assert checksum != null;
-    assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
-    int checksumsToRead, bytesToRead;
-
-    if (checksumSize > 0) {
-
-      // How many chunks left in our packet - this is a ceiling
-      // since we may have a partial chunk at the end of the file
-      int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
-      // How many chunks we can fit in databuffer
-      //  - note this is a floor since we always read full chunks
-      int chunksCanFit = Math.min(len / bytesPerChecksum,
-          checksumBuf.length / checksumSize);
-
-      // How many chunks should we read
-      checksumsToRead = Math.min(chunksLeft, chunksCanFit);
-      // How many bytes should we actually read
-      bytesToRead = Math.min(
-          checksumsToRead * bytesPerChecksum, // full chunks
-          dataLeft); // in case we have a partial
-    } else {
-      // no checksum
-      bytesToRead = Math.min(dataLeft, len);
-      checksumsToRead = 0;
-    }
-
-    if ( bytesToRead > 0 ) {
-      // Assert we have enough space
-      assert bytesToRead <= len;
-      assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
-      assert checksumBuf.length >= checksumSize * checksumsToRead;
-      IOUtils.readFully(in, buf, offset, bytesToRead);
-      checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
-    }
-
-    dataLeft -= bytesToRead;
-    assert dataLeft >= 0;
-
-    lastChunkOffset = chunkOffset;
-    lastChunkLen = bytesToRead;
-
-    // If there's no data left in the current packet after satisfying
-    // this read, and we have satisfied the client read, we expect
-    // an empty packet header from the DN to signify this.
-    // Note that pos + bytesToRead may in fact be greater since the
-    // DN finishes off the entire last chunk.
-    if (dataLeft == 0 &&
-        pos + bytesToRead >= bytesNeededToFinish) {
-
-      // Read header
-      PacketHeader hdr = new PacketHeader();
-      hdr.readFields(in);
-
-      if (!hdr.isLastPacketInBlock() ||
-          hdr.getDataLen() != 0) {
-        throw new IOException("Expected empty end-of-read packet! Header: " +
-            hdr);
-      }
-
-      eos = true;
-    }
-
-    if ( bytesToRead == 0 ) {
-      return -1;
-    }
-
-    return bytesToRead;
-  }
-
-  private RemoteBlockReader(String file, String bpid, long blockId,
-      DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
-      int networkDistance) {
-    // Path is used only for printing block and file information in debug
-    super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
-            ":" + bpid + ":of:"+ file)/*too non path-like?*/,
-        1, verifyChecksum,
-        checksum.getChecksumSize() > 0? checksum : null,
-        checksum.getBytesPerChecksum(),
-        checksum.getChecksumSize());
-
-    this.peer = peer;
-    this.datanodeID = datanodeID;
-    this.in = in;
-    this.checksum = checksum;
-    this.startOffset = Math.max( startOffset, 0 );
-    this.blockId = blockId;
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
-    this.firstChunkOffset = firstChunkOffset;
-    lastChunkOffset = firstChunkOffset;
-    lastChunkLen = -1;
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-    this.peerCache = peerCache;
-    this.tracer = tracer;
-    this.networkDistance = networkDistance;
-  }
-
-  /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @return New BlockReader instance.
-   */
-  public static RemoteBlockReader newBlockReader(String file,
-      ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len,
-      int bufferSize, boolean verifyChecksum,
-      String clientName, Peer peer,
-      DatanodeID datanodeID,
-      PeerCache peerCache,
-      CachingStrategy cachingStrategy,
-      Tracer tracer, int networkDistance)
-      throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out =
-        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum, cachingStrategy);
-
-    //
-    // Get bytes in block, set streams
-    //
-
-    DataInputStream in = new DataInputStream(
-        new BufferedInputStream(peer.getInputStream(), bufferSize));
-
-    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, peer, block, file);
-    ReadOpChecksumInfoProto checksumInfo =
-        status.getReadOpChecksumInfo();
-    DataChecksum checksum = DataTransferProtoUtil.fromProto(
-        checksumInfo.getChecksum());
-    //Warning when we get CHECKSUM_NULL?
-
-    // Read the first chunk offset.
-    long firstChunkOffset = checksumInfo.getChunkOffset();
-
-    if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-          firstChunkOffset + ") startOffset is " +
-          startOffset + " for file " + file);
-    }
-
-    return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
-        in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
-        peer, datanodeID, peerCache, tracer, networkDistance);
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null && sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-
-  @Override
-  public void readFully(byte[] buf, int readOffset, int amtToRead)
-      throws IOException {
-    IOUtils.readFully(this, buf, readOffset, amtToRead);
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return readFully(this, buf, offset, len);
-  }
-
-  /**
-   * When the reader reaches end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Peer peer, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-          peer.getRemoteAddressString() + ": " + e.getMessage());
-    }
-  }
-
-  @Override
-  public int read(ByteBuffer buf) throws IOException {
-    throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
-  }
-
-  @Override
-  public int available() {
-    // An optimistic estimate of how much data is available
-    // to us without doing network I/O.
-    return RemoteBlockReader2.TCP_WINDOW_SIZE;
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return false;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-
-  @Override
-  public DataChecksum getDataChecksum() {
-    return checksum;
-  }
-
-  @Override
-  public int getNetworkDistance() {
-    return networkDistance;
-  }
-}
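
A remote read must begin on a checksum chunk boundary so the first chunk can be verified; the constructor above therefore counts the alignment padding (startOffset - firstChunkOffset) toward bytesNeededToFinish, and newBlockReader validates that the offset the DN returned is startOffset rounded down to a chunk boundary. A small worked example of that arithmetic (the concrete numbers are illustrative):

public class ChunkAlignSketch {
  public static void main(String[] args) {
    long startOffset = 1000;    // offset in the block the caller wants
    long len = 3000;            // bytes the caller asked for
    int bytesPerChecksum = 512; // chunk size from the DataChecksum

    // Round startOffset down to the enclosing chunk boundary -- the offset
    // at which a checksum-verifiable transfer can start.
    long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);

    // Total bytes the DN must send: the request plus the alignment padding
    // at the front (the DN may send even more to finish the last chunk).
    long bytesNeededToFinish = len + (startOffset - firstChunkOffset);

    System.out.println(firstChunkOffset);    // 512
    System.out.println(bytesNeededToFinish); // 3488
  }
}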

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
deleted file mode 100644
index 9437353..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.EnumSet;
-import java.util.UUID;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.core.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.htrace.core.Tracer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a wrapper around the connection to the datanode,
- * and it understands checksums, offsets, etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each of which comes with a checksum.
- *       We want transfers to be chunk-aligned so that checksums can be
- *       verified.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
- *
- * This is a new implementation introduced in Hadoop 0.23, which
- * is more efficient and simpler than the older BlockReader
- * implementation. It should be renamed to RemoteBlockReader
- * once we are confident in it.
- */
-@InterfaceAudience.Private
-public class RemoteBlockReader2 implements BlockReader {
-
-  static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class);
-  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-
-  private final Peer peer;
-  private final DatanodeID datanodeID;
-  private final PeerCache peerCache;
-  private final long blockId;
-  private final ReadableByteChannel in;
-
-  private DataChecksum checksum;
-  private final PacketReceiver packetReceiver = new PacketReceiver(true);
-
-  private ByteBuffer curDataSlice = null;
-
-  /** sequence number of the last packet received */
-  private long lastSeqNo = -1;
-
-  /** offset in block where the reader actually wants to read */
-  private long startOffset;
-  private final String filename;
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /**
-   * The total number of bytes we need to transfer from the DN.
-   * This is the amount that the user has requested plus some padding
-   * at the beginning so that the read can begin on a chunk boundary.
-   */
-  private long bytesNeededToFinish;
-
-  private final boolean verifyChecksum;
-
-  private boolean sentStatusCode = false;
-
-  private final Tracer tracer;
-
-  private final int networkDistance;
-
-  @VisibleForTesting
-  public Peer getPeer() {
-    return peer;
-  }
-
-  @Override
-  public synchronized int read(byte[] buf, int off, int len)
-      throws IOException {
-    UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null);
-    LOG.trace("Starting read #{} file {} from datanode {}",
-        randomId, filename, datanodeID.getHostName());
-
-    if (curDataSlice == null ||
-        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
-      try (TraceScope ignored = tracer.newScope(
-          "RemoteBlockReader2#readNextPacket(" + blockId + ")")) {
-        readNextPacket();
-      }
-    }
-
-    LOG.trace("Finishing read #{}", randomId);
-
-    if (curDataSlice.remaining() == 0) {
-      // we're at EOF now
-      return -1;
-    }
-
-    int nRead = Math.min(curDataSlice.remaining(), len);
-    curDataSlice.get(buf, off, nRead);
-
-    return nRead;
-  }
-
-
-  @Override
-  public synchronized int read(ByteBuffer buf) throws IOException {
-    if (curDataSlice == null ||
-        (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
-      try (TraceScope ignored = tracer.newScope(
-          "RemoteBlockReader2#readNextPacket(" + blockId + ")")) {
-        readNextPacket();
-      }
-    }
-    if (curDataSlice.remaining() == 0) {
-      // we're at EOF now
-      return -1;
-    }
-
-    int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
-    ByteBuffer writeSlice = curDataSlice.duplicate();
-    writeSlice.limit(writeSlice.position() + nRead);
-    buf.put(writeSlice);
-    curDataSlice.position(writeSlice.position());
-
-    return nRead;
-  }
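
The duplicate()/limit() sequence above copies exactly nRead bytes into the caller's buffer without disturbing curDataSlice's own limit. A self-contained illustration of the same ByteBuffer technique, with made-up data:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class SliceCopy {
      public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));
        ByteBuffer dst = ByteBuffer.allocate(5);

        int nRead = Math.min(src.remaining(), dst.remaining()); // 5
        ByteBuffer writeSlice = src.duplicate();  // shares data, independent position/limit
        writeSlice.limit(writeSlice.position() + nRead);
        dst.put(writeSlice);                      // copies exactly nRead bytes
        src.position(writeSlice.position());      // advance src past what was copied

        System.out.println(new String(dst.array(), StandardCharsets.UTF_8)); // hello
      }
    }
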
-
-  private void readNextPacket() throws IOException {
-    // Read the next packet's header and data.
-    packetReceiver.receiveNextPacket(in);
-
-    PacketHeader curHeader = packetReceiver.getHeader();
-    curDataSlice = packetReceiver.getDataSlice();
-    assert curDataSlice.capacity() == curHeader.getDataLen();
-
-    LOG.trace("DFSClient readNextPacket got header {}", curHeader);
-
-    // Sanity check the lengths
-    if (!curHeader.sanityCheck(lastSeqNo)) {
-      throw new IOException("BlockReader: error in packet header " +
-          curHeader);
-    }
-
-    if (curHeader.getDataLen() > 0) {
-      int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
-      int checksumsLen = chunks * checksumSize;
-
-      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
-          "checksum slice capacity=" +
-              packetReceiver.getChecksumSlice().capacity() +
-              " checksumsLen=" + checksumsLen;
-
-      lastSeqNo = curHeader.getSeqno();
-      if (verifyChecksum && curDataSlice.remaining() > 0) {
-        // N.B.: the checksum error offset reported here is actually
-        // relative to the start of the block, not the start of the file.
-        // This is slightly misleading, but preserves the behavior from
-        // the older BlockReader.
-        checksum.verifyChunkedSums(curDataSlice,
-            packetReceiver.getChecksumSlice(),
-            filename, curHeader.getOffsetInBlock());
-      }
-      bytesNeededToFinish -= curHeader.getDataLen();
-    }
-
-    // First packet will include some data prior to the first byte
-    // the user requested. Skip it.
-    if (curHeader.getOffsetInBlock() < startOffset) {
-      int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
-      curDataSlice.position(newPos);
-    }
-
-    // If we've now satisfied the whole client read, read one last packet
-    // header, which should be empty
-    if (bytesNeededToFinish <= 0) {
-      readTrailingEmptyPacket();
-      if (verifyChecksum) {
-        sendReadResult(Status.CHECKSUM_OK);
-      } else {
-        sendReadResult(Status.SUCCESS);
-      }
-    }
-  }
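
The chunk accounting in readNextPacket is a ceiling division. A worked example with illustrative parameters (a 512-byte chunk size and 4-byte checksums; the real values come from the DataChecksum negotiated with the datanode):

    public class ChunkMath {
      public static void main(String[] args) {
        int bytesPerChecksum = 512; // assumed chunk size
        int checksumSize = 4;       // CRC32/CRC32C checksums are 4 bytes
        int dataLen = 1000;         // data bytes carried by this packet

        // Same formula as readNextPacket: ceil(dataLen / bytesPerChecksum).
        int chunks = 1 + (dataLen - 1) / bytesPerChecksum;
        int checksumsLen = chunks * checksumSize;

        System.out.println(chunks);       // 2
        System.out.println(checksumsLen); // 8
      }
    }
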
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    /* How can we make sure we don't throw a ChecksumException, at least
-     * in the majority of cases? This implementation throws. */
-    long skipped = 0;
-    while (skipped < n) {
-      long needToSkip = n - skipped;
-      if (curDataSlice == null ||
-          (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
-        readNextPacket();
-      }
-      if (curDataSlice.remaining() == 0) {
-        // we're at EOF now
-        break;
-      }
-
-      int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
-      curDataSlice.position(curDataSlice.position() + skip);
-      skipped += skip;
-    }
-    return skipped;
-  }
-
-  private void readTrailingEmptyPacket() throws IOException {
-    LOG.trace("Reading empty packet at end of read");
-
-    packetReceiver.receiveNextPacket(in);
-
-    PacketHeader trailer = packetReceiver.getHeader();
-    if (!trailer.isLastPacketInBlock() ||
-        trailer.getDataLen() != 0) {
-      throw new IOException("Expected empty end-of-read packet! Header: " +
-          trailer);
-    }
-  }
-
-  protected RemoteBlockReader2(String file, long blockId,
-      DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
-      int networkDistance) {
-    // Path is used only for printing block and file information in debug
-    this.peer = peer;
-    this.datanodeID = datanodeID;
-    this.in = peer.getInputStreamChannel();
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max(startOffset, 0);
-    this.filename = file;
-    this.peerCache = peerCache;
-    this.blockId = blockId;
-
-    // The total number of bytes that we need to transfer from the DN is
-    // the amount that the user wants (bytesToRead), plus the padding at
-    // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read starts/ends mid-chunk.
-    this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-    this.tracer = tracer;
-    this.networkDistance = networkDistance;
-  }
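
To make the constructor's padding arithmetic concrete, here is a worked example with made-up numbers (a 512-byte chunk size and a read starting mid-chunk):

    public class AlignmentMath {
      public static void main(String[] args) {
        long bytesPerChecksum = 512; // assumed chunk size
        long startOffset = 700;      // byte the caller actually wants
        long bytesToRead = 1000;     // bytes the caller asked for

        // The DN starts the transfer at the chunk boundary at or below
        // startOffset, so the reader must also consume the padding bytes.
        long firstChunkOffset = (startOffset / bytesPerChecksum) * bytesPerChecksum;
        long bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);

        System.out.println(firstChunkOffset);    // 512
        System.out.println(bytesNeededToFinish); // 1188
      }
    }
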
-
-
-  @Override
-  public synchronized void close() throws IOException {
-    packetReceiver.close();
-    startOffset = -1;
-    checksum = null;
-    if (peerCache != null && sentStatusCode) {
-      peerCache.put(datanodeID, peer);
-    } else {
-      peer.close();
-    }
-
-    // in will be closed when its Socket is closed.
-  }
-
-  /**
-   * When the reader reaches the end of the read, it sends a status response
-   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
-   * closing our connection (which we will re-open), but won't affect
-   * data correctness.
-   */
-  void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + peer;
-    try {
-      writeReadResult(peer.getOutputStream(), statusCode);
-      sentStatusCode = true;
-    } catch (IOException e) {
-      // It's ok not to be able to send this. But something is probably wrong.
-      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-          peer.getRemoteAddressString() + ": " + e.getMessage());
-    }
-  }
-
-  /**
-   * Serialize the actual read result on the wire.
-   */
-  static void writeReadResult(OutputStream out, Status statusCode)
-      throws IOException {
-
-    ClientReadStatusProto.newBuilder()
-        .setStatus(statusCode)
-        .build()
-        .writeDelimitedTo(out);
-
-    out.flush();
-  }
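
writeDelimitedTo prefixes the serialized message with a protobuf varint length, so the receiving side can recover it with the standard delimited counterpart. A hedged sketch of the matching read side (the stream argument is illustrative; this is not necessarily how the datanode parses it):

    // Undo writeDelimitedTo()'s varint framing; parseDelimitedFrom()
    // returns null if the stream is already at EOF.
    static Status readReadResult(java.io.InputStream in) throws java.io.IOException {
      ClientReadStatusProto resp = ClientReadStatusProto.parseDelimitedFrom(in);
      return resp == null ? null : resp.getStatus();
    }
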
-
-  /**
-   * File name to print when accessing a block directly (from servlets)
-   * @param s Address of the block location
-   * @param poolId Block pool ID of the block
-   * @param blockId Block ID of the block
-   * @return string that has a file name for debug purposes
-   */
-  public static String getFileName(final InetSocketAddress s,
-      final String poolId, final long blockId) {
-    return s.toString() + ":" + poolId + ":" + blockId;
-  }
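
For instance, a call like the one below (made-up pool and block IDs) yields a debug name such as "/10.0.0.5:50010:BP-1:1073741825"; the exact prefix depends on how InetSocketAddress.toString() renders the address.

    String name = getFileName(
        new java.net.InetSocketAddress("10.0.0.5", 50010),
        "BP-1", 1073741825L);
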
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public void readFully(byte[] buf, int off, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, off, len);
-  }
-
-  /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   *
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @param peer  The Peer to use
-   * @param datanodeID  The DatanodeID this peer is connected to
-   * @return New BlockReader instance (errors are reported by throwing
-   *         IOException, not by returning null).
-   */
-  public static BlockReader newBlockReader(String file,
-      ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken,
-      long startOffset, long len,
-      boolean verifyChecksum,
-      String clientName,
-      Peer peer, DatanodeID datanodeID,
-      PeerCache peerCache,
-      CachingStrategy cachingStrategy,
-      Tracer tracer,
-      int networkDistance) throws IOException {
-    // in and out will be closed when sock is closed (by the caller)
-    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-        peer.getOutputStream()));
-    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum, cachingStrategy);
-
-    //
-    // Get bytes in block
-    //
-    DataInputStream in = new DataInputStream(peer.getInputStream());
-
-    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
-        PBHelperClient.vintPrefixed(in));
-    checkSuccess(status, peer, block, file);
-    ReadOpChecksumInfoProto checksumInfo =
-        status.getReadOpChecksumInfo();
-    DataChecksum checksum = DataTransferProtoUtil.fromProto(
-        checksumInfo.getChecksum());
-    // TODO: should we warn when we get CHECKSUM_NULL?
-
-    // Read the first chunk offset.
-    long firstChunkOffset = checksumInfo.getChunkOffset();
-
-    if (firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-        firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
-      throw new IOException("BlockReader: error in first chunk offset (" +
-          firstChunkOffset + ") startOffset is " +
-          startOffset + " for file " + file);
-    }
-
-    return new RemoteBlockReader2(file, block.getBlockId(), checksum,
-        verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
-        peerCache, tracer, networkDistance);
-  }
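
The first-chunk-offset sanity check above accepts only offsets that are non-negative, no greater than startOffset, and strictly within one chunk below it. A worked example with illustrative numbers:

    public class OffsetCheck {
      public static void main(String[] args) {
        long startOffset = 700;
        long bytesPerChecksum = 512;

        // Valid offsets lie in (startOffset - bytesPerChecksum, startOffset],
        // i.e. (188, 700] here: the chunk boundary 512 passes, 0 does not.
        for (long fco : new long[] {512, 0, 701, -1}) {
          boolean bad = fco < 0 || fco > startOffset
              || fco <= (startOffset - bytesPerChecksum);
          System.out.println(fco + " -> " + (bad ? "rejected" : "accepted"));
        }
      }
    }
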
-
-  static void checkSuccess(
-      BlockOpResponseProto status, Peer peer,
-      ExtendedBlock block, String file)
-      throws IOException {
-    String logInfo = "for OP_READ_BLOCK"
-        + ", self=" + peer.getLocalAddressString()
-        + ", remote=" + peer.getRemoteAddressString()
-        + ", for file " + file
-        + ", for pool " + block.getBlockPoolId()
-        + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
-    DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
-  }
-
-  @Override
-  public int available() {
-    // An optimistic estimate of how much data is available
-    // to us without doing network I/O.
-    return TCP_WINDOW_SIZE;
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return false;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-
-  @Override
-  public DataChecksum getDataChecksum() {
-    return checksum;
-  }
-
-  @Override
-  public int getNetworkDistance() {
-    return networkDistance;
-  }
-}

