hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [03/50] [abbrv] hadoop git commit: HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates short-circuit related conf to ShortCircuitConf.
Date Fri, 17 Apr 2015 22:35:31 GMT
HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates short-circuit related conf to ShortCircuitConf.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffe48015
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffe48015
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffe48015

Branch: refs/heads/YARN-2928
Commit: ffe48015c341d5db908e8efed96876bd5ad53c1a
Parents: f5d1172
Author: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Authored: Fri Apr 10 14:48:45 2015 -0700
Committer: Zhijie Shen <zjshen@apache.org>
Committed: Fri Apr 17 15:29:39 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |  31 +-
 .../apache/hadoop/hdfs/BlockReaderLocal.java    |  10 +-
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     |  27 +-
 .../org/apache/hadoop/hdfs/ClientContext.java   |  75 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 446 +----------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  21 +-
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  22 +-
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  34 +-
 .../hadoop/hdfs/DistributedFileSystem.java      |   4 +-
 .../org/apache/hadoop/hdfs/LeaseRenewer.java    |  15 +-
 .../org/apache/hadoop/hdfs/NameNodeProxies.java |  10 +-
 .../hadoop/hdfs/client/impl/DfsClientConf.java  | 738 +++++++++++++++++++
 .../hdfs/shortcircuit/DomainSocketFactory.java  |   8 +-
 .../hdfs/shortcircuit/ShortCircuitCache.java    |  12 +
 .../hadoop/fs/TestEnhancedByteBufferAccess.java |   6 +-
 .../hadoop/hdfs/TestBlockReaderLocal.java       |   3 +-
 .../hadoop/hdfs/TestDFSClientRetries.java       |   2 +-
 .../apache/hadoop/hdfs/TestDFSOutputStream.java |   3 +-
 .../apache/hadoop/hdfs/TestLeaseRenewer.java    |  13 +-
 .../hadoop/hdfs/TestParallelReadUtil.java       |   3 +-
 .../blockmanagement/TestBlockTokenWithDFS.java  |   3 +-
 .../datanode/TestDataNodeVolumeFailure.java     |   4 +-
 .../shortcircuit/TestShortCircuitCache.java     |   2 +-
 24 files changed, 930 insertions(+), 565 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 333a1b1..c2f0363 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -415,6 +415,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8102. Separate webhdfs retry configuration keys from DFSConfigKeys.
     (wheat9)
 
+    HDFS-8100. Refactor DFSClient.Conf to a standalone class and separates
+    short-circuit related conf to ShortCircuitConf.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 8f33899..5175a87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -81,7 +83,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
   static ShortCircuitReplicaCreator
       createShortCircuitReplicaInfoCallback = null;
 
-  private final DFSClient.Conf conf;
+  private final DfsClientConf conf;
 
   /**
    * Injects failures into specific operations during unit tests.
@@ -180,10 +182,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
    */
   private int remainingCacheTries;
 
-  public BlockReaderFactory(DFSClient.Conf conf) {
+  public BlockReaderFactory(DfsClientConf conf) {
     this.conf = conf;
-    this.failureInjector = conf.brfFailureInjector;
-    this.remainingCacheTries = conf.nCachedConnRetry;
+    this.failureInjector = conf.getShortCircuitConf().brfFailureInjector;
+    this.remainingCacheTries = conf.getNumCachedConnRetry();
   }
 
   public BlockReaderFactory setFileName(String fileName) {
@@ -317,7 +319,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
     BlockReader reader = null;
 
     Preconditions.checkNotNull(configuration);
-    if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) {
+    final ShortCircuitConf scConf = conf.getShortCircuitConf();
+    if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
       if (clientContext.getUseLegacyBlockReaderLocal()) {
         reader = getLegacyBlockReaderLocal();
         if (reader != null) {
@@ -336,7 +339,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
         }
       }
     }
-    if (conf.domainSocketDataTraffic) {
+    if (scConf.isDomainSocketDataTraffic()) {
       reader = getRemoteBlockReaderFromDomain();
       if (reader != null) {
         if (LOG.isTraceEnabled()) {
@@ -406,8 +409,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
           "for short-circuit reads.");
     }
     if (pathInfo == null) {
-      pathInfo = clientContext.getDomainSocketFactory().
-                      getPathInfo(inetSocketAddress, conf);
+      pathInfo = clientContext.getDomainSocketFactory()
+          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
     }
     if (!pathInfo.getPathState().getUsableForShortCircuit()) {
       PerformanceAdvisory.LOG.debug("{}: {} is not usable for short circuit; " +
@@ -431,7 +434,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
           "BlockReaderLocal via {}", this, pathInfo.getPath());
       return null;
     }
-    return new BlockReaderLocal.Builder(conf).
+    return new BlockReaderLocal.Builder(conf.getShortCircuitConf()).
         setFilename(fileName).
         setBlock(block).
         setStartOffset(startOffset).
@@ -604,8 +607,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
    */
   private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
     if (pathInfo == null) {
-      pathInfo = clientContext.getDomainSocketFactory().
-                      getPathInfo(inetSocketAddress, conf);
+      pathInfo = clientContext.getDomainSocketFactory()
+          .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
     }
     if (!pathInfo.getPathState().getUsableForDataTransfer()) {
       PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
@@ -744,7 +747,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       }
     }
     DomainSocket sock = clientContext.getDomainSocketFactory().
-        createSocket(pathInfo, conf.socketTimeout);
+        createSocket(pathInfo, conf.getSocketTimeout());
     if (sock == null) return null;
     return new BlockReaderPeer(new DomainPeer(sock), false);
   }
@@ -803,9 +806,9 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
 
   @SuppressWarnings("deprecation")
   private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
-    if (conf.useLegacyBlockReader) {
+    if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
       return RemoteBlockReader.newBlockReader(fileName,
-          block, token, startOffset, length, conf.ioBufferSize,
+          block, token, startOffset, length, conf.getIoBufferSize(),
           verifyChecksum, clientName, peer, datanode,
           clientContext.getPeerCache(), cachingStrategy);
     } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index ab93441..d913f3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -27,14 +27,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.util.DirectBufferPool;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DirectBufferPool;
 import org.apache.htrace.Sampler;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -74,10 +74,10 @@ class BlockReaderLocal implements BlockReader {
     private ExtendedBlock block;
     private StorageType storageType;
 
-    public Builder(Conf conf) {
+    public Builder(ShortCircuitConf conf) {
       this.maxReadahead = Integer.MAX_VALUE;
-      this.verifyChecksum = !conf.skipShortCircuitChecksums;
-      this.bufferSize = conf.shortCircuitBufferSize;
+      this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
+      this.bufferSize = conf.getShortCircuitBufferSize();
     }
 
     public Builder setVerifyChecksum(boolean verifyChecksum) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 0c9ec45..8df44f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -42,12 +44,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.util.DirectBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DirectBufferPool;
 import org.apache.htrace.Sampler;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -180,12 +182,13 @@ class BlockReaderLocalLegacy implements BlockReader {
   /**
    * The only way this object can be instantiated.
    */
-  static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
+  static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
       UserGroupInformation userGroupInformation,
       Configuration configuration, String file, ExtendedBlock blk,
       Token<BlockTokenIdentifier> token, DatanodeInfo node, 
       long startOffset, long length, StorageType storageType)
       throws IOException {
+    final ShortCircuitConf scConf = conf.getShortCircuitConf();
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
         .getIpcPort());
     // check the cache first
@@ -195,8 +198,8 @@ class BlockReaderLocalLegacy implements BlockReader {
         userGroupInformation = UserGroupInformation.getCurrentUser();
       }
       pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
-          configuration, conf.socketTimeout, token,
-          conf.connectToDnViaHostname, storageType);
+          configuration, conf.getSocketTimeout(), token,
+          conf.isConnectToDnViaHostname(), storageType);
     }
 
     // check to see if the file exists. It may so happen that the
@@ -208,8 +211,8 @@ class BlockReaderLocalLegacy implements BlockReader {
     FileInputStream dataIn = null;
     FileInputStream checksumIn = null;
     BlockReaderLocalLegacy localBlockReader = null;
-    boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
-        storageType.isTransient();
+    final boolean skipChecksumCheck = scConf.isSkipShortCircuitChecksums()
+        || storageType.isTransient();
     try {
       // get a local file system
       File blkfile = new File(pathinfo.getBlockPath());
@@ -230,11 +233,11 @@ class BlockReaderLocalLegacy implements BlockReader {
             new DataInputStream(checksumIn), blk);
         long firstChunkOffset = startOffset
             - (startOffset % checksum.getBytesPerChecksum());
-        localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
+        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
             startOffset, length, pathinfo, checksum, true, dataIn,
             firstChunkOffset, checksumIn);
       } else {
-        localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
+        localBlockReader = new BlockReaderLocalLegacy(scConf, file, blk, token,
             startOffset, length, pathinfo, dataIn);
       }
     } catch (IOException e) {
@@ -312,7 +315,7 @@ class BlockReaderLocalLegacy implements BlockReader {
     return bufferSizeBytes / bytesPerChecksum;
   }
 
-  private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
+  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
       ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
       long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
       throws IOException {
@@ -321,7 +324,7 @@ class BlockReaderLocalLegacy implements BlockReader {
         dataIn, startOffset, null);
   }
 
-  private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
+  private BlockReaderLocalLegacy(ShortCircuitConf conf, String hdfsfile,
       ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
       long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
       boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
@@ -339,8 +342,8 @@ class BlockReaderLocalLegacy implements BlockReader {
     this.checksumIn = checksumIn;
     this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
 
-    int chunksPerChecksumRead = getSlowReadBufferNumChunks(
-        conf.shortCircuitBufferSize, bytesPerChecksum);
+    final int chunksPerChecksumRead = getSlowReadBufferNumChunks(
+        conf.getShortCircuitBufferSize(), bytesPerChecksum);
     slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
     checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
     // Initially the buffers have nothing to read.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index af7c095..6359def 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -23,13 +23,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
 import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
 
 /**
  * ClientContext contains context information for a client.
@@ -99,59 +99,24 @@ public class ClientContext {
    */
   private boolean printedConfWarning = false;
 
-  private ClientContext(String name, Conf conf) {
-    this.name = name;
-    this.confString = confAsString(conf);
-    this.shortCircuitCache = new ShortCircuitCache(
-        conf.shortCircuitStreamsCacheSize,
-        conf.shortCircuitStreamsCacheExpiryMs,
-        conf.shortCircuitMmapCacheSize,
-        conf.shortCircuitMmapCacheExpiryMs,
-        conf.shortCircuitMmapCacheRetryTimeout,
-        conf.shortCircuitCacheStaleThresholdMs,
-        conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
-    this.peerCache =
-          new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
-    this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs);
-    this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
-    this.domainSocketFactory = new DomainSocketFactory(conf);
-
-    this.byteArrayManager = ByteArrayManager.newInstance(conf.writeByteArrayManagerConf);
-  }
+  private ClientContext(String name, DfsClientConf conf) {
+    final ShortCircuitConf scConf = conf.getShortCircuitConf();
 
-  public static String confAsString(Conf conf) {
-    StringBuilder builder = new StringBuilder();
-    builder.append("shortCircuitStreamsCacheSize = ").
-      append(conf.shortCircuitStreamsCacheSize).
-      append(", shortCircuitStreamsCacheExpiryMs = ").
-      append(conf.shortCircuitStreamsCacheExpiryMs).
-      append(", shortCircuitMmapCacheSize = ").
-      append(conf.shortCircuitMmapCacheSize).
-      append(", shortCircuitMmapCacheExpiryMs = ").
-      append(conf.shortCircuitMmapCacheExpiryMs).
-      append(", shortCircuitMmapCacheRetryTimeout = ").
-      append(conf.shortCircuitMmapCacheRetryTimeout).
-      append(", shortCircuitCacheStaleThresholdMs = ").
-      append(conf.shortCircuitCacheStaleThresholdMs).
-      append(", socketCacheCapacity = ").
-      append(conf.socketCacheCapacity).
-      append(", socketCacheExpiry = ").
-      append(conf.socketCacheExpiry).
-      append(", shortCircuitLocalReads = ").
-      append(conf.shortCircuitLocalReads).
-      append(", useLegacyBlockReaderLocal = ").
-      append(conf.useLegacyBlockReaderLocal).
-      append(", domainSocketDataTraffic = ").
-      append(conf.domainSocketDataTraffic).
-      append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
-      append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs).
-      append(", keyProviderCacheExpiryMs = ").
-      append(conf.keyProviderCacheExpiryMs);
-
-    return builder.toString();
+    this.name = name;
+    this.confString = scConf.confAsString();
+    this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
+    this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
+        scConf.getSocketCacheExpiry());
+    this.keyProviderCache = new KeyProviderCache(
+        scConf.getKeyProviderCacheExpiryMs());
+    this.useLegacyBlockReaderLocal = scConf.isUseLegacyBlockReaderLocal();
+    this.domainSocketFactory = new DomainSocketFactory(scConf);
+
+    this.byteArrayManager = ByteArrayManager.newInstance(
+        conf.getWriteByteArrayManagerConf());
   }
 
-  public static ClientContext get(String name, Conf conf) {
+  public static ClientContext get(String name, DfsClientConf conf) {
     ClientContext context;
     synchronized(ClientContext.class) {
       context = CACHES.get(name);
@@ -175,12 +140,12 @@ public class ClientContext {
   public static ClientContext getFromConf(Configuration conf) {
     return get(conf.get(DFSConfigKeys.DFS_CLIENT_CONTEXT,
         DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
-            new DFSClient.Conf(conf));
+            new DfsClientConf(conf));
   }
 
-  private void printConfWarningIfNeeded(Conf conf) {
+  private void printConfWarningIfNeeded(DfsClientConf conf) {
     String existing = this.getConfString();
-    String requested = confAsString(conf);
+    String requested = conf.getShortCircuitConf().confAsString();
     if (!existing.equals(requested)) {
       if (!printedConfWarning) {
         printedConfWarning = true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index d43e7de..f79d160 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -18,48 +18,11 @@
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -109,7 +72,6 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CacheFlag;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -136,9 +98,9 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.AclException;
@@ -195,14 +157,12 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
-import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
@@ -250,7 +210,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
 
   private final Configuration conf;
-  private final Conf dfsClientConf;
+  private final DfsClientConf dfsClientConf;
   final ClientProtocol namenode;
   /* The service used for delegation tokens */
   private Text dtService;
@@ -278,307 +238,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
   private final Sampler<?> traceSampler;
 
-  /**
-   * DFSClient configuration 
-   */
-  public static class Conf {
-    final int hdfsTimeout;    // timeout value for a DFS operation.
-
-    final int maxFailoverAttempts;
-    final int maxRetryAttempts;
-    final int failoverSleepBaseMillis;
-    final int failoverSleepMaxMillis;
-    final int maxBlockAcquireFailures;
-    final int confTime;
-    final int ioBufferSize;
-    final ChecksumOpt defaultChecksumOpt;
-    final int writePacketSize;
-    final int writeMaxPackets;
-    final ByteArrayManager.Conf writeByteArrayManagerConf;
-    final int socketTimeout;
-    final int socketCacheCapacity;
-    final long socketCacheExpiry;
-    final long excludedNodesCacheExpiry;
-    /** Wait time window (in msec) if BlockMissingException is caught */
-    final int timeWindow;
-    final int nCachedConnRetry;
-    final int nBlockWriteRetry;
-    final int nBlockWriteLocateFollowingRetry;
-    final int blockWriteLocateFollowingInitialDelayMs;
-    final long defaultBlockSize;
-    final long prefetchSize;
-    final short defaultReplication;
-    final String taskId;
-    final FsPermission uMask;
-    final boolean connectToDnViaHostname;
-    final boolean getHdfsBlocksMetadataEnabled;
-    final int getFileBlockStorageLocationsNumThreads;
-    final int getFileBlockStorageLocationsTimeoutMs;
-    final int retryTimesForGetLastBlockLength;
-    final int retryIntervalForGetLastBlockLength;
-    final long datanodeRestartTimeout;
-    final long dfsclientSlowIoWarningThresholdMs;
-
-    final boolean useLegacyBlockReader;
-    final boolean useLegacyBlockReaderLocal;
-    final String domainSocketPath;
-    final boolean skipShortCircuitChecksums;
-    final int shortCircuitBufferSize;
-    final boolean shortCircuitLocalReads;
-    final boolean domainSocketDataTraffic;
-    final int shortCircuitStreamsCacheSize;
-    final long shortCircuitStreamsCacheExpiryMs; 
-    final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
-    
-    final boolean shortCircuitMmapEnabled;
-    final int shortCircuitMmapCacheSize;
-    final long shortCircuitMmapCacheExpiryMs;
-    final long shortCircuitMmapCacheRetryTimeout;
-    final long shortCircuitCacheStaleThresholdMs;
-
-    final long keyProviderCacheExpiryMs;
-    public BlockReaderFactory.FailureInjector brfFailureInjector =
-      new BlockReaderFactory.FailureInjector();
-
-    public Conf(Configuration conf) {
-      // The hdfsTimeout is currently the same as the ipc timeout 
-      hdfsTimeout = Client.getTimeout(conf);
-      maxFailoverAttempts = conf.getInt(
-          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
-          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
-      maxRetryAttempts = conf.getInt(
-          HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
-          HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
-      failoverSleepBaseMillis = conf.getInt(
-          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
-          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
-      failoverSleepMaxMillis = conf.getInt(
-          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
-          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
-
-      maxBlockAcquireFailures = conf.getInt(
-          DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
-          DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
-      confTime = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
-          HdfsServerConstants.WRITE_TIMEOUT);
-      ioBufferSize = conf.getInt(
-          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
-          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-      defaultChecksumOpt = getChecksumOptFromConf(conf);
-      socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
-          HdfsServerConstants.READ_TIMEOUT);
-      /** dfs.write.packet.size is an internal config variable */
-      writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
-          DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-      writeMaxPackets = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
-          DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
-      
-      final boolean byteArrayManagerEnabled = conf.getBoolean(
-          DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
-          DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
-      if (!byteArrayManagerEnabled) {
-        writeByteArrayManagerConf = null;
-      } else {
-        final int countThreshold = conf.getInt(
-            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
-            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
-        final int countLimit = conf.getInt(
-            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
-            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
-        final long countResetTimePeriodMs = conf.getLong(
-            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
-            DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
-        writeByteArrayManagerConf = new ByteArrayManager.Conf(
-            countThreshold, countLimit, countResetTimePeriodMs); 
-      }
-      
-      
-      defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
-          DFS_BLOCK_SIZE_DEFAULT);
-      defaultReplication = (short) conf.getInt(
-          DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
-      taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
-      socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
-          DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
-      socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
-          DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
-      excludedNodesCacheExpiry = conf.getLong(
-          DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
-          DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
-      prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
-          10 * defaultBlockSize);
-      timeWindow = conf.getInt(
-          HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY,
-          HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT);
-      nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
-          DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
-      nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
-          DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
-      nBlockWriteLocateFollowingRetry = conf.getInt(
-          DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
-          DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
-      blockWriteLocateFollowingInitialDelayMs = conf.getInt(
-          DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
-          DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT);
-      uMask = FsPermission.getUMask(conf);
-      connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
-          DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
-      getHdfsBlocksMetadataEnabled = conf.getBoolean(
-          DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
-          DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-      getFileBlockStorageLocationsNumThreads = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
-          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
-      getFileBlockStorageLocationsTimeoutMs = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
-          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
-      retryTimesForGetLastBlockLength = conf.getInt(
-          HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
-          HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
-      retryIntervalForGetLastBlockLength = conf.getInt(
-          HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
-          HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
-
-      useLegacyBlockReader = conf.getBoolean(
-          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
-          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
-      useLegacyBlockReaderLocal = conf.getBoolean(
-          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
-          DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
-      shortCircuitLocalReads = conf.getBoolean(
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
-      domainSocketDataTraffic = conf.getBoolean(
-          DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
-          DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
-      domainSocketPath = conf.getTrimmed(
-          DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
-          DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
-
-      if (BlockReaderLocal.LOG.isDebugEnabled()) {
-        BlockReaderLocal.LOG.debug(
-            DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
-            + " = " + useLegacyBlockReaderLocal);
-        BlockReaderLocal.LOG.debug(
-            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
-            + " = " + shortCircuitLocalReads);
-        BlockReaderLocal.LOG.debug(
-            DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
-            + " = " + domainSocketDataTraffic);
-        BlockReaderLocal.LOG.debug(
-            DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
-            + " = " + domainSocketPath);
-      }
-
-      skipShortCircuitChecksums = conf.getBoolean(
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
-      shortCircuitBufferSize = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-      shortCircuitStreamsCacheSize = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
-      shortCircuitStreamsCacheExpiryMs = conf.getLong(
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
-      shortCircuitMmapEnabled = conf.getBoolean(
-          DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED,
-          DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT);
-      shortCircuitMmapCacheSize = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
-          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
-      shortCircuitMmapCacheExpiryMs = conf.getLong(
-          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
-          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
-      shortCircuitMmapCacheRetryTimeout = conf.getLong(
-          DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
-          DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT);
-      shortCircuitCacheStaleThresholdMs = conf.getLong(
-          DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
-          DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
-      shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
-          DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
-          DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
-
-      datanodeRestartTimeout = conf.getLong(
-          DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
-          DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
-      dfsclientSlowIoWarningThresholdMs = conf.getLong(
-          DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
-          DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
-
-      keyProviderCacheExpiryMs = conf.getLong(
-          DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
-          DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
-    }
-
-    public boolean isUseLegacyBlockReaderLocal() {
-      return useLegacyBlockReaderLocal;
-    }
-
-    public String getDomainSocketPath() {
-      return domainSocketPath;
-    }
-
-    public boolean isShortCircuitLocalReads() {
-      return shortCircuitLocalReads;
-    }
-
-    public boolean isDomainSocketDataTraffic() {
-      return domainSocketDataTraffic;
-    }
-
-    private DataChecksum.Type getChecksumType(Configuration conf) {
-      final String checksum = conf.get(
-          DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
-          DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
-      try {
-        return DataChecksum.Type.valueOf(checksum);
-      } catch(IllegalArgumentException iae) {
-        LOG.warn("Bad checksum type: " + checksum + ". Using default "
-            + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
-        return DataChecksum.Type.valueOf(
-            DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); 
-      }
-    }
-
-    // Construct a checksum option from conf
-    private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
-      DataChecksum.Type type = getChecksumType(conf);
-      int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
-          DFS_BYTES_PER_CHECKSUM_DEFAULT);
-      return new ChecksumOpt(type, bytesPerChecksum);
-    }
-
-    // create a DataChecksum with the default option.
-    private DataChecksum createChecksum() throws IOException {
-      return createChecksum(null);
-    }
-
-    private DataChecksum createChecksum(ChecksumOpt userOpt) {
-      // Fill in any missing field with the default.
-      ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
-          defaultChecksumOpt, userOpt);
-      DataChecksum dataChecksum = DataChecksum.newDataChecksum(
-          myOpt.getChecksumType(),
-          myOpt.getBytesPerChecksum());
-      if (dataChecksum == null) {
-        throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
-            + userOpt + ", default=" + defaultChecksumOpt
-            + ", effective=null");
-      }
-      return dataChecksum;
-    }
-
-    @VisibleForTesting
-    public int getBlockWriteLocateFollowingInitialDelayMs() {
-      return blockWriteLocateFollowingInitialDelayMs;
-    }
-  }
- 
-  public Conf getConf() {
+  public DfsClientConf getConf() {
     return dfsClientConf;
   }
 
@@ -642,10 +302,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     SpanReceiverHost.getInstance(conf);
     traceSampler = new SamplerBuilder(TraceUtils.wrapHadoopConf(conf)).build();
     // Copy only the required DFSClient configuration
-    this.dfsClientConf = new Conf(conf);
-    if (this.dfsClientConf.useLegacyBlockReaderLocal) {
-      LOG.debug("Using legacy short-circuit local reads.");
-    }
+    this.dfsClientConf = new DfsClientConf(conf);
     this.conf = conf;
     this.stats = stats;
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
@@ -654,7 +311,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     this.ugi = UserGroupInformation.getCurrentUser();
     
     this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
-    this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
+    this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" + 
         DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
     int numResponseToDrop = conf.getInt(
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
@@ -779,30 +436,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /**
-   * Return the number of times the client should go back to the namenode
-   * to retrieve block locations when reading.
-   */
-  int getMaxBlockAcquireFailures() {
-    return dfsClientConf.maxBlockAcquireFailures;
-  }
-
-  /**
    * Return the timeout that clients should use when writing to datanodes.
    * @param numNodes the number of nodes in the pipeline.
    */
   int getDatanodeWriteTimeout(int numNodes) {
-    return (dfsClientConf.confTime > 0) ?
-      (dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
+    final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
+    return t > 0? t + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
   }
 
   int getDatanodeReadTimeout(int numNodes) {
-    return dfsClientConf.socketTimeout > 0 ?
-        (HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes +
-            dfsClientConf.socketTimeout) : 0;
-  }
-  
-  int getHdfsTimeout() {
-    return dfsClientConf.hdfsTimeout;
+    final int t = dfsClientConf.getSocketTimeout();
+    return t > 0? HdfsServerConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
   }
   
   @VisibleForTesting
@@ -992,14 +636,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /**
-   * Get the default block size for this cluster
-   * @return the default block size in bytes
-   */
-  public long getDefaultBlockSize() {
-    return dfsClientConf.defaultBlockSize;
-  }
-    
-  /**
    * @see ClientProtocol#getPreferredBlockSize(String)
    */
   public long getBlockSize(String f) throws IOException {
@@ -1211,13 +847,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     namenode.reportBadBlocks(blocks);
   }
   
-  public short getDefaultReplication() {
-    return dfsClientConf.defaultReplication;
-  }
-  
   public LocatedBlocks getLocatedBlocks(String src, long start)
       throws IOException {
-    return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
+    return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
   }
 
   /*
@@ -1319,7 +951,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public BlockStorageLocation[] getBlockStorageLocations(
       List<BlockLocation> blockLocations) throws IOException,
       UnsupportedOperationException, InvalidBlockTokenException {
-    if (!getConf().getHdfsBlocksMetadataEnabled) {
+    if (!getConf().isHdfsBlocksMetadataEnabled()) {
       throw new UnsupportedOperationException("Datanode-side support for " +
           "getVolumeBlockLocations() must also be enabled in the client " +
           "configuration.");
@@ -1356,9 +988,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     try {
       metadatas = BlockStorageLocationUtil.
           queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
-              getConf().getFileBlockStorageLocationsNumThreads,
-              getConf().getFileBlockStorageLocationsTimeoutMs,
-              getConf().connectToDnViaHostname);
+              getConf().getFileBlockStorageLocationsNumThreads(),
+              getConf().getFileBlockStorageLocationsTimeoutMs(),
+              getConf().isConnectToDnViaHostname());
       if (LOG.isTraceEnabled()) {
         LOG.trace("metadata returned: "
             + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
@@ -1512,7 +1144,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
   public DFSInputStream open(String src) 
       throws IOException, UnresolvedLinkException {
-    return open(src, dfsClientConf.ioBufferSize, true, null);
+    return open(src, dfsClientConf.getIoBufferSize(), true, null);
   }
 
   /**
@@ -1563,8 +1195,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    */
   public OutputStream create(String src, boolean overwrite) 
       throws IOException {
-    return create(src, overwrite, dfsClientConf.defaultReplication,
-        dfsClientConf.defaultBlockSize, null);
+    return create(src, overwrite, dfsClientConf.getDefaultReplication(),
+        dfsClientConf.getDefaultBlockSize(), null);
   }
     
   /**
@@ -1574,8 +1206,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public OutputStream create(String src, 
                              boolean overwrite,
                              Progressable progress) throws IOException {
-    return create(src, overwrite, dfsClientConf.defaultReplication,
-        dfsClientConf.defaultBlockSize, progress);
+    return create(src, overwrite, dfsClientConf.getDefaultReplication(),
+        dfsClientConf.getDefaultBlockSize(), progress);
   }
     
   /**
@@ -1596,7 +1228,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public OutputStream create(String src, boolean overwrite, short replication,
       long blockSize, Progressable progress) throws IOException {
     return create(src, overwrite, replication, blockSize, progress,
-        dfsClientConf.ioBufferSize);
+        dfsClientConf.getIoBufferSize());
   }
 
   /**
@@ -1678,6 +1310,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
         progress, buffersize, checksumOpt, null);
   }
 
+  private FsPermission applyUMask(FsPermission permission) {
+    if (permission == null) {
+      permission = FsPermission.getFileDefault();
+    }
+    return permission.applyUMask(dfsClientConf.getUMask());
+  }
+
   /**
    * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
    * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
@@ -1698,10 +1337,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
                              ChecksumOpt checksumOpt,
                              InetSocketAddress[] favoredNodes) throws IOException {
     checkOpen();
-    if (permission == null) {
-      permission = FsPermission.getFileDefault();
-    }
-    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
+    final FsPermission masked = applyUMask(permission);
     if(LOG.isDebugEnabled()) {
       LOG.debug(src + ": masked=" + masked);
     }
@@ -1783,8 +1419,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       throws IOException {
     TraceScope scope = getPathTraceScope("createSymlink", target);
     try {
-      FsPermission dirPerm = 
-          FsPermission.getDefault().applyUMask(dfsClientConf.uMask); 
+      final FsPermission dirPerm = applyUMask(null);
       namenode.createSymlink(target, link, dirPerm, createParent);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
@@ -1828,7 +1463,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           new EnumSetWritable<>(flag, CreateFlag.class));
       return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
           progress, blkWithStatus.getLastBlock(),
-          blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(),
+          blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(null),
           favoredNodes);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
@@ -2253,7 +1888,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       final DatanodeInfo[] datanodes = lb.getLocations();
       
       //try each datanode location of the block
-      final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
+      final int timeout = 3000*datanodes.length + dfsClientConf.getSocketTimeout();
       boolean done = false;
       for(int j = 0; !done && j < datanodes.length; j++) {
         DataOutputStream out = null;
@@ -2391,7 +2026,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     Socket sock = null;
     try {
       sock = socketFactory.createSocket();
-      String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
+      String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
       if (LOG.isDebugEnabled()) {
         LOG.debug("Connecting to datanode " + dnAddr);
       }
@@ -2424,7 +2059,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    */
   private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
       throws IOException {
-    IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);
+    IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
 
     try {
       DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
@@ -2979,10 +2614,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    */
   public boolean mkdirs(String src, FsPermission permission,
       boolean createParent) throws IOException {
-    if (permission == null) {
-      permission = FsPermission.getDefault();
-    }
-    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
+    final FsPermission masked = applyUMask(permission);
     return primitiveMkdir(src, masked, createParent);
   }
 
@@ -3004,8 +2636,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     throws IOException {
     checkOpen();
     if (absPermission == null) {
-      absPermission = 
-        FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
+      absPermission = applyUMask(null);
     } 
 
     if(LOG.isDebugEnabled()) {
@@ -3447,14 +3078,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     Peer peer = null;
     boolean success = false;
     Socket sock = null;
+    final int socketTimeout = dfsClientConf.getSocketTimeout(); 
     try {
       sock = socketFactory.createSocket();
-      NetUtils.connect(sock, addr,
-        getRandomLocalInterfaceAddr(),
-        dfsClientConf.socketTimeout);
+      NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout);
       peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
           blockToken, datanodeId);
-      peer.setReadTimeout(dfsClientConf.socketTimeout);
+      peer.setReadTimeout(socketTimeout);
       success = true;
       return peer;
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 41b9d50..dd0f6fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -52,14 +52,15 @@ import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
@@ -265,9 +266,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Grab the open-file info from namenode
    */
   void openInfo() throws IOException, UnresolvedLinkException {
+    final DfsClientConf conf = dfsClient.getConf();
     synchronized(infoLock) {
       lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
-      int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
+      int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
       while (retriesForLastBlockLength > 0) {
         // Getting last block length as -1 is a special case. When cluster
         // restarts, DNs may not report immediately. At this time partial block
@@ -277,7 +279,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           DFSClient.LOG.warn("Last block locations not available. "
               + "Datanodes might not have reported blocks completely."
               + " Will retry for " + retriesForLastBlockLength + " times");
-          waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
+          waitFor(conf.getRetryIntervalForGetLastBlockLength());
           lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
         } else {
           break;
@@ -346,13 +348,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     assert locatedblock != null : "LocatedBlock cannot be null";
     int replicaNotFoundCount = locatedblock.getLocations().length;
     
+    final DfsClientConf conf = dfsClient.getConf();
     for(DatanodeInfo datanode : locatedblock.getLocations()) {
       ClientDatanodeProtocol cdp = null;
       
       try {
         cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode,
-            dfsClient.getConfiguration(), dfsClient.getConf().socketTimeout,
-            dfsClient.getConf().connectToDnViaHostname, locatedblock);
+            dfsClient.getConfiguration(), conf.getSocketTimeout(),
+            conf.isConnectToDnViaHostname(), locatedblock);
         
         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
         
@@ -938,7 +941,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
           deadNodes, ignoredNodes);
         String blockInfo = block.getBlock() + " file=" + src;
-        if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
+        if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
           String description = "Could not obtain block: " + blockInfo;
           DFSClient.LOG.warn(description + errMsg
               + ". Throwing a BlockMissingException");
@@ -963,7 +966,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           // alleviating the request rate from the server. Similarly the 3rd retry
           // will wait 6000ms grace period before retry and the waiting window is
           // expanded to 9000ms. 
-          final int timeWindow = dfsClient.getConf().timeWindow;
+          final int timeWindow = dfsClient.getConf().getTimeWindow();
           double waitTime = timeWindow * failures +       // grace period for the last round of attempt
             timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
           DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
@@ -1012,7 +1015,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           ", ignoredNodes = " + ignoredNodes);
     }
     final String dnAddr =
-        chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
+        chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
     }
@@ -1706,7 +1709,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       }
     }
     ByteBuffer buffer = null;
-    if (dfsClient.getConf().shortCircuitMmapEnabled) {
+    if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) {
       buffer = tryReadZeroCopy(maxLength, opts);
     }
     if (buffer != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f6733e3..8cde274 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -33,10 +33,11 @@ import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -211,7 +212,7 @@ public class DFSOutputStream extends FSOutputSummer
     this(dfsClient, src, progress, stat, checksum);
     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
-    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
+    computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
 
     streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
         cachingStrategy, byteArrayManager);
@@ -297,7 +298,7 @@ public class DFSOutputStream extends FSOutputSummer
       adjustPacketChunkSize(stat);
       streamer.setPipelineInConstruction(lastBlock);
     } else {
-      computePacketChunkSize(dfsClient.getConf().writePacketSize,
+      computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
           bytesPerChecksum);
       streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
           dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager);
@@ -334,7 +335,8 @@ public class DFSOutputStream extends FSOutputSummer
       // that expected size of of a packet, then create
       // smaller size packet.
       //
-      computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
+      computePacketChunkSize(
+          Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock),
           bytesPerChecksum);
     }
   }
@@ -445,7 +447,7 @@ public class DFSOutputStream extends FSOutputSummer
 
     if (!streamer.getAppendChunk()) {
       int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()),
-          dfsClient.getConf().writePacketSize);
+          dfsClient.getConf().getWritePacketSize());
       computePacketChunkSize(psize, bytesPerChecksum);
     }
   }
@@ -717,7 +719,7 @@ public class DFSOutputStream extends FSOutputSummer
       return;
     }
     streamer.setLastException(new IOException("Lease timeout of "
-        + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+        + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
     closeThreads(true);
     dfsClient.endFileLease(fileId);
   }
@@ -806,15 +808,15 @@ public class DFSOutputStream extends FSOutputSummer
   // be called during unit tests
   protected void completeFile(ExtendedBlock last) throws IOException {
     long localstart = Time.monotonicNow();
-    long sleeptime = dfsClient.getConf().
-        blockWriteLocateFollowingInitialDelayMs;
+    final DfsClientConf conf = dfsClient.getConf();
+    long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
     boolean fileComplete = false;
-    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
+    int retries = conf.getNumBlockWriteLocateFollowingRetry();
     while (!fileComplete) {
       fileComplete =
           dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
       if (!fileComplete) {
-        final int hdfsTimeout = dfsClient.getHdfsTimeout();
+        final int hdfsTimeout = conf.getHdfsTimeout();
         if (!dfsClient.clientRunning
             || (hdfsTimeout > 0
                 && localstart + hdfsTimeout < Time.monotonicNow())) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 0c6b4a3..405f775 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -84,6 +85,7 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceInfo;
 import org.apache.htrace.TraceScope;
+
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -123,15 +125,15 @@ class DataStreamer extends Daemon {
    */
   static Socket createSocketForPipeline(final DatanodeInfo first,
       final int length, final DFSClient client) throws IOException {
-    final String dnAddr = first.getXferAddr(
-        client.getConf().connectToDnViaHostname);
+    final DfsClientConf conf = client.getConf();
+    final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
     }
     final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
     final Socket sock = client.socketFactory.createSocket();
     final int timeout = client.getDatanodeReadTimeout(length);
-    NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout);
+    NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
     sock.setSoTimeout(timeout);
     sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
     if(DFSClient.LOG.isDebugEnabled()) {
@@ -244,7 +246,7 @@ class DataStreamer extends Daemon {
     this.byteArrayManager = byteArrayManage;
     isLazyPersistFile = isLazyPersist(stat);
     this.dfsclientSlowLogThresholdMs =
-        dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
+        dfsClient.getConf().getSlowIoWarningThresholdMs();
     excludedNodes = initExcludedNodes();
   }
 
@@ -368,6 +370,7 @@ class DataStreamer extends Daemon {
           doSleep = processDatanodeError();
         }
 
+        final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; 
         synchronized (dataQueue) {
           // wait for a packet to be sent.
           long now = Time.monotonicNow();
@@ -375,8 +378,8 @@ class DataStreamer extends Daemon {
               && dataQueue.size() == 0 &&
               (stage != BlockConstructionStage.DATA_STREAMING ||
                   stage == BlockConstructionStage.DATA_STREAMING &&
-                      now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
-            long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
+                      now - lastPacket < halfSocketTimeout)) || doSleep ) {
+            long timeout = halfSocketTimeout - (now-lastPacket);
             timeout = timeout <= 0 ? 1000 : timeout;
             timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                 timeout : 1000;
@@ -627,7 +630,7 @@ class DataStreamer extends Daemon {
         boolean firstWait = true;
         try {
           while (!streamerClosed && dataQueue.size() + ackQueue.size() >
-              dfsClient.getConf().writeMaxPackets) {
+              dfsClient.getConf().getWriteMaxPackets()) {
             if (firstWait) {
               Span span = Trace.currentSpan();
               if (span != null) {
@@ -842,7 +845,7 @@ class DataStreamer extends Daemon {
             // the local node or the only one in the pipeline.
             if (PipelineAck.isRestartOOBStatus(reply) &&
                 shouldWaitForRestart(i)) {
-              restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+              restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
                   + Time.monotonicNow();
               setRestartingNodeIndex(i);
               String message = "A datanode is restarting: " + targets[i];
@@ -1158,7 +1161,7 @@ class DataStreamer extends Daemon {
         // 4 seconds or the configured deadline period, whichever is shorter.
         // This is the retry interval and recovery will be retried in this
         // interval until timeout or success.
-        long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
+        long delay = Math.min(dfsClient.getConf().getDatanodeRestartTimeout(),
             4000L);
         try {
           Thread.sleep(delay);
@@ -1311,7 +1314,7 @@ class DataStreamer extends Daemon {
     LocatedBlock lb = null;
     DatanodeInfo[] nodes = null;
     StorageType[] storageTypes = null;
-    int count = dfsClient.getConf().nBlockWriteRetry;
+    int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success = false;
     ExtendedBlock oldBlock = block;
     do {
@@ -1471,7 +1474,7 @@ class DataStreamer extends Daemon {
         }
         // Check whether there is a restart worth waiting for.
         if (checkRestart && shouldWaitForRestart(errorIndex)) {
-          restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+          restartDeadline = dfsClient.getConf().getDatanodeRestartTimeout()
               + Time.monotonicNow();
           restartingNodeIndex.set(errorIndex);
           errorIndex = -1;
@@ -1524,9 +1527,9 @@ class DataStreamer extends Daemon {
 
   protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
       throws IOException {
-    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
-    long sleeptime = dfsClient.getConf().
-        blockWriteLocateFollowingInitialDelayMs;
+    final DfsClientConf conf = dfsClient.getConf(); 
+    int retries = conf.getNumBlockWriteLocateFollowingRetry();
+    long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
     while (true) {
       long localstart = Time.monotonicNow();
       while (true) {
@@ -1674,7 +1677,8 @@ class DataStreamer extends Daemon {
 
   private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() {
     return CacheBuilder.newBuilder().expireAfterWrite(
-        dfsClient.getConf().excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
+        dfsClient.getConf().getExcludedNodesCacheExpiry(),
+        TimeUnit.MILLISECONDS)
         .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
           @Override
           public void onRemoval(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 3edab48..21f5107 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -160,12 +160,12 @@ public class DistributedFileSystem extends FileSystem {
 
   @Override
   public long getDefaultBlockSize() {
-    return dfs.getDefaultBlockSize();
+    return dfs.getConf().getDefaultBlockSize();
   }
 
   @Override
   public short getDefaultReplication() {
-    return dfs.getDefaultReplication();
+    return dfs.getConf().getDefaultReplication();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
index 3e0abce..511bddb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
@@ -225,8 +225,9 @@ class LeaseRenewer {
     dfsclients.add(dfsc);
 
     //update renewal time
-    if (dfsc.getHdfsTimeout() > 0) {
-      final long half = dfsc.getHdfsTimeout()/2;
+    final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
+    if (hdfsTimeout > 0) {
+      final long half = hdfsTimeout/2;
       if (half < renewal) {
         this.renewal = half;
       }
@@ -368,14 +369,12 @@ class LeaseRenewer {
     }
 
     //update renewal time
-    if (renewal == dfsc.getHdfsTimeout()/2) {
+    if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
       long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
       for(DFSClient c : dfsclients) {
-        if (c.getHdfsTimeout() > 0) {
-          final long timeout = c.getHdfsTimeout();
-          if (timeout < min) {
-            min = timeout;
-          }
+        final int timeout = c.getConf().getHdfsTimeout();
+        if (timeout > 0 && timeout < min) {
+          min = timeout;
         }
       }
       renewal = min/2;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffe48015/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index ec2223f..5a929fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -40,8 +40,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -178,12 +178,12 @@ public class NameNodeProxies {
           UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
     } else {
       // HA case
-      Conf config = new Conf(conf);
+      DfsClientConf config = new DfsClientConf(conf);
       T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
           RetryPolicies.failoverOnNetworkException(
-              RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
-              config.maxRetryAttempts, config.failoverSleepBaseMillis,
-              config.failoverSleepMaxMillis));
+              RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
+              config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
+              config.getFailoverSleepMaxMillis()));
 
       Text dtService;
       if (failoverProxyProvider.useLogicalURI()) {


Mime
View raw message