hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [2/4] hadoop git commit: HDFS-8925. Move BlockReaderLocal to hdfs-client. Contributed by Mingliang Liu.
Date Fri, 28 Aug 2015 21:38:41 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
deleted file mode 100644
index c16ffdf..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ /dev/null
@@ -1,735 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DirectBufferPool;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-/**
- * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on
- * the same machine as the datanode, then the client can read files directly
- * from the local file system rather than going through the datanode for better
- * performance. <br>
- *
- * This is the legacy implementation based on HDFS-2246, which requires
- * permissions on the datanode to be set so that clients can directly access the
- * blocks. The new implementation based on HDFS-347 should be preferred on UNIX
- * systems where the required native code has been implemented.<br>
- *
- * {@link BlockReaderLocalLegacy} works as follows:
- * <ul>
- * <li>The client performing short circuit reads must be configured at the
- * datanode.</li>
- * <li>The client gets the path to the file where block is stored using
- * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
- * RPC call</li>
- * <li>Client uses kerberos authentication to connect to the datanode over RPC,
- * if security is enabled.</li>
- * </ul>
- */
-@InterfaceAudience.Private
-class BlockReaderLocalLegacy implements BlockReader {
-  private static final Log LOG = LogFactory.getLog(BlockReaderLocalLegacy.class);
-
-  //Stores the cache and proxy for a local datanode.
-  private static class LocalDatanodeInfo {
-    private ClientDatanodeProtocol proxy = null;
-    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
-
-    LocalDatanodeInfo() {
-      final int cacheSize = 10000;
-      final float hashTableLoadFactor = 0.75f;
-      int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
-      cache = Collections
-          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
-              hashTableCapacity, hashTableLoadFactor, true) {
-            private static final long serialVersionUID = 1;
-
-            @Override
-            protected boolean removeEldestEntry(
-                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
-              return size() > cacheSize;
-            }
-          });
-    }
-
-    private synchronized ClientDatanodeProtocol getDatanodeProxy(
-        UserGroupInformation ugi, final DatanodeInfo node,
-        final Configuration conf, final int socketTimeout,
-        final boolean connectToDnViaHostname) throws IOException {
-      if (proxy == null) {
-        try {
-          proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(node, conf,
-                  socketTimeout, connectToDnViaHostname);
-            }
-          });
-        } catch (InterruptedException e) {
-          LOG.warn("encountered exception ", e);
-        }
-      }
-      return proxy;
-    }
-    
-    private synchronized void resetDatanodeProxy() {
-      if (null != proxy) {
-        RPC.stopProxy(proxy);
-        proxy = null;
-      }
-    }
-
-    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
-      return cache.get(b);
-    }
-
-    private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
-      cache.put(b, info);
-    }
-
-    private void removeBlockLocalPathInfo(ExtendedBlock b) {
-      cache.remove(b);
-    }
-  }
-  
-  // Multiple datanodes could be running on the local machine. Store proxies in
-  // a map keyed by the ipc port of the datanode.
-  private static final Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
-
-  private final FileInputStream dataIn; // reader for the data file
-  private final FileInputStream checksumIn;   // reader for the checksum file
-
-  /**
-   * Offset from the most recent chunk boundary at which the next read should
-   * take place. Is only set to non-zero at construction time, and is
-   * decremented (usually to 0) by subsequent reads. This avoids having to do a
-   * checksum read at construction to position the read cursor correctly.
-   */
-  private int offsetFromChunkBoundary;
-  
-  private byte[] skipBuf = null;
-
-  /**
-   * Used for checksummed reads that need to be staged before copying to their
-   * output buffer because they are either a) smaller than the checksum chunk
-   * size or b) issued by the slower read(byte[]...) path
-   */
-  private ByteBuffer slowReadBuff = null;
-  private ByteBuffer checksumBuff = null;
-  private DataChecksum checksum;
-  private final boolean verifyChecksum;
-
-  private static final DirectBufferPool bufferPool = new DirectBufferPool();
-
-  private final int bytesPerChecksum;
-  private final int checksumSize;
-
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
-  private final String filename;
-  private long blockId;
-  
-  /**
-   * The only way this object can be instantiated.
-   */
-  static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
-      UserGroupInformation userGroupInformation,
-      Configuration configuration, String file, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> token, DatanodeInfo node, 
-      long startOffset, long length, StorageType storageType)
-      throws IOException {
-    final ShortCircuitConf scConf = conf.getShortCircuitConf();
-    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
-        .getIpcPort());
-    // check the cache first
-    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
-    if (pathinfo == null) {
-      if (userGroupInformation == null) {
-        userGroupInformation = UserGroupInformation.getCurrentUser();
-      }
-      pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
-          configuration, conf.getSocketTimeout(), token,
-          conf.isConnectToDnViaHostname(), storageType);
-    }
-
-    // check to see if the file exists. It may so happen that the
-    // HDFS file has been deleted and this block-lookup is occurring
-    // on behalf of a new HDFS file. This time, the block file could
-    // be residing in a different portion of the fs.data.dir directory.
-    // In this case, we remove this entry from the cache. The next
-    // call to this method will re-populate the cache.
-    FileInputStream dataIn = null;
-    FileInputStream checksumIn = null;
-    BlockReaderLocalLegacy localBlockReader = null;
-    final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums()
-        || storageType.isTransient();
-    try {
-      // get a local file system
-      File blkfile = new File(pathinfo.getBlockPath());
-      dataIn = new FileInputStream(blkfile);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size "
-            + blkfile.length() + " startOffset " + startOffset + " length "
-            + length + " short circuit checksum " + !skipChecksumCheck);
-      }
-
-      if (!skipChecksumCheck) {
-        // get the metadata file
-        File metafile = new File(pathinfo.getMetaPath());
-        checksumIn = new FileInputStream(metafile);
-
-        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
-            new DataInputStream(checksumIn), blk);
-        long firstChunkOffset = startOffset
-            - (startOffset % checksum.getBytesPerChecksum());
-        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
-            startOffset, length, pathinfo, checksum, true, dataIn,
-            firstChunkOffset, checksumIn);
-      } else {
-        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
-            startOffset, length, pathinfo, dataIn);
-      }
-    } catch (IOException e) {
-      // remove from cache
-      localDatanodeInfo.removeBlockLocalPathInfo(blk);
-      DFSClient.LOG.warn("BlockReaderLocalLegacy: Removing " + blk
-          + " from cache because local file " + pathinfo.getBlockPath()
-          + " could not be opened.");
-      throw e;
-    } finally {
-      if (localBlockReader == null) {
-        if (dataIn != null) {
-          dataIn.close();
-        }
-        if (checksumIn != null) {
-          checksumIn.close();
-        }
-      }
-    }
-    return localBlockReader;
-  }
-  
-  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
-    LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
-    if (ldInfo == null) {
-      ldInfo = new LocalDatanodeInfo();
-      localDatanodeInfoMap.put(port, ldInfo);
-    }
-    return ldInfo;
-  }
-  
-  private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
-      ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
-      StorageType storageType) throws IOException {
-    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
-    BlockLocalPathInfo pathinfo = null;
-    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
-        conf, timeout, connectToDnViaHostname);
-    try {
-      // make RPC to local datanode to find local pathnames of blocks
-      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
-      // We cannot cache the path information for a replica on transient storage.
-      // If the replica gets evicted, then it moves to a different path.  Then,
-      // our next attempt to read from the cached path would fail to find the
-      // file.  Additionally, the failure would cause us to disable legacy
-      // short-circuit read for all subsequent use in the ClientContext.  Unlike
-      // the newer short-circuit read implementation, we have no communication
-      // channel for the DataNode to notify the client that the path has been
-      // invalidated.  Therefore, our only option is to skip caching.
-      if (pathinfo != null && !storageType.isTransient()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
-        }
-        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
-      }
-    } catch (IOException e) {
-      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
-      throw e;
-    }
-    return pathinfo;
-  }
-  
-  private static int getSlowReadBufferNumChunks(int bufferSizeBytes,
-      int bytesPerChecksum) {
-    if (bufferSizeBytes < bytesPerChecksum) {
-      throw new IllegalArgumentException("Configured BlockReaderLocalLegacy " +
-          "buffer size (" + bufferSizeBytes + ") is not large enough to hold " +
-          "a single chunk (" + bytesPerChecksum +  "). Please configure " +
-          HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY +
-          " appropriately");
-    }
-
-    // Round down to nearest chunk size
-    return bufferSizeBytes / bytesPerChecksum;
-  }
-
-  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
-      long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
-      throws IOException {
-    this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
-        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
-        dataIn, startOffset, null);
-  }
-
-  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
-      long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
-      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
-      FileInputStream checksumIn) throws IOException {
-    this.filename = hdfsfile;
-    this.checksum = checksum;
-    this.verifyChecksum = verifyChecksum;
-    this.startOffset = Math.max(startOffset, 0);
-    this.blockId = block.getBlockId();
-
-    bytesPerChecksum = this.checksum.getBytesPerChecksum();
-    checksumSize = this.checksum.getChecksumSize();
-
-    this.dataIn = dataIn;
-    this.checksumIn = checksumIn;
-    this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
-
-    final int chunksPerChecksumRead = getSlowReadBufferNumChunks(
-        conf.getShortCircuitBufferSize(), bytesPerChecksum);
-    slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
-    checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
-    // Initially the buffers have nothing to read.
-    slowReadBuff.flip();
-    checksumBuff.flip();
-    boolean success = false;
-    try {
-      // Skip both input streams to beginning of the chunk containing startOffset
-      IOUtils.skipFully(dataIn, firstChunkOffset);
-      if (checksumIn != null) {
-        long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
-        IOUtils.skipFully(checksumIn, checkSumOffset);
-      }
-      success = true;
-    } finally {
-      if (!success) {
-        bufferPool.returnBuffer(slowReadBuff);
-        bufferPool.returnBuffer(checksumBuff);
-      }
-    }
-  }
-
-  /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached
-   */
-  private int fillBuffer(FileInputStream stream, ByteBuffer buf)
-      throws IOException {
-    TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" +
-        blockId + ")", Sampler.NEVER);
-    try {
-      int bytesRead = stream.getChannel().read(buf);
-      if (bytesRead < 0) {
-        //EOF
-        return bytesRead;
-      }
-      while (buf.remaining() > 0) {
-        int n = stream.getChannel().read(buf);
-        if (n < 0) {
-          //EOF
-          return bytesRead;
-        }
-        bytesRead += n;
-      }
-      return bytesRead;
-    } finally {
-      scope.close();
-    }
-  }
-  
-  /**
-   * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
-   * another.
-   */
-  private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
-    int oldLimit = from.limit();
-    from.limit(from.position() + length);
-    try {
-      to.put(from);
-    } finally {
-      from.limit(oldLimit);
-    }
-  }
-
-  @Override
-  public synchronized int read(ByteBuffer buf) throws IOException {
-    int nRead = 0;
-    if (verifyChecksum) {
-      // A 'direct' read actually has three phases. The first drains any
-      // remaining bytes from the slow read buffer. After this the read is
-      // guaranteed to be on a checksum chunk boundary. If there are still bytes
-      // to read, the fast direct path is used for as many remaining bytes as
-      // possible, up to a multiple of the checksum chunk size. Finally, any
-      // 'odd' bytes remaining at the end of the read cause another slow read to
-      // be issued, which involves an extra copy.
-
-      // Every 'slow' read tries to fill the slow read buffer in one go for
-      // efficiency's sake. As described above, all non-checksum-chunk-aligned
-      // reads will be served from the slower read path.
-
-      if (slowReadBuff.hasRemaining()) {
-        // There are remaining bytes from a small read available. This usually
-        // means this read is unaligned, which falls back to the slow path.
-        int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
-        writeSlice(slowReadBuff, buf, fromSlowReadBuff);
-        nRead += fromSlowReadBuff;
-      }
-
-      if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
-        // Since we have drained the 'small read' buffer, we are guaranteed to
-        // be chunk-aligned
-        int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
-
-        // There's only enough checksum buffer space available to checksum one
-        // entire slow read buffer. This saves keeping the number of checksum
-        // chunks around.
-        len = Math.min(len, slowReadBuff.capacity());
-        int oldlimit = buf.limit();
-        buf.limit(buf.position() + len);
-        int readResult = 0;
-        try {
-          readResult = doByteBufferRead(buf);
-        } finally {
-          buf.limit(oldlimit);
-        }
-        if (readResult == -1) {
-          return nRead;
-        } else {
-          nRead += readResult;
-          buf.position(buf.position() + readResult);
-        }
-      }
-
-      // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
-      // until chunk boundary
-      if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
-        int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
-        int readResult = fillSlowReadBuffer(toRead);
-        if (readResult == -1) {
-          return nRead;
-        } else {
-          int fromSlowReadBuff = Math.min(readResult, buf.remaining());
-          writeSlice(slowReadBuff, buf, fromSlowReadBuff);
-          nRead += fromSlowReadBuff;
-        }
-      }
-    } else {
-      // Non-checksummed reads are much easier; we can just fill the buffer directly.
-      nRead = doByteBufferRead(buf);
-      if (nRead > 0) {
-        buf.position(buf.position() + nRead);
-      }
-    }
-    return nRead;
-  }
-
-  /**
-   * Tries to read as many bytes as possible into supplied buffer, checksumming
-   * each chunk if needed.
-   *
-   * <b>Preconditions:</b>
-   * <ul>
-   * <li>
-   * If checksumming is enabled, buf.remaining must be a multiple of
-   * bytesPerChecksum. Note that this is not a requirement for clients of
-   * read(ByteBuffer) - in the case of non-checksum-sized read requests,
-   * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
-   * method.
-   * </li>
-   * </ul>
-   * <b>Postconditions:</b>
-   * <ul>
-   * <li>buf.limit and buf.mark are unchanged.</li>
-   * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
-   * requested bytes can be read straight from the buffer</li>
-   * </ul>
-   *
-   * @param buf
-   *          byte buffer to write bytes to. If checksums are not required, buf
-   *          can have any number of bytes remaining, otherwise there must be a
-   *          multiple of the checksum chunk size remaining.
-   * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
-   *         that is, the the number of useful bytes (up to the amount
-   *         requested) readable from the buffer by the client.
-   */
-  private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
-    if (verifyChecksum) {
-      assert buf.remaining() % bytesPerChecksum == 0;
-    }
-    int dataRead = -1;
-
-    int oldpos = buf.position();
-    // Read as much as we can into the buffer.
-    dataRead = fillBuffer(dataIn, buf);
-
-    if (dataRead == -1) {
-      return -1;
-    }
-
-    if (verifyChecksum) {
-      ByteBuffer toChecksum = buf.duplicate();
-      toChecksum.position(oldpos);
-      toChecksum.limit(oldpos + dataRead);
-
-      checksumBuff.clear();
-      // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
-      int numChunks =
-        (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
-      checksumBuff.limit(checksumSize * numChunks);
-
-      fillBuffer(checksumIn, checksumBuff);
-      checksumBuff.flip();
-
-      checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
-          this.startOffset);
-    }
-
-    if (dataRead >= 0) {
-      buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
-    }
-
-    if (dataRead < offsetFromChunkBoundary) {
-      // yikes, didn't even get enough bytes to honour offset. This can happen
-      // even if we are verifying checksums if we are at EOF.
-      offsetFromChunkBoundary -= dataRead;
-      dataRead = 0;
-    } else {
-      dataRead -= offsetFromChunkBoundary;
-      offsetFromChunkBoundary = 0;
-    }
-
-    return dataRead;
-  }
-
-  /**
-   * Ensures that up to len bytes are available and checksummed in the slow read
-   * buffer. The number of bytes available to read is returned. If the buffer is
-   * not already empty, the number of remaining bytes is returned and no actual
-   * read happens.
-   *
-   * @param len
-   *          the maximum number of bytes to make available. After len bytes
-   *          are read, the underlying bytestream <b>must</b> be at a checksum
-   *          boundary, or EOF. That is, (len + currentPosition) %
-   *          bytesPerChecksum == 0.
-   * @return the number of bytes available to read, or -1 if EOF.
-   */
-  private synchronized int fillSlowReadBuffer(int len) throws IOException {
-    int nRead = -1;
-    if (slowReadBuff.hasRemaining()) {
-      // Already got data, good to go.
-      nRead = Math.min(len, slowReadBuff.remaining());
-    } else {
-      // Round a complete read of len bytes (plus any implicit offset) to the
-      // next chunk boundary, since we try and read in multiples of a chunk
-      int nextChunk = len + offsetFromChunkBoundary +
-          (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
-      int limit = Math.min(nextChunk, slowReadBuff.capacity());
-      assert limit % bytesPerChecksum == 0;
-
-      slowReadBuff.clear();
-      slowReadBuff.limit(limit);
-
-      nRead = doByteBufferRead(slowReadBuff);
-
-      if (nRead > 0) {
-        // So that next time we call slowReadBuff.hasRemaining(), we don't get a
-        // false positive.
-        slowReadBuff.limit(nRead + slowReadBuff.position());
-      }
-    }
-    return nRead;
-  }
-
-  @Override
-  public synchronized int read(byte[] buf, int off, int len) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("read off " + off + " len " + len);
-    }
-    if (!verifyChecksum) {
-      return dataIn.read(buf, off, len);
-    }
-
-    int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
-
-    if (nRead > 0) {
-      // Possible that buffer is filled with a larger read than we need, since
-      // we tried to read as much as possible at once
-      nRead = Math.min(len, nRead);
-      slowReadBuff.get(buf, off, nRead);
-    }
-
-    return nRead;
-  }
-
-  @Override
-  public synchronized long skip(long n) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("skip " + n);
-    }
-    if (n <= 0) {
-      return 0;
-    }
-    if (!verifyChecksum) {
-      return dataIn.skip(n);
-    }
-  
-    // caller made sure newPosition is not beyond EOF.
-    int remaining = slowReadBuff.remaining();
-    int position = slowReadBuff.position();
-    int newPosition = position + (int)n;
-  
-    // if the new offset is already read into dataBuff, just reposition
-    if (n <= remaining) {
-      assert offsetFromChunkBoundary == 0;
-      slowReadBuff.position(newPosition);
-      return n;
-    }
-  
-    // for small gap, read through to keep the data/checksum in sync
-    if (n - remaining <= bytesPerChecksum) {
-      slowReadBuff.position(position + remaining);
-      if (skipBuf == null) {
-        skipBuf = new byte[bytesPerChecksum];
-      }
-      int ret = read(skipBuf, 0, (int)(n - remaining));
-      return (remaining + ret);
-    }
-  
-    // optimize for big gap: discard the current buffer, skip to
-    // the beginning of the appropriate checksum chunk and then
-    // read to the middle of that chunk to be in sync with checksums.
-  
-    // We can't use this.offsetFromChunkBoundary because we need to know how
-    // many bytes of the offset were really read. Calling read(..) with a
-    // positive this.offsetFromChunkBoundary causes that many bytes to get
-    // silently skipped.
-    int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
-    long toskip = n - remaining - myOffsetFromChunkBoundary;
-
-    slowReadBuff.position(slowReadBuff.limit());
-    checksumBuff.position(checksumBuff.limit());
-  
-    IOUtils.skipFully(dataIn, toskip);
-    long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
-    IOUtils.skipFully(checksumIn, checkSumOffset);
-
-    // read into the middle of the chunk
-    if (skipBuf == null) {
-      skipBuf = new byte[bytesPerChecksum];
-    }
-    assert skipBuf.length == bytesPerChecksum;
-    assert myOffsetFromChunkBoundary < bytesPerChecksum;
-
-    int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
-
-    if (ret == -1) {  // EOS
-      return (toskip + remaining);
-    } else {
-      return (toskip + remaining + ret);
-    }
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    IOUtils.cleanup(LOG, dataIn, checksumIn);
-    if (slowReadBuff != null) {
-      bufferPool.returnBuffer(slowReadBuff);
-      slowReadBuff = null;
-    }
-    if (checksumBuff != null) {
-      bufferPool.returnBuffer(checksumBuff);
-      checksumBuff = null;
-    }
-    startOffset = -1;
-    checksum = null;
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public void readFully(byte[] buf, int off, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, off, len);
-  }
-
-  @Override
-  public int available() throws IOException {
-    // We never do network I/O in BlockReaderLocalLegacy.
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public boolean isLocal() {
-    return true;
-  }
-  
-  @Override
-  public boolean isShortCircuit() {
-    return true;
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
deleted file mode 100644
index dbc528e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.io.IOException;
-
-/**
- * For sharing between the local and remote block reader implementations.
- */
-@InterfaceAudience.Private
-class BlockReaderUtil {
-
-  /* See {@link BlockReader#readAll(byte[], int, int)} */
-  public static int readAll(BlockReader reader,
-      byte[] buf, int offset, int len) throws IOException {
-    int n = 0;
-    for (;;) {
-      int nread = reader.read(buf, offset + n, len - n);
-      if (nread <= 0)
-        return (n == 0) ? nread : n;
-      n += nread;
-      if (n >= len)
-        return n;
-    }
-  }
-
-  /* See {@link BlockReader#readFully(byte[], int, int)} */
-  public static void readFully(BlockReader reader,
-      byte[] buf, int off, int len) throws IOException {
-    int toRead = len;
-    while (toRead > 0) {
-      int ret = reader.read(buf, off, toRead);
-      if (ret < 0) {
-        throw new IOException("Premature EOF from inputStream");
-      }
-      toRead -= ret;
-      off += ret;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
deleted file mode 100644
index bf11463..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.util.HashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * ClientContext contains context information for a client.
- * 
- * This allows us to share caches such as the socket cache across
- * DFSClient instances.
- */
-@InterfaceAudience.Private
-public class ClientContext {
-  private static final Log LOG = LogFactory.getLog(ClientContext.class);
-
-  /**
-   * Global map of context names to caches contexts.
-   */
-  private final static HashMap<String, ClientContext> CACHES =
-      new HashMap<String, ClientContext>();
-
-  /**
-   * Name of context.
-   */
-  private final String name;
-
-  /**
-   * String representation of the configuration.
-   */
-  private final String confString;
-
-  /**
-   * Caches short-circuit file descriptors, mmap regions.
-   */
-  private final ShortCircuitCache shortCircuitCache;
-
-  /**
-   * Caches TCP and UNIX domain sockets for reuse.
-   */
-  private final PeerCache peerCache;
-
-  /**
-   * Stores information about socket paths.
-   */
-  private final DomainSocketFactory domainSocketFactory;
-
-  /**
-   * Caches key Providers for the DFSClient
-   */
-  private final KeyProviderCache keyProviderCache;
-  /**
-   * True if we should use the legacy BlockReaderLocal.
-   */
-  private final boolean useLegacyBlockReaderLocal;
-
-  /**
-   * True if the legacy BlockReaderLocal is disabled.
-   *
-   * The legacy block reader local gets disabled completely whenever there is an
-   * error or miscommunication.  The new block reader local code handles this
-   * case more gracefully inside DomainSocketFactory.
-   */
-  private volatile boolean disableLegacyBlockReaderLocal = false;
-
-  /** Creating byte[] for {@link DFSOutputStream}. */
-  private final ByteArrayManager byteArrayManager;  
-
-  /**
-   * Whether or not we complained about a DFSClient fetching a CacheContext that
-   * didn't match its config values yet.
-   */
-  private boolean printedConfWarning = false;
-
-  private ClientContext(String name, DfsClientConf conf) {
-    final ShortCircuitConf scConf = conf.getShortCircuitConf();
-
-    this.name = name;
-    this.confString = scConf.confAsString();
-    this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
-    this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
-        scConf.getSocketCacheExpiry());
-    this.keyProviderCache = new KeyProviderCache(
-        scConf.getKeyProviderCacheExpiryMs());
-    this.useLegacyBlockReaderLocal = scConf.isUseLegacyBlockReaderLocal();
-    this.domainSocketFactory = new DomainSocketFactory(scConf);
-
-    this.byteArrayManager = ByteArrayManager.newInstance(
-        conf.getWriteByteArrayManagerConf());
-  }
-
-  public static ClientContext get(String name, DfsClientConf conf) {
-    ClientContext context;
-    synchronized(ClientContext.class) {
-      context = CACHES.get(name);
-      if (context == null) {
-        context = new ClientContext(name, conf);
-        CACHES.put(name, context);
-      } else {
-        context.printConfWarningIfNeeded(conf);
-      }
-    }
-    return context;
-  }
-
-  /**
-   * Get a client context, from a Configuration object.
-   *
-   * This method is less efficient than the version which takes a DFSClient#Conf
-   * object, and should be mostly used by tests.
-   */
-  @VisibleForTesting
-  public static ClientContext getFromConf(Configuration conf) {
-    return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
-        HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
-            new DfsClientConf(conf));
-  }
-
-  private void printConfWarningIfNeeded(DfsClientConf conf) {
-    String existing = this.getConfString();
-    String requested = conf.getShortCircuitConf().confAsString();
-    if (!existing.equals(requested)) {
-      if (!printedConfWarning) {
-        printedConfWarning = true;
-        LOG.warn("Existing client context '" + name + "' does not match " +
-            "requested configuration.  Existing: " + existing + 
-            ", Requested: " + requested);
-      }
-    }
-  }
-
-  public String getConfString() {
-    return confString;
-  }
-
-  public ShortCircuitCache getShortCircuitCache() {
-    return shortCircuitCache;
-  }
-
-  public PeerCache getPeerCache() {
-    return peerCache;
-  }
-
-  public KeyProviderCache getKeyProviderCache() {
-    return keyProviderCache;
-  }
-
-  public boolean getUseLegacyBlockReaderLocal() {
-    return useLegacyBlockReaderLocal;
-  }
-
-  public boolean getDisableLegacyBlockReaderLocal() {
-    return disableLegacyBlockReaderLocal;
-  }
-
-  public void setDisableLegacyBlockReaderLocal() {
-    disableLegacyBlockReaderLocal = true;
-  }
-
-  public DomainSocketFactory getDomainSocketFactory() {
-    return domainSocketFactory;
-  }
-
-  public ByteArrayManager getByteArrayManager() {
-    return byteArrayManager;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 57a5aed..6420b55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -80,8 +80,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
   public static final String  DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves";
   public static final int     DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5;
-  public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
-  public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
+  @Deprecated
+  public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY =
+      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
+  @Deprecated
+  public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT =
+      HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
   public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
   public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
   public static final String  DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
@@ -505,7 +509,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_WEB_UGI_KEY = "dfs.web.ugi";
   public static final String  DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup";
   public static final String  DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file";
-  public static final String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal";
+  public static final String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
+      HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
   @Deprecated
   public static final String  DFS_DATANODE_USER_NAME_KEY = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
   public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths";
@@ -604,7 +609,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
   public static final int    DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
   public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
-  public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
+  public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI =
+      HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI;
 
   // Journal-node related configs. These are read on the JN side.
   public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 7f3722f..139a27c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -364,7 +364,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       ClientDatanodeProtocol cdp = null;
       
       try {
-        cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode,
+        cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
             dfsClient.getConfiguration(), conf.getSocketTimeout(),
             conf.isConnectToDnViaHostname(), locatedblock);
         

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index cae56c0..5c8a700 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -53,8 +53,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 
-import javax.net.SocketFactory;
-
 import com.google.common.collect.Sets;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -69,16 +67,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
-import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.http.HttpConfig;
@@ -932,29 +925,6 @@ public class DFSUtil {
   public static int roundBytesToGB(long bytes) {
     return Math.round((float)bytes/ 1024 / 1024 / 1024);
   }
-  
-  /** Create a {@link ClientDatanodeProtocol} proxy */
-  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
-      DatanodeID datanodeid, Configuration conf, int socketTimeout,
-      boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
-    return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
-        connectToDnViaHostname, locatedBlock);
-  }
-  
-  /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
-  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
-      DatanodeID datanodeid, Configuration conf, int socketTimeout,
-      boolean connectToDnViaHostname) throws IOException {
-    return new ClientDatanodeProtocolTranslatorPB(
-        datanodeid, conf, socketTimeout, connectToDnViaHostname);
-  }
-  
-  /** Create a {@link ClientDatanodeProtocol} proxy */
-  public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
-      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
-      SocketFactory factory) throws IOException {
-    return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
-  }
 
   /**
    * Get nameservice Id for the {@link NameNode} based on namenode RPC address
@@ -1450,41 +1420,6 @@ public class DFSUtil {
   }
 
   /**
-   * Creates a new KeyProvider from the given Configuration.
-   *
-   * @param conf Configuration
-   * @return new KeyProvider, or null if no provider was found.
-   * @throws IOException if the KeyProvider is improperly specified in
-   *                             the Configuration
-   */
-  public static KeyProvider createKeyProvider(
-      final Configuration conf) throws IOException {
-    final String providerUriStr =
-        conf.getTrimmed(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
-    // No provider set in conf
-    if (providerUriStr.isEmpty()) {
-      return null;
-    }
-    final URI providerUri;
-    try {
-      providerUri = new URI(providerUriStr);
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-    KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
-    if (keyProvider == null) {
-      throw new IOException("Could not instantiate KeyProvider from " + 
-          DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '" + 
-          providerUriStr +"'");
-    }
-    if (keyProvider.isTransient()) {
-      throw new IOException("KeyProvider " + keyProvider.toString()
-          + " was found but it is a transient provider.");
-    }
-    return keyProvider;
-  }
-
-  /**
    * Creates a new KeyProviderCryptoExtension by wrapping the
    * KeyProvider specified in the given Configuration.
    *
@@ -1495,7 +1430,7 @@ public class DFSUtil {
    */
   public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
       final Configuration conf) throws IOException {
-    KeyProvider keyProvider = createKeyProvider(conf);
+    KeyProvider keyProvider = DFSUtilClient.createKeyProvider(conf);
     if (keyProvider == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
deleted file mode 100644
index e135d8e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-
-/**
- * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
- * replicas.
- */
-@InterfaceAudience.Private
-public final class ExternalBlockReader implements BlockReader {
-  private final ReplicaAccessor accessor;
-  private final long visibleLength;
-  private long pos;
-
-  ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
-                      long startOffset) {
-    this.accessor = accessor;
-    this.visibleLength = visibleLength;
-    this.pos = startOffset;
-  }
-
-  @Override
-  public int read(byte[] buf, int off, int len) throws IOException {
-    int nread = accessor.read(pos, buf, off, len);
-    pos += nread;
-    return nread;
-  }
-
-  @Override
-  public int read(ByteBuffer buf) throws IOException {
-    int nread = accessor.read(pos, buf);
-    pos += nread;
-    return nread;
-  }
-
-  @Override
-  public long skip(long n) throws IOException {
-    // You cannot skip backwards
-    if (n <= 0) {
-      return 0;
-    }
-    // You can't skip past the end of the replica.
-    long oldPos = pos;
-    pos += n;
-    if (pos > visibleLength) {
-      pos = visibleLength;
-    }
-    return pos - oldPos;
-  }
-
-  @Override
-  public int available() throws IOException {
-    // We return the amount of bytes that we haven't read yet from the
-    // replica, based on our current position.  Some of the other block
-    // readers return a shorter length than that.  The only advantage to
-    // returning a shorter length is that the DFSInputStream will
-    // trash your block reader and create a new one if someone tries to
-    // seek() beyond the available() region.
-    long diff = visibleLength - pos;
-    if (diff > Integer.MAX_VALUE) {
-      return Integer.MAX_VALUE;
-    } else {
-      return (int)diff;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    accessor.close();
-  }
-
-  @Override
-  public void readFully(byte[] buf, int offset, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, offset, len);
-  }
-
-  @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public boolean isLocal() {
-    return accessor.isLocal();
-  }
-
-  @Override
-  public boolean isShortCircuit() {
-    return accessor.isShortCircuit();
-  }
-
-  @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    // For now, pluggable ReplicaAccessors do not support zero-copy.
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
deleted file mode 100644
index a2b6c7e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/KeyProviderCache.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.key.KeyProvider;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-@InterfaceAudience.Private
-public class KeyProviderCache {
-
-  public static final Log LOG = LogFactory.getLog(KeyProviderCache.class);
-
-  private final Cache<URI, KeyProvider> cache;
-
-  public KeyProviderCache(long expiryMs) {
-    cache = CacheBuilder.newBuilder()
-        .expireAfterAccess(expiryMs, TimeUnit.MILLISECONDS)
-        .removalListener(new RemovalListener<URI, KeyProvider>() {
-          @Override
-          public void onRemoval(
-              RemovalNotification<URI, KeyProvider> notification) {
-            try {
-              notification.getValue().close();
-            } catch (Throwable e) {
-              LOG.error(
-                  "Error closing KeyProvider with uri ["
-                      + notification.getKey() + "]", e);
-              ;
-            }
-          }
-        })
-        .build();
-  }
-
-  public KeyProvider get(final Configuration conf) {
-    URI kpURI = createKeyProviderURI(conf);
-    if (kpURI == null) {
-      return null;
-    }
-    try {
-      return cache.get(kpURI, new Callable<KeyProvider>() {
-        @Override
-        public KeyProvider call() throws Exception {
-          return DFSUtil.createKeyProvider(conf);
-        }
-      });
-    } catch (Exception e) {
-      LOG.error("Could not create KeyProvider for DFSClient !!", e.getCause());
-      return null;
-    }
-  }
-
-  private URI createKeyProviderURI(Configuration conf) {
-    final String providerUriStr =
-        conf.getTrimmed(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
-    // No provider set in conf
-    if (providerUriStr.isEmpty()) {
-      LOG.error("Could not find uri with key ["
-          + DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI
-          + "] to create a keyProvider !!");
-      return null;
-    }
-    final URI providerUri;
-    try {
-      providerUri = new URI(providerUriStr);
-    } catch (URISyntaxException e) {
-      LOG.error("KeyProvider URI string is invalid [" + providerUriStr
-          + "]!!", e.getCause());
-      return null;
-    }
-    return providerUri;
-  }
-
-  @VisibleForTesting
-  public void setKeyProvider(Configuration conf, KeyProvider keyProvider)
-      throws IOException {
-    URI uri = createKeyProviderURI(conf);
-    cache.put(uri, keyProvider);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
deleted file mode 100644
index 08b0468..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.LinkedListMultimap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
-
-/**
- * A cache of input stream sockets to Data Node.
- */
-@InterfaceStability.Unstable
-@InterfaceAudience.Private
-@VisibleForTesting
-public class PeerCache {
-  private static final Log LOG = LogFactory.getLog(PeerCache.class);
-  
-  private static class Key {
-    final DatanodeID dnID;
-    final boolean isDomain;
-    
-    Key(DatanodeID dnID, boolean isDomain) {
-      this.dnID = dnID;
-      this.isDomain = isDomain;
-    }
-    
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof Key)) {
-        return false;
-      }
-      Key other = (Key)o;
-      return dnID.equals(other.dnID) && isDomain == other.isDomain;
-    }
-
-    @Override
-    public int hashCode() {
-      return dnID.hashCode() ^ (isDomain ? 1 : 0);
-    }
-  }
-  
-  private static class Value {
-    private final Peer peer;
-    private final long time;
-
-    Value(Peer peer, long time) {
-      this.peer = peer;
-      this.time = time;
-    }
-
-    Peer getPeer() {
-      return peer;
-    }
-
-    long getTime() {
-      return time;
-    }
-  }
-
-  private Daemon daemon;
-  /** A map for per user per datanode. */
-  private final LinkedListMultimap<Key, Value> multimap =
-    LinkedListMultimap.create();
-  private final int capacity;
-  private final long expiryPeriod;
-  
-  public PeerCache(int c, long e) {
-    this.capacity = c;
-    this.expiryPeriod = e;
-
-    if (capacity == 0 ) {
-      LOG.info("SocketCache disabled.");
-    } else if (expiryPeriod == 0) {
-      throw new IllegalStateException("Cannot initialize expiryPeriod to " +
-         expiryPeriod + " when cache is enabled.");
-    }
-  }
- 
-  private boolean isDaemonStarted() {
-    return (daemon == null)? false: true;
-  }
-
-  private synchronized void startExpiryDaemon() {
-    // start daemon only if not already started
-    if (isDaemonStarted() == true) {
-      return;
-    }
-    
-    daemon = new Daemon(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          PeerCache.this.run();
-        } catch(InterruptedException e) {
-          //noop
-        } finally {
-          PeerCache.this.clear();
-        }
-      }
-
-      @Override
-      public String toString() {
-        return String.valueOf(PeerCache.this);
-      }
-    });
-    daemon.start();
-  }
-
-  /**
-   * Get a cached peer connected to the given DataNode.
-   * @param dnId         The DataNode to get a Peer for.
-   * @param isDomain     Whether to retrieve a DomainPeer or not.
-   *
-   * @return             An open Peer connected to the DN, or null if none
-   *                     was found. 
-   */
-  public Peer get(DatanodeID dnId, boolean isDomain) {
-
-    if (capacity <= 0) { // disabled
-      return null;
-    }
-    return getInternal(dnId, isDomain);
-  }
-
-  private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) {
-    List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
-    if (sockStreamList == null) {
-      return null;
-    }
-
-    Iterator<Value> iter = sockStreamList.iterator();
-    while (iter.hasNext()) {
-      Value candidate = iter.next();
-      iter.remove();
-      long ageMs = Time.monotonicNow() - candidate.getTime();
-      Peer peer = candidate.getPeer();
-      if (ageMs >= expiryPeriod) {
-        try {
-          peer.close();
-        } catch (IOException e) {
-          LOG.warn("got IOException closing stale peer " + peer +
-                ", which is " + ageMs + " ms old");
-        }
-      } else if (!peer.isClosed()) {
-        return peer;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Give an unused socket to the cache.
-   */
-  public void put(DatanodeID dnId, Peer peer) {
-    Preconditions.checkNotNull(dnId);
-    Preconditions.checkNotNull(peer);
-    if (peer.isClosed()) return;
-    if (capacity <= 0) {
-      // Cache disabled.
-      IOUtils.cleanup(LOG, peer);
-      return;
-    }
-    putInternal(dnId, peer);
-  }
-
-  private synchronized void putInternal(DatanodeID dnId, Peer peer) {
-    startExpiryDaemon();
-
-    if (capacity == multimap.size()) {
-      evictOldest();
-    }
-    multimap.put(new Key(dnId, peer.getDomainSocket() != null),
-        new Value(peer, Time.monotonicNow()));
-  }
-
-  public synchronized int size() {
-    return multimap.size();
-  }
-
-  /**
-   * Evict and close sockets older than expiry period from the cache.
-   */
-  private synchronized void evictExpired(long expiryPeriod) {
-    while (multimap.size() != 0) {
-      Iterator<Entry<Key, Value>> iter =
-        multimap.entries().iterator();
-      Entry<Key, Value> entry = iter.next();
-      // if oldest socket expired, remove it
-      if (entry == null || 
-        Time.monotonicNow() - entry.getValue().getTime() <
-        expiryPeriod) {
-        break;
-      }
-      IOUtils.cleanup(LOG, entry.getValue().getPeer());
-      iter.remove();
-    }
-  }
-
-  /**
-   * Evict the oldest entry in the cache.
-   */
-  private synchronized void evictOldest() {
-    // We can get the oldest element immediately, because of an interesting
-    // property of LinkedListMultimap: its iterator traverses entries in the
-    // order that they were added.
-    Iterator<Entry<Key, Value>> iter =
-      multimap.entries().iterator();
-    if (!iter.hasNext()) {
-      throw new IllegalStateException("Cannot evict from empty cache! " +
-        "capacity: " + capacity);
-    }
-    Entry<Key, Value> entry = iter.next();
-    IOUtils.cleanup(LOG, entry.getValue().getPeer());
-    iter.remove();
-  }
-
-  /**
-   * Periodically check in the cache and expire the entries
-   * older than expiryPeriod minutes
-   */
-  private void run() throws InterruptedException {
-    for(long lastExpiryTime = Time.monotonicNow();
-        !Thread.interrupted();
-        Thread.sleep(expiryPeriod)) {
-      final long elapsed = Time.monotonicNow() - lastExpiryTime;
-      if (elapsed >= expiryPeriod) {
-        evictExpired(expiryPeriod);
-        lastExpiryTime = Time.monotonicNow();
-      }
-    }
-    clear();
-    throw new InterruptedException("Daemon Interrupted");
-  }
-
-  /**
-   * Empty the cache, and close all sockets.
-   */
-  @VisibleForTesting
-  synchronized void clear() {
-    for (Value value : multimap.values()) {
-      IOUtils.cleanup(LOG, value.getPeer());
-    }
-    multimap.clear();
-  }
-  
-  @VisibleForTesting
-  void close() {
-    clear();
-    if (daemon != null) {
-      daemon.interrupt();
-      try {
-        daemon.join();
-      } catch (InterruptedException e) {
-        throw new RuntimeException("failed to join thread");
-      }
-    }
-    daemon = null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
deleted file mode 100644
index 07f4836..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.client;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Options that can be specified when manually triggering a block report.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public final class BlockReportOptions {
-  private final boolean incremental;
-
-  private BlockReportOptions(boolean incremental) {
-    this.incremental = incremental;
-  }
-
-  public boolean isIncremental() {
-    return incremental;
-  }
-
-  public static class Factory {
-    private boolean incremental = false;
-
-    public Factory() {
-    }
-
-    public Factory setIncremental(boolean incremental) {
-      this.incremental = incremental;
-      return this;
-    }
-
-    public BlockReportOptions build() {
-      return new BlockReportOptions(incremental);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "BlockReportOptions{incremental=" + incremental + "}";
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
deleted file mode 100644
index 69fa52d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * A block and the full path information to the block data file and
- * the metadata file stored on the local file system.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class BlockLocalPathInfo {
-  private final ExtendedBlock block;
-  private String localBlockPath = "";  // local file storing the data
-  private String localMetaPath = "";   // local file storing the checksum
-
-  /**
-   * Constructs BlockLocalPathInfo.
-   * @param b The block corresponding to this lock path info. 
-   * @param file Block data file.
-   * @param metafile Metadata file for the block.
-   */
-  public BlockLocalPathInfo(ExtendedBlock b, String file, String metafile) {
-    block = b;
-    localBlockPath = file;
-    localMetaPath = metafile;
-  }
-
-  /**
-   * Get the Block data file.
-   * @return Block data file.
-   */
-  public String getBlockPath() {return localBlockPath;}
-  
-  /**
-   * @return the Block
-   */
-  public ExtendedBlock getBlock() { return block;}
-  
-  /**
-   * Get the Block metadata file.
-   * @return Block metadata file.
-   */
-  public String getMetaPath() {return localMetaPath;}
-
-  /**
-   * Get number of bytes in the block.
-   * @return Number of bytes in the block.
-   */
-  public long getNumBytes() {
-    return block.getNumBytes();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
deleted file mode 100644
index da8f4ab..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.ReconfigurationTaskStatus;
-import org.apache.hadoop.hdfs.client.BlockReportOptions;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
-import org.apache.hadoop.security.KerberosInfo;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenInfo;
-
-/** An client-datanode protocol for block recovery
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-@KerberosInfo(
-    serverPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
-@TokenInfo(BlockTokenSelector.class)
-public interface ClientDatanodeProtocol {
-  /**
-   * Until version 9, this class ClientDatanodeProtocol served as both
-   * the client interface to the DN AND the RPC protocol used to 
-   * communicate with the NN.
-   * 
-   * This class is used by both the DFSClient and the 
-   * DN server side to insulate from the protocol serialization.
-   * 
-   * If you are adding/changing DN's interface then you need to 
-   * change both this class and ALSO related protocol buffer
-   * wire protocol definition in ClientDatanodeProtocol.proto.
-   * 
-   * For more details on protocol buffer wire protocol, please see 
-   * .../org/apache/hadoop/hdfs/protocolPB/overview.html
-   * 
-   * The log of historical changes can be retrieved from the svn).
-   * 9: Added deleteBlockPool method
-   * 
-   * 9 is the last version id when this class was used for protocols
-   *  serialization. DO not update this version any further. 
-   */
-  public static final long versionID = 9L;
-
-  /** Return the visible length of a replica. */
-  long getReplicaVisibleLength(ExtendedBlock b) throws IOException;
-  
-  /**
-   * Refresh the list of federated namenodes from updated configuration
-   * Adds new namenodes and stops the deleted namenodes.
-   * 
-   * @throws IOException on error
-   **/
-  void refreshNamenodes() throws IOException;
-
-  /**
-   * Delete the block pool directory. If force is false it is deleted only if
-   * it is empty, otherwise it is deleted along with its contents.
-   * 
-   * @param bpid Blockpool id to be deleted.
-   * @param force If false blockpool directory is deleted only if it is empty 
-   *          i.e. if it doesn't contain any block files, otherwise it is 
-   *          deleted along with its contents.
-   * @throws IOException
-   */
-  void deleteBlockPool(String bpid, boolean force) throws IOException;
-  
-  /**
-   * Retrieves the path names of the block file and metadata file stored on the
-   * local file system.
-   * 
-   * In order for this method to work, one of the following should be satisfied:
-   * <ul>
-   * <li>
-   * The client user must be configured at the datanode to be able to use this
-   * method.</li>
-   * <li>
-   * When security is enabled, kerberos authentication must be used to connect
-   * to the datanode.</li>
-   * </ul>
-   * 
-   * @param block
-   *          the specified block on the local datanode
-   * @param token
-   *          the block access token.
-   * @return the BlockLocalPathInfo of a block
-   * @throws IOException
-   *           on error
-   */
-  BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
-      Token<BlockTokenIdentifier> token) throws IOException;
-  
-  /**
-   * Shuts down a datanode.
-   *
-   * @param forUpgrade If true, data node does extra prep work before shutting
-   *          down. The work includes advising clients to wait and saving
-   *          certain states for quick restart. This should only be used when
-   *          the stored data will remain the same during upgrade/restart.
-   * @throws IOException 
-   */
-  void shutdownDatanode(boolean forUpgrade) throws IOException;  
-
-  /**
-   * Obtains datanode info
-   *
-   * @return software/config version and uptime of the datanode
-   */
-  DatanodeLocalInfo getDatanodeInfo() throws IOException;
-
-  /**
-   * Asynchronously reload configuration on disk and apply changes.
-   */
-  void startReconfiguration() throws IOException;
-
-  /**
-   * Get the status of the previously issued reconfig task.
-   * @see {@link org.apache.hadoop.conf.ReconfigurationTaskStatus}.
-   */
-  ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
-
-  /**
-   * Get a list of allowed properties for reconfiguration.
-   */
-  List<String> listReconfigurableProperties() throws IOException;
-
-  /**
-   * Trigger a new block report.
-   */
-  void triggerBlockReport(BlockReportOptions options)
-    throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
deleted file mode 100644
index 170467e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Encryption key verification failed.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class InvalidEncryptionKeyException extends IOException {
-  private static final long serialVersionUID = 0l;
-
-  public InvalidEncryptionKeyException() {
-    super();
-  }
-
-  public InvalidEncryptionKeyException(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 694f521..85da414 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCustomProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
@@ -115,7 +114,7 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+      readBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
         proto.getOffset(),
@@ -136,7 +135,7 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+      writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
           PBHelperClient.convertStorageType(proto.getStorageType()),
           PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
           proto.getHeader().getClientName(),
@@ -167,7 +166,7 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+      transferBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
           PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
           proto.getHeader().getClientName(),
           targets,
@@ -186,7 +185,7 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
+      requestShortCircuitFds(PBHelperClient.convert(proto.getHeader().getBlock()),
           PBHelper.convert(proto.getHeader().getToken()),
           slotId, proto.getMaxVersion(),
           proto.getSupportsReceiptVerification());
@@ -228,7 +227,7 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+      replaceBlock(PBHelperClient.convert(proto.getHeader().getBlock()),
           PBHelperClient.convertStorageType(proto.getStorageType()),
           PBHelper.convert(proto.getHeader().getToken()),
           proto.getDelHint(),
@@ -244,7 +243,7 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-      copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
+      copyBlock(PBHelperClient.convert(proto.getHeader().getBlock()),
           PBHelper.convert(proto.getHeader().getToken()));
     } finally {
       if (traceScope != null) traceScope.close();
@@ -257,7 +256,7 @@ public abstract class Receiver implements DataTransferProtocol {
     TraceScope traceScope = continueTraceSpan(proto.getHeader(),
         proto.getClass().getSimpleName());
     try {
-    blockChecksum(PBHelper.convert(proto.getHeader().getBlock()),
+    blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getToken()));
     } finally {
       if (traceScope != null) traceScope.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java
deleted file mode 100644
index 21073eb..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocolPB;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.security.KerberosInfo;
-import org.apache.hadoop.security.token.TokenInfo;
-
-@KerberosInfo(
-    serverPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
-@TokenInfo(BlockTokenSelector.class)
-@ProtocolInfo(protocolName = 
-    "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol",
-    protocolVersion = 1)
-@InterfaceAudience.Private
-public interface ClientDatanodeProtocolPB extends
-    ClientDatanodeProtocolService.BlockingInterface {
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index 3886007..5efcf67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hdfs.protocolPB;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Optional;
@@ -86,7 +84,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       throws ServiceException {
     long len;
     try {
-      len = impl.getReplicaVisibleLength(PBHelper.convert(request.getBlock()));
+      len = impl.getReplicaVisibleLength(PBHelperClient.convert(request.getBlock()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -123,7 +121,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       throws ServiceException {
     BlockLocalPathInfo resp;
     try {
-      resp = impl.getBlockLocalPathInfo(PBHelper.convert(request.getBlock()), PBHelper.convert(request.getToken()));
+      resp = impl.getBlockLocalPathInfo(PBHelperClient.convert(request.getBlock()), PBHelper.convert(request.getToken()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }


Mime
View raw message