hadoop-hdfs-commits mailing list archives

From: cmcc...@apache.org
Subject: svn commit: r1567720 [1/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/server/common/ src/main/java/org/apache/had...
Date: Wed, 12 Feb 2014 19:08:53 GMT
Author: cmccabe
Date: Wed Feb 12 19:08:52 2014
New Revision: 1567720

URL: http://svn.apache.org/r1567720
Log:
HDFS-5810. Unify mmap cache and short-circuit file descriptor cache (cmccabe)

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplicaInfo.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Feb 12 19:08:52 2014
@@ -368,6 +368,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5929. Add blockpool % usage to HDFS federated nn page.
     (Siqi Li via suresh)
 
+    HDFS-5810. Unify mmap cache and short-circuit file descriptor cache
+    (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Wed Feb 12 19:08:52 2014
@@ -23,7 +23,6 @@ import java.util.EnumSet;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
@@ -97,6 +96,5 @@ public interface BlockReader extends Byt
    * @return              The ClientMmap object, or null if mmap is not
    *                      supported.
    */
-  ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager);
+  ClientMmap getClientMmap(EnumSet<ReadOption> opts);
 }
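
With this signature change, callers no longer thread a ClientMmapManager
through every read; each reader resolves memory maps through its own
ShortCircuitReplica.  A minimal caller sketch under the new interface
(blockReader is a placeholder, and the getMappedByteBuffer()/unref()
accessors on ClientMmap are assumptions based on the rest of this patch,
not shown in this diff):

    import java.nio.ByteBuffer;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.hdfs.client.ClientMmap;

    EnumSet<ReadOption> opts = EnumSet.of(ReadOption.SKIP_CHECKSUMS);
    ClientMmap mmap = blockReader.getClientMmap(opts);  // null if mmap is unsupported
    if (mmap != null) {
      try {
        // Zero-copy path: read directly from the mapped region.
        ByteBuffer buf = mmap.getMappedByteBuffer().asReadOnlyBuffer();
      } finally {
        mmap.unref();  // drop our reference so the cache can reclaim the mapping
      }
    }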

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Wed Feb 12 19:08:52 2014
@@ -24,218 +24,750 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 
 /** 
  * Utility class to create BlockReader implementations.
  */
 @InterfaceAudience.Private
-public class BlockReaderFactory {
+public class BlockReaderFactory implements ShortCircuitReplicaCreator {
+  static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
+
+  @VisibleForTesting
+  static ShortCircuitReplicaCreator
+      createShortCircuitReplicaInfoCallback = null;
+
+  private final DFSClient.Conf conf;
+
+  /**
+   * The file name, for logging and debugging purposes.
+   */
+  private String fileName;
+
+  /**
+   * The block ID and block pool ID to use.
+   */
+  private ExtendedBlock block;
+
+  /**
+   * The block token to use for security purposes.
+   */
+  private Token<BlockTokenIdentifier> token;
+
+  /**
+   * The offset within the block to start reading at.
+   */
+  private long startOffset;
+
+  /**
+   * If false, we won't try to verify the block checksum.
+   */
+  private boolean verifyChecksum;
+
+  /**
+   * The name of this client.
+   */
+  private String clientName; 
+
+  /**
+   * The DataNode we're talking to.
+   */
+  private DatanodeInfo datanode;
+
+  /**
+   * If false, we won't try short-circuit local reads.
+   */
+  private boolean allowShortCircuitLocalReads;
+
+  /**
+   * The ClientContext to use for things like the PeerCache.
+   */
+  private ClientContext clientContext;
+
+  /**
+   * Number of bytes to read.  -1 indicates no limit.
+   */
+  private long length = -1;
+
+  /**
+   * Caching strategy to use when reading the block.
+   */
+  private CachingStrategy cachingStrategy;
+
+  /**
+   * Socket address to use to connect to peer.
+   */
+  private InetSocketAddress inetSocketAddress;
+
+  /**
+   * Remote peer factory to use to create a peer, if needed.
+   */
+  private RemotePeerFactory remotePeerFactory;
+
+  /**
+   * UserGroupInformation to use for legacy block reader local objects, if needed.
+   */
+  private UserGroupInformation userGroupInformation;
+
+  /**
+   * Configuration to use for legacy block reader local objects, if needed.
+   */
+  private Configuration configuration;
+
+  /**
+   * Information about the domain socket path we should use to connect to the
+   * local peer-- or null if we haven't examined the local domain socket.
+   */
+  private DomainSocketFactory.PathInfo pathInfo;
+
+  /**
+   * The remaining number of times that we'll try to pull a socket out of the
+   * cache.
+   */
+  private int remainingCacheTries;
+
+  public BlockReaderFactory(DFSClient.Conf conf) {
+    this.conf = conf;
+    this.remainingCacheTries = conf.nCachedConnRetry;
+  }
+
+  public BlockReaderFactory setFileName(String fileName) {
+    this.fileName = fileName;
+    return this;
+  }
+
+  public BlockReaderFactory setBlock(ExtendedBlock block) {
+    this.block = block;
+    return this;
+  }
+
+  public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
+    this.token = token;
+    return this;
+  }
+
+  public BlockReaderFactory setStartOffset(long startOffset) {
+    this.startOffset = startOffset;
+    return this;
+  }
+
+  public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
+    this.verifyChecksum = verifyChecksum;
+    return this;
+  }
+
+  public BlockReaderFactory setClientName(String clientName) {
+    this.clientName = clientName;
+    return this;
+  }
+
+  public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
+    this.datanode = datanode;
+    return this;
+  }
+
+  public BlockReaderFactory setAllowShortCircuitLocalReads(
+      boolean allowShortCircuitLocalReads) {
+    this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
+    return this;
+  }
+
+  public BlockReaderFactory setClientCacheContext(
+      ClientContext clientContext) {
+    this.clientContext = clientContext;
+    return this;
+  }
+
+  public BlockReaderFactory setLength(long length) {
+    this.length = length;
+    return this;
+  }
+
+  public BlockReaderFactory setCachingStrategy(
+      CachingStrategy cachingStrategy) {
+    this.cachingStrategy = cachingStrategy;
+    return this;
+  }
+
+  public BlockReaderFactory setInetSocketAddress (
+      InetSocketAddress inetSocketAddress) {
+    this.inetSocketAddress = inetSocketAddress;
+    return this;
+  }
+
+  public BlockReaderFactory setUserGroupInformation(
+      UserGroupInformation userGroupInformation) {
+    this.userGroupInformation = userGroupInformation;
+    return this;
+  }
+
+  public BlockReaderFactory setRemotePeerFactory(
+      RemotePeerFactory remotePeerFactory) {
+    this.remotePeerFactory = remotePeerFactory;
+    return this;
+  }
+
+  public BlockReaderFactory setConfiguration(
+      Configuration configuration) {
+    this.configuration = configuration;
+    return this;
+  }
+
   /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   * 
-   * @param conf the DFSClient configuration
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read, or -1 to read as many as
-   *             possible.
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   *                    Ignored except on the legacy BlockReader.
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name.  Used for log messages.
-   * @param peer  The peer
-   * @param datanodeID  The datanode that the Peer is connected to
-   * @param domainSocketFactory  The DomainSocketFactory to notify if the Peer
-   *                             is a DomainPeer which turns out to be faulty.
-   *                             If null, no factory will be notified in this
-   *                             case.
-   * @param allowShortCircuitLocalReads  True if short-circuit local reads
-   *                                     should be allowed.
-   * @return New BlockReader instance
-   */
-  public static BlockReader newBlockReader(DFSClient.Conf conf,
-                                     String file,
-                                     ExtendedBlock block, 
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     boolean verifyChecksum,
-                                     String clientName,
-                                     Peer peer,
-                                     DatanodeID datanodeID,
-                                     DomainSocketFactory domSockFactory,
-                                     PeerCache peerCache,
-                                     FileInputStreamCache fisCache,
-                                     boolean allowShortCircuitLocalReads,
-                                     CachingStrategy cachingStrategy)
-  throws IOException {
-    peer.setReadTimeout(conf.socketTimeout);
-    peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
-
-    if (peer.getDomainSocket() != null) {
-      if (allowShortCircuitLocalReads && !conf.useLegacyBlockReaderLocal) {
-        // If this is a domain socket, and short-circuit local reads are 
-        // enabled, try to set up a BlockReaderLocal.
-        BlockReader reader = newShortCircuitBlockReader(conf, file,
-            block, blockToken, startOffset, len, peer, datanodeID,
-            domSockFactory, verifyChecksum, fisCache, cachingStrategy);
+   * Build a BlockReader with the given options.
+   *
+   * This function will do the best it can to create a block reader that meets
+   * all of our requirements.  We prefer short-circuit block readers
+   * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
+   * former avoid the overhead of socket communication.  If short-circuit is
+   * unavailable, our next fallback is data transfer over UNIX domain sockets,
+   * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
+   * work, we will try to create a remote block reader that operates over TCP
+   * sockets.
+   *
+   * There are a few caches that are important here.
+   *
+   * The ShortCircuitCache stores file descriptor objects which have been passed
+   * from the DataNode. 
+   *
+   * The DomainSocketFactory stores information about UNIX domain socket paths
+   * that we have not been able to use in the past, so that we don't waste time
+   * retrying them over and over.  (Like all the caches, it does have a timeout,
+   * though.)
+   *
+   * The PeerCache stores peers that we have used in the past.  If we can reuse
+   * one of these peers, we avoid the overhead of re-opening a socket.  However,
+   * if the socket has been timed out on the remote end, our attempt to reuse
+   * the socket may end with an IOException.  For that reason, we limit our
+   * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
+   * that, we create new sockets.  This avoids the problem where a thread tries
+   * to talk to a peer that it hasn't talked to in a while, and has to clean out
+   * every entry in a socket cache full of stale entries.
+   *
+   * @return The new BlockReader.  We will not return null.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  public BlockReader build() throws IOException {
+    BlockReader reader = null;
+
+    Preconditions.checkNotNull(configuration);
+    if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) {
+      if (clientContext.getUseLegacyBlockReaderLocal()) {
+        reader = getLegacyBlockReaderLocal();
         if (reader != null) {
-          // One we've constructed the short-circuit block reader, we don't
-          // need the socket any more.  So let's return it to the cache.
-          if (peerCache != null) {
-            peerCache.put(datanodeID, peer);
-          } else {
-            IOUtils.cleanup(null, peer);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": returning new legacy block reader local.");
+          }
+          return reader;
+        }
+      } else {
+        reader = getBlockReaderLocal();
+        if (reader != null) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": returning new block reader local.");
           }
           return reader;
         }
       }
-      // If this is a domain socket and we couldn't (or didn't want to) set
-      // up a BlockReaderLocal, check that we are allowed to pass data traffic
-      // over the socket before proceeding.
-      if (!conf.domainSocketDataTraffic) {
-        throw new IOException("Because we can't do short-circuit access, " +
-          "and data traffic over domain sockets is disabled, " +
-          "we cannot use this socket to talk to " + datanodeID);
+    }
+    if (conf.domainSocketDataTraffic) {
+      reader = getRemoteBlockReaderFromDomain();
+      if (reader != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": returning new remote block reader using " +
+              "UNIX domain socket on " + pathInfo.getPath());
+        }
+        return reader;
       }
     }
+    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
+        "TCP reads were disabled for testing, but we failed to " +
+        "do a non-TCP read.");
+    return getRemoteBlockReaderFromTcp();
+  }
 
-    if (conf.useLegacyBlockReader) {
-      @SuppressWarnings("deprecation")
-      RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
-          block, blockToken, startOffset, len, conf.ioBufferSize,
-          verifyChecksum, clientName, peer, datanodeID, peerCache,
-          cachingStrategy);
-      return reader;
-    } else {
-      return RemoteBlockReader2.newBlockReader(
-          file, block, blockToken, startOffset, len,
-          verifyChecksum, clientName, peer, datanodeID, peerCache,
-          cachingStrategy);
+  /**
+   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+   * This block reader implements the path-based style of local reads
+   * first introduced in HDFS-2246.
+   */
+  private BlockReader getLegacyBlockReaderLocal() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
+    }
+    if (!DFSClient.isLocalAddress(inetSocketAddress)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
+            "the address " + inetSocketAddress + " is not local");
+      }
+      return null;
+    }
+    if (clientContext.getDisableLegacyBlockReaderLocal()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
+            "disableLegacyBlockReaderLocal is set.");
+      }
+      return null;
+    }
+    IOException ioe = null;
+    try {
+      return BlockReaderLocalLegacy.newBlockReader(conf,
+          userGroupInformation, configuration, fileName, block, token,
+          datanode, startOffset, length);
+    } catch (RemoteException remoteException) {
+      ioe = remoteException.unwrapRemoteException(
+                InvalidToken.class, AccessControlException.class);
+    } catch (IOException e) {
+      ioe = e;
+    }
+    if ((!(ioe instanceof AccessControlException)) &&
+        isSecurityException(ioe)) {
+      // Handle security exceptions.
+      // We do not handle AccessControlException here, since
+      // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
+      // that the user is not in dfs.block.local-path-access.user, a condition
+      // which requires us to disable legacy SCR.
+      throw ioe;
+    }
+    LOG.warn(this + ": error creating legacy BlockReaderLocal.  " +
+        "Disabling legacy local reads.", ioe);
+    clientContext.setDisableLegacyBlockReaderLocal();
+    return null;
+  }
+
+  private BlockReader getBlockReaderLocal() throws InvalidToken {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to construct a BlockReaderLocal " +
+          "for short-circuit reads.");
+    }
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory().
+                      getPathInfo(inetSocketAddress, conf);
+    }
+    if (!pathInfo.getPathState().getUsableForShortCircuit()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": " + pathInfo + " is not " +
+            "usable for short circuit; giving up on BlockReaderLocal.");
+      }
+      return null;
+    }
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    Key key = new Key(block.getBlockId(), block.getBlockPoolId());
+    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
+    InvalidToken exc = info.getInvalidTokenException();
+    if (exc != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": got InvalidToken exception while trying to " +
+            "construct BlockReaderLocal via " + pathInfo.getPath());
+      }
+      throw exc;
+    }
+    if (info.getReplica() == null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": failed to get ShortCircuitReplica.  " +
+            "Cannot construct BlockReaderLocal via " + pathInfo.getPath());
+      }
+      return null;
+    }
+    return new BlockReaderLocal.Builder(conf).
+        setFilename(fileName).
+        setBlock(block).
+        setStartOffset(startOffset).
+        setShortCircuitReplica(info.getReplica()).
+        setDatanodeID(datanode).
+        setVerifyChecksum(verifyChecksum).
+        setCachingStrategy(cachingStrategy).
+        build();
+  }
+
+  /**
+   * Fetch a pair of short-circuit block descriptors from a local DataNode.
+   *
+   * @return    Null if we could not communicate with the datanode,
+   *            a new ShortCircuitReplicaInfo object otherwise.
+   *            ShortCircuitReplicaInfo objects may contain either an InvalidToken
+   *            exception, or a ShortCircuitReplica object ready to use.
+   */
+  @Override
+  public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+    if (createShortCircuitReplicaInfoCallback != null) {
+      ShortCircuitReplicaInfo info =
+        createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
+      if (info != null) return info;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
+    }
+    BlockReaderPeer curPeer;
+    while (true) {
+      curPeer = nextDomainPeer();
+      if (curPeer == null) break;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      try {
+        ShortCircuitReplicaInfo info = requestFileDescriptors(peer);
+        clientContext.getPeerCache().put(datanode, peer);
+        return info;
+      } catch (IOException e) {
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached socket.
+          // These are considered less serious, because the socket may be stale.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(this + ": closing stale domain peer " + peer, e);
+          }
+          IOUtils.cleanup(LOG, peer);
+        } else {
+          // Handle an I/O error we got when using a newly created socket.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn(this + ": I/O error requesting file descriptors.  " + 
+              "Disabling domain socket " + peer.getDomainSocket(), e);
+          IOUtils.cleanup(LOG, peer);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      }
     }
+    return null;
   }
 
   /**
-   * Create a new short-circuit BlockReader.
-   * 
-   * Here, we ask the DataNode to pass us file descriptors over our
-   * DomainSocket.  If the DataNode declines to do so, we'll return null here;
-   * otherwise, we'll return the BlockReaderLocal.  If the DataNode declines,
-   * this function will inform the DomainSocketFactory that short-circuit local
-   * reads are disabled for this DataNode, so that we don't ask again.
-   * 
-   * @param conf               the configuration.
-   * @param file               the file name. Used in log messages.
-   * @param block              The block object.
-   * @param blockToken         The block token for security.
-   * @param startOffset        The read offset, relative to block head.
-   * @param len                The number of bytes to read, or -1 to read 
-   *                           as many as possible.
-   * @param peer               The peer to use.
-   * @param datanodeID         The datanode that the Peer is connected to.
-   * @param domSockFactory     The DomainSocketFactory to notify if the Peer
-   *                           is a DomainPeer which turns out to be faulty.
-   *                           If null, no factory will be notified in this
-   *                           case.
-   * @param verifyChecksum     True if we should verify the checksums.
-   *                           Note: even if this is true, when
-   *                           DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
-   *                           set or the block is mlocked, we will skip
-   *                           checksums.
-   *
-   * @return                   The BlockReaderLocal, or null if the
-   *                           DataNode declined to provide short-circuit
-   *                           access.
-   * @throws IOException       If there was a communication error.
-   */
-  private static BlockReaderLocal newShortCircuitBlockReader(
-      DFSClient.Conf conf, String file, ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken, long startOffset,
-      long len, Peer peer, DatanodeID datanodeID,
-      DomainSocketFactory domSockFactory, boolean verifyChecksum,
-      FileInputStreamCache fisCache,
-      CachingStrategy cachingStrategy) throws IOException {
+   * Request file descriptors from a DomainPeer.
+   *
+   * @return  A ShortCircuitReplica object if we could communicate with the
+   *          datanode; null, otherwise. 
+   * @throws  IOException If we encountered an I/O exception while communicating
+   *          with the datanode.
+   */
+  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer)
+        throws IOException {
     final DataOutputStream out =
-        new DataOutputStream(new BufferedOutputStream(
-          peer.getOutputStream()));
-    new Sender(out).requestShortCircuitFds(block, blockToken, 1);
-    DataInputStream in =
-        new DataInputStream(peer.getInputStream());
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+    new Sender(out).requestShortCircuitFds(block, token, 1);
+    DataInputStream in = new DataInputStream(peer.getInputStream());
     BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
         PBHelper.vintPrefixed(in));
     DomainSocket sock = peer.getDomainSocket();
     switch (resp.getStatus()) {
     case SUCCESS:
-      BlockReaderLocal reader = null;
       byte buf[] = new byte[1];
       FileInputStream fis[] = new FileInputStream[2];
       sock.recvFileInputStreams(fis, buf, 0, buf.length);
+      ShortCircuitReplica replica = null;
       try {
-        reader = new BlockReaderLocal.Builder(conf).
-            setFilename(file).
-            setBlock(block).
-            setStartOffset(startOffset).
-            setStreams(fis).
-            setDatanodeID(datanodeID).
-            setVerifyChecksum(verifyChecksum).
-            setBlockMetadataHeader(
-                BlockMetadataHeader.preadHeader(fis[1].getChannel())).
-            setFileInputStreamCache(fisCache).
-            setCachingStrategy(cachingStrategy).
-            build();
+        Key key = new Key(block.getBlockId(), block.getBlockPoolId());
+        replica = new ShortCircuitReplica(key, fis[0], fis[1],
+            clientContext.getShortCircuitCache(), Time.monotonicNow());
+      } catch (IOException e) {
+        // This indicates an error reading from disk, or a format error.  Since
+        // it's not a socket communication problem, we return null rather than
+        // throwing an exception.
+        LOG.warn(this + ": error creating ShortCircuitReplica.", e);
+        return null;
       } finally {
-        if (reader == null) {
+        if (replica == null) {
           IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
         }
       }
-      return reader;
+      return new ShortCircuitReplicaInfo(replica);
     case ERROR_UNSUPPORTED:
       if (!resp.hasShortCircuitAccessVersion()) {
-        DFSClient.LOG.warn("short-circuit read access is disabled for " +
-            "DataNode " + datanodeID + ".  reason: " + resp.getMessage());
-        domSockFactory.disableShortCircuitForPath(sock.getPath());
+        LOG.warn("short-circuit read access is disabled for " +
+            "DataNode " + datanode + ".  reason: " + resp.getMessage());
+        clientContext.getDomainSocketFactory()
+            .disableShortCircuitForPath(pathInfo.getPath());
       } else {
-        DFSClient.LOG.warn("short-circuit read access for the file " +
-            file + " is disabled for DataNode " + datanodeID +
+        LOG.warn("short-circuit read access for the file " +
+            fileName + " is disabled for DataNode " + datanode +
             ".  reason: " + resp.getMessage());
       }
       return null;
     case ERROR_ACCESS_TOKEN:
       String msg = "access control error while " +
           "attempting to set up short-circuit access to " +
-          file + resp.getMessage();
-      DFSClient.LOG.debug(msg);
-      throw new InvalidBlockTokenException(msg);
+          fileName + resp.getMessage();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(this + ":" + msg);
+      }
+      return new ShortCircuitReplicaInfo(new InvalidToken(msg));
     default:
-      DFSClient.LOG.warn("error while attempting to set up short-circuit " +
-          "access to " + file + ": " + resp.getMessage());
-      domSockFactory.disableShortCircuitForPath(sock.getPath());
+      LOG.warn(this + ": unknown response code " + resp.getStatus() + " while " +
+          "attempting to set up short-circuit access. " + resp.getMessage());
+      clientContext.getDomainSocketFactory()
+          .disableShortCircuitForPath(pathInfo.getPath());
       return null;
     }
   }
 
   /**
+   * Get a RemoteBlockReader that communicates over a UNIX domain socket.
+   *
+   * @return The new BlockReader, or null if we failed to create the block
+   * reader.
+   *
+   * @throws InvalidToken    If the block token was invalid.
+   * Potentially other security-related exceptions.
+   */
+  private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory().
+                      getPathInfo(inetSocketAddress, conf);
+    }
+    if (!pathInfo.getPathState().getUsableForDataTransfer()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": not trying to create a remote block reader " +
+            "because the UNIX domain socket at " + pathInfo +
+            " is not usable.");
+      }
+      return null;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create a remote block reader from the " +
+          "UNIX domain socket at " + pathInfo.getPath());
+    }
+
+    while (true) {
+      BlockReaderPeer curPeer = nextDomainPeer();
+      if (curPeer == null) break;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      BlockReader blockReader = null;
+      try {
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        IOUtils.cleanup(LOG, peer);
+        if (isSecurityException(ioe)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": got security exception while constructing " +
+                "a remote block reader from the unix domain socket at " +
+                pathInfo.getPath(), ioe);
+          }
+          throw ioe;
+        }
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious, because the underlying socket may be stale.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closed potentially stale domain peer " + peer, ioe);
+          }
+        } else {
+          // Handle an I/O error we got when using a newly created domain peer.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn("I/O error constructing remote block reader.  Disabling " +
+              "domain socket " + peer.getDomainSocket(), ioe);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtils.cleanup(LOG, peer);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a RemoteBlockReader that communicates over a TCP socket.
+   *
+   * @return The new BlockReader.  We will not return null, but instead throw
+   *         an exception if this fails.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create a remote block reader from a " +
+          "TCP socket");
+    }
+    BlockReader blockReader = null;
+    while (true) {
+      BlockReaderPeer curPeer = null;
+      Peer peer = null;
+      try {
+        curPeer = nextTcpPeer();
+        if (curPeer == null) break;
+        peer = curPeer.peer;
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        if (isSecurityException(ioe)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": got security exception while constructing " +
+                "a remote block reader from " + peer, ioe);
+          }
+          throw ioe;
+        }
+        if ((curPeer != null) && curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious, because the underlying socket may be
+          // stale.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closed potentially stale remote peer " + peer, ioe);
+          }
+        } else {
+          // Handle an I/O error we got when using a newly created peer.
+          LOG.warn("I/O error constructing remote block reader.", ioe);
+          throw ioe;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtils.cleanup(LOG, peer);
+        }
+      }
+    }
+    return null;
+  }
+
+  private class BlockReaderPeer {
+    final Peer peer;
+    final boolean fromCache;
+    
+    BlockReaderPeer(Peer peer, boolean fromCache) {
+      this.peer = peer;
+      this.fromCache = fromCache;
+    }
+  }
+
+  /**
+   * Get the next DomainPeer-- either from the cache or by creating it.
+   *
+   * @return the next DomainPeer, or null if we could not construct one.
+   */
+  private BlockReaderPeer nextDomainPeer() {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, true);
+      if (peer != null) {
+        remainingCacheTries--;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("nextDomainPeer: reusing existing peer " + peer);
+        }
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    DomainSocket sock = clientContext.getDomainSocketFactory().
+        createSocket(pathInfo, conf.socketTimeout);
+    if (sock == null) return null;
+    return new BlockReaderPeer(new DomainPeer(sock), false);
+  }
+
+  /**
+   * Get the next TCP-based peer-- either from the cache or by creating it.
+   *
+   * @return the next Peer, or null if we could not construct one.
+   *
+   * @throws IOException  If there was an error while constructing the peer
+   *                      (such as an InvalidEncryptionKeyException)
+   */
+  private BlockReaderPeer nextTcpPeer() throws IOException {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, false);
+      if (peer != null) {
+        remainingCacheTries--;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("nextTcpPeer: reusing existing peer " + peer);
+        }
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    try {
+      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
+      }
+      return new BlockReaderPeer(peer, false);
+    } catch (IOException e) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
+                  "connected to " + datanode);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Determine if an exception is security-related.
+   *
+   * We need to handle these exceptions differently than other IOExceptions.
+   * They don't indicate a communication problem.  Instead, they mean that there
+   * is some action the client needs to take, such as refetching block tokens,
+   * renewing encryption keys, etc.
+   *
+   * @param ioe    The exception
+   * @return       True only if the exception is security-related.
+   */
+  private static boolean isSecurityException(IOException ioe) {
+    return (ioe instanceof InvalidToken) ||
+            (ioe instanceof InvalidEncryptionKeyException) ||
+            (ioe instanceof InvalidBlockTokenException) ||
+            (ioe instanceof AccessControlException);
+  }
+
+  @SuppressWarnings("deprecation")
+  private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+    if (conf.useLegacyBlockReader) {
+      return RemoteBlockReader.newBlockReader(fileName,
+          block, token, startOffset, length, conf.ioBufferSize,
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy);
+    } else {
+      return RemoteBlockReader2.newBlockReader(
+          fileName, block, token, startOffset, length,
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
+  }
+
+  /**
    * File name to print when accessing a block directly (from servlets)
    * @param s Address of the block location
    * @param poolId Block pool ID of the block
@@ -246,23 +778,4 @@ public class BlockReaderFactory {
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
-
-  /**
-   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
-   * This block reader implements the path-based style of local reads
-   * first introduced in HDFS-2246.
-   */
-  static BlockReader getLegacyBlockReaderLocal(DFSClient dfsClient,
-      String src, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
-      long offsetIntoBlock) throws InvalidToken, IOException {
-    try {
-      final long length = blk.getNumBytes() - offsetIntoBlock;
-      return BlockReaderLocalLegacy.newBlockReader(dfsClient, src, blk,
-          accessToken, chosenNode, offsetIntoBlock, length);
-    } catch (RemoteException re) {
-      throw re.unwrapRemoteException(InvalidToken.class,
-          AccessControlException.class);
-    }
-  }
 }
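
Taken together, the fluent setters above replace the old long static
newBlockReader() parameter list.  A sketch of a typical caller under the new
API (identifiers such as src, offset, len, ugi, and peerFactory are
placeholders for values the caller already has in scope):

    BlockReader reader = new BlockReaderFactory(conf).
        setFileName(src).
        setBlock(block).
        setBlockToken(token).
        setStartOffset(offset).
        setLength(len).                          // -1 means no limit
        setVerifyChecksum(true).
        setClientName(clientName).
        setDatanodeInfo(datanode).
        setInetSocketAddress(targetAddr).
        setAllowShortCircuitLocalReads(true).
        setClientCacheContext(clientContext).    // supplies the shared caches
        setCachingStrategy(cachingStrategy).
        setUserGroupInformation(ugi).            // only used for legacy local reads
        setConfiguration(configuration).         // likewise
        setRemotePeerFactory(peerFactory).
        build();  // tries short-circuit, then domain-socket, then TCP reads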

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Wed Feb 12 19:08:52 2014
@@ -28,8 +28,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
@@ -67,12 +68,10 @@ class BlockReaderLocal implements BlockR
     private boolean verifyChecksum;
     private int maxReadahead;
     private String filename;
-    private FileInputStream streams[];
+    private ShortCircuitReplica replica;
     private long dataPos;
     private DatanodeID datanodeID;
-    private FileInputStreamCache fisCache;
     private boolean mlocked;
-    private BlockMetadataHeader header;
     private ExtendedBlock block;
 
     public Builder(Conf conf) {
@@ -99,8 +98,8 @@ class BlockReaderLocal implements BlockR
       return this;
     }
 
-    public Builder setStreams(FileInputStream streams[]) {
-      this.streams = streams;
+    public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
+      this.replica = replica;
       return this;
     }
 
@@ -114,30 +113,18 @@ class BlockReaderLocal implements BlockR
       return this;
     }
 
-    public Builder setFileInputStreamCache(FileInputStreamCache fisCache) {
-      this.fisCache = fisCache;
-      return this;
-    }
-
     public Builder setMlocked(boolean mlocked) {
       this.mlocked = mlocked;
       return this;
     }
 
-    public Builder setBlockMetadataHeader(BlockMetadataHeader header) {
-      this.header = header;
-      return this;
-    }
-
     public Builder setBlock(ExtendedBlock block) {
       this.block = block;
       return this;
     }
 
     public BlockReaderLocal build() {
-      Preconditions.checkNotNull(streams);
-      Preconditions.checkArgument(streams.length == 2);
-      Preconditions.checkNotNull(header);
+      Preconditions.checkNotNull(replica);
       return new BlockReaderLocal(this);
     }
   }
@@ -147,7 +134,7 @@ class BlockReaderLocal implements BlockR
   /**
    * Pair of streams for this block.
    */
-  private final FileInputStream streams[];
+  private final ShortCircuitReplica replica;
 
   /**
    * The data FileChannel.
@@ -208,12 +195,6 @@ class BlockReaderLocal implements BlockR
   private int checksumSize;
 
   /**
-   * FileInputStream cache to return the streams to upon closing,
-   * or null if we should just close them unconditionally.
-   */
-  private final FileInputStreamCache fisCache;
-
-  /**
    * Maximum number of chunks to allocate.
    *
    * This is used to allocate dataBuf and checksumBuf, in the event that
@@ -257,20 +238,18 @@ class BlockReaderLocal implements BlockR
    */
   private ByteBuffer checksumBuf;
 
-  private boolean mmapDisabled = false;
-
   private BlockReaderLocal(Builder builder) {
-    this.streams = builder.streams;
-    this.dataIn = builder.streams[0].getChannel();
+    this.replica = builder.replica;
+    this.dataIn = replica.getDataStream().getChannel();
     this.dataPos = builder.dataPos;
-    this.checksumIn = builder.streams[1].getChannel();
-    this.checksum = builder.header.getChecksum();
+    this.checksumIn = replica.getMetaStream().getChannel();
+    BlockMetadataHeader header = builder.replica.getMetaHeader();
+    this.checksum = header.getChecksum();
     this.verifyChecksum = builder.verifyChecksum &&
         (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
     this.mlocked = new AtomicBoolean(builder.mlocked);
     this.filename = builder.filename;
     this.datanodeID = builder.datanodeID;
-    this.fisCache = builder.fisCache;
     this.block = builder.block;
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
     this.checksumSize = checksum.getChecksumSize();
@@ -642,20 +621,7 @@ class BlockReaderLocal implements BlockR
     if (LOG.isTraceEnabled()) {
       LOG.trace("close(filename=" + filename + ", block=" + block + ")");
     }
-    if (clientMmap != null) {
-      clientMmap.unref();
-      clientMmap = null;
-    }
-    if (fisCache != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("putting FileInputStream for " + filename +
-            " back into FileInputStreamCache");
-      }
-      fisCache.put(datanodeID, block, streams);
-    } else {
-      LOG.debug("closing FileInputStream for " + filename);
-      IOUtils.cleanup(LOG, dataIn, checksumIn);
-    }
+    replica.unref();
     freeDataBufIfExists();
     freeChecksumBufIfExists();
   }
@@ -683,8 +649,7 @@ class BlockReaderLocal implements BlockR
   }
 
   @Override
-  public synchronized ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     if ((!opts.contains(ReadOption.SKIP_CHECKSUMS)) &&
           verifyChecksum && (!mlocked.get())) {
       if (LOG.isTraceEnabled()) {
@@ -694,27 +659,7 @@ class BlockReaderLocal implements BlockR
       }
       return null;
     }
-    if (clientMmap == null) {
-      if (mmapDisabled) {
-        return null;
-      }
-      try {
-        clientMmap = mmapManager.fetch(datanodeID, block, streams[0]);
-        if (clientMmap == null) {
-          mmapDisabled = true;
-          return null;
-        }
-      } catch (InterruptedException e) {
-        LOG.error("Interrupted while setting up mmap for " + filename, e);
-        Thread.currentThread().interrupt();
-        return null;
-      } catch (IOException e) {
-        LOG.error("unable to set up mmap for " + filename, e);
-        mmapDisabled = true;
-        return null;
-      }
-    }
-    return clientMmap;
+    return replica.getOrCreateClientMmap();
   }
 
   /**
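
The reader now borrows its streams from a reference-counted
ShortCircuitReplica instead of owning a raw FileInputStream pair, so close()
simply drops its reference and the ShortCircuitCache decides when the
descriptors are actually released.  A sketch of that lifecycle, using only
names introduced by this patch (cache, key, and creator are placeholders):

    import java.nio.channels.FileChannel;
    import org.apache.hadoop.util.DataChecksum;

    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, creator);
    ShortCircuitReplica replica = info.getReplica();
    if (replica != null) {
      FileChannel dataIn = replica.getDataStream().getChannel();
      FileChannel checksumIn = replica.getMetaStream().getChannel();
      DataChecksum checksum = replica.getMetaHeader().getChecksum();
      try {
        // ... read block data, verifying chunks against checksumIn ...
      } finally {
        replica.unref();  // the cache closes the descriptors once unreferenced
      }
    }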

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Wed Feb 12 19:08:52 2014
@@ -31,7 +31,6 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -175,19 +174,21 @@ class BlockReaderLocalLegacy implements 
   /**
    * The only way this object can be instantiated.
    */
-  static BlockReaderLocalLegacy newBlockReader(DFSClient dfsClient,
-      String file, ExtendedBlock blk, Token<BlockTokenIdentifier> token,
-      DatanodeInfo node, long startOffset, long length)
-      throws IOException {
-    final DFSClient.Conf conf = dfsClient.getConf();
-
+  static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
+      UserGroupInformation userGroupInformation,
+      Configuration configuration, String file, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> token, DatanodeInfo node, 
+      long startOffset, long length) throws IOException {
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
         .getIpcPort());
     // check the cache first
     BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
     if (pathinfo == null) {
-      pathinfo = getBlockPathInfo(dfsClient.ugi, blk, node,
-          dfsClient.getConfiguration(), dfsClient.getHdfsTimeout(), token,
+      if (userGroupInformation == null) {
+        userGroupInformation = UserGroupInformation.getCurrentUser();
+      }
+      pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
+          configuration, conf.hdfsTimeout, token,
           conf.connectToDnViaHostname);
     }
 
@@ -708,8 +709,7 @@ class BlockReaderLocalLegacy implements 
   }
 
   @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
 }
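
The legacy reader no longer needs an entire DFSClient; callers now pass only
the pieces it actually uses.  A sketch of the new call (every argument except
the UserGroupInformation lookup is a placeholder):

    BlockReaderLocalLegacy reader = BlockReaderLocalLegacy.newBlockReader(
        conf,                                   // DFSClient.Conf
        UserGroupInformation.getCurrentUser(),  // may be null; the method then
                                                // falls back to the current user
        configuration,                          // raw Configuration for the DN RPC
        file, block, token, datanode,
        startOffset, length);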

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java?rev=1567720&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java Wed Feb 12 19:08:52 2014
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * ClientContext contains context information for a client.
+ * 
+ * This allows us to share caches such as the socket cache across
+ * DFSClient instances.
+ */
+@InterfaceAudience.Private
+public class ClientContext {
+  private static final Log LOG = LogFactory.getLog(ClientContext.class);
+
+  /**
+   * Global map of context names to cached contexts.
+   */
+  private final static HashMap<String, ClientContext> CACHES =
+      new HashMap<String, ClientContext>();
+
+  /**
+   * Name of context.
+   */
+  private final String name;
+
+  /**
+   * String representation of the configuration.
+   */
+  private final String confString;
+
+  /**
+   * Caches short-circuit file descriptors and mmap regions.
+   */
+  private final ShortCircuitCache shortCircuitCache;
+
+  /**
+   * Caches TCP and UNIX domain sockets for reuse.
+   */
+  private final PeerCache peerCache;
+
+  /**
+   * Stores information about socket paths.
+   */
+  private final DomainSocketFactory domainSocketFactory;
+
+  /**
+   * True if we should use the legacy BlockReaderLocal.
+   */
+  private final boolean useLegacyBlockReaderLocal;
+
+  /**
+   * True if the legacy BlockReaderLocal is disabled.
+   *
+   * The legacy block reader local gets disabled completely whenever there is an
+   * error or miscommunication.  The new block reader local code handles this
+   * case more gracefully inside DomainSocketFactory.
+   */
+  private volatile boolean disableLegacyBlockReaderLocal = false;
+
+  /**
+   * Whether or not we have already warned about a DFSClient fetching a
+   * ClientContext that did not match its config values.
+   */
+  private boolean printedConfWarning = false;
+
+  private ClientContext(String name, Conf conf) {
+    this.name = name;
+    this.confString = confAsString(conf);
+    this.shortCircuitCache = new ShortCircuitCache(
+        conf.shortCircuitStreamsCacheSize,
+        conf.shortCircuitStreamsCacheExpiryMs,
+        conf.shortCircuitMmapCacheSize,
+        conf.shortCircuitMmapCacheExpiryMs,
+        conf.shortCircuitMmapCacheRetryTimeout,
+        conf.shortCircuitCacheStaleThresholdMs);
+    this.peerCache =
+          new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
+    this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
+    this.domainSocketFactory = new DomainSocketFactory(conf);
+  }
+
+  public static String confAsString(Conf conf) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("shortCircuitStreamsCacheSize = ").
+      append(conf.shortCircuitStreamsCacheSize).
+      append(", shortCircuitStreamsCacheExpiryMs = ").
+      append(conf.shortCircuitStreamsCacheExpiryMs).
+      append(", shortCircuitMmapCacheSize = ").
+      append(conf.shortCircuitMmapCacheSize).
+      append(", shortCircuitMmapCacheExpiryMs = ").
+      append(conf.shortCircuitMmapCacheExpiryMs).
+      append(", shortCircuitMmapCacheRetryTimeout = ").
+      append(conf.shortCircuitMmapCacheRetryTimeout).
+      append(", shortCircuitCacheStaleThresholdMs = ").
+      append(conf.shortCircuitCacheStaleThresholdMs).
+      append(", socketCacheCapacity = ").
+      append(conf.socketCacheCapacity).
+      append(", socketCacheExpiry = ").
+      append(conf.socketCacheExpiry).
+      append(", shortCircuitLocalReads = ").
+      append(conf.shortCircuitLocalReads).
+      append(", useLegacyBlockReaderLocal = ").
+      append(conf.useLegacyBlockReaderLocal).
+      append(", domainSocketDataTraffic = ").
+      append(conf.domainSocketDataTraffic);
+
+    return builder.toString();
+  }
+
+  public static ClientContext get(String name, Conf conf) {
+    ClientContext context;
+    synchronized(ClientContext.class) {
+      context = CACHES.get(name);
+      if (context == null) {
+        context = new ClientContext(name, conf);
+        CACHES.put(name, context);
+      } else {
+        context.printConfWarningIfNeeded(conf);
+      }
+    }
+    return context;
+  }
+
+  /**
+   * Get a client context, from a Configuration object.
+   *
+   * This method is less efficient than the version that takes a DFSClient#Conf
+   * object, and is intended mainly for use in tests.
+   */
+  @VisibleForTesting
+  public static ClientContext getFromConf(Configuration conf) {
+    return get(conf.get(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+        DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
+            new DFSClient.Conf(conf));
+  }
+
+  private void printConfWarningIfNeeded(Conf conf) {
+    String existing = this.getConfString();
+    String requested = confAsString(conf);
+    if (!existing.equals(requested)) {
+      if (!printedConfWarning) {
+        printedConfWarning = true;
+        LOG.warn("Existing client context '" + name + "' does not match " +
+            "requested configuration.  Existing: " + existing + 
+            ", Requested: " + requested);
+      }
+    }
+  }
+
+  public String getConfString() {
+    return confString;
+  }
+
+  public ShortCircuitCache getShortCircuitCache() {
+    return shortCircuitCache;
+  }
+
+  public PeerCache getPeerCache() {
+    return peerCache;
+  }
+
+  public boolean getUseLegacyBlockReaderLocal() {
+    return useLegacyBlockReaderLocal;
+  }
+
+  public boolean getDisableLegacyBlockReaderLocal() {
+    return disableLegacyBlockReaderLocal;
+  }
+
+  public void setDisableLegacyBlockReaderLocal() {
+    disableLegacyBlockReaderLocal = true;
+  }
+
+  public DomainSocketFactory getDomainSocketFactory() {
+    return domainSocketFactory;
+  }
+}
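
A rough illustration of the sharing behavior this class introduces (a sketch using the @VisibleForTesting getFromConf entry point shown above; class name hypothetical, not part of this patch): two Configurations naming the same dfs.client.context resolve to the same ClientContext instance, and therefore share one ShortCircuitCache and one PeerCache.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.ClientContext;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class SharedContextSketch {
      public static void main(String[] args) {
        Configuration confA = new Configuration();
        confA.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "my-context");
        Configuration confB = new Configuration();
        confB.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "my-context");

        ClientContext a = ClientContext.getFromConf(confA);
        ClientContext b = ClientContext.getFromConf(confB);
        // Same context name -> same instance -> shared caches.
        System.out.println(a == b);                                             // true
        System.out.println(a.getShortCircuitCache() == b.getShortCircuitCache()); // true
      }
    }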

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Feb 12 19:08:52 2014
@@ -56,6 +56,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
 
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -108,9 +110,10 @@ import org.apache.hadoop.fs.RemoteIterat
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
@@ -189,7 +192,7 @@ import com.google.common.net.InetAddress
  *
  ********************************************************/
 @InterfaceAudience.Private
-public class DFSClient implements java.io.Closeable {
+public class DFSClient implements java.io.Closeable, RemotePeerFactory {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
   static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
@@ -210,50 +213,13 @@ public class DFSClient implements java.i
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   private final String authority;
-  final PeerCache peerCache;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
-  private boolean shouldUseLegacyBlockReaderLocal;
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
-  private ClientMmapManager mmapManager;
+  private final ClientContext clientContext;
   
-  private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
-      new ClientMmapManagerFactory();
-
-  private static final class ClientMmapManagerFactory {
-    private ClientMmapManager mmapManager = null;
-    /**
-     * Tracks the number of users of mmapManager.
-     */
-    private int refcnt = 0;
-
-    synchronized ClientMmapManager get(Configuration conf) {
-      if (refcnt++ == 0) {
-        mmapManager = ClientMmapManager.fromConf(conf);
-      } else {
-        String mismatches = mmapManager.verifyConfigurationMatches(conf);
-        if (!mismatches.isEmpty()) {
-          LOG.warn("The ClientMmapManager settings you specified " +
-            "have been ignored because another thread created the " +
-            "ClientMmapManager first.  " + mismatches);
-        }
-      }
-      return mmapManager;
-    }
-    
-    synchronized void unref(ClientMmapManager mmapManager) {
-      if (this.mmapManager != mmapManager) {
-        throw new IllegalArgumentException();
-      }
-      if (--refcnt == 0) {
-        IOUtils.cleanup(LOG, mmapManager);
-        mmapManager = null;
-      }
-    }
-  }
-
   /**
    * DFSClient configuration 
    */
@@ -298,6 +264,11 @@ public class DFSClient implements java.i
     final boolean domainSocketDataTraffic;
     final int shortCircuitStreamsCacheSize;
     final long shortCircuitStreamsCacheExpiryMs; 
+    
+    final int shortCircuitMmapCacheSize;
+    final long shortCircuitMmapCacheExpiryMs;
+    final long shortCircuitMmapCacheRetryTimeout;
+    final long shortCircuitCacheStaleThresholdMs;
 
     public Conf(Configuration conf) {
       // The hdfsTimeout is currently the same as the ipc timeout 
@@ -414,6 +385,18 @@ public class DFSClient implements java.i
       shortCircuitStreamsCacheExpiryMs = conf.getLong(
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
+      shortCircuitMmapCacheSize = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
+          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
+      shortCircuitMmapCacheExpiryMs = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
+          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
+      shortCircuitMmapCacheRetryTimeout = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
+          DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT);
+      shortCircuitCacheStaleThresholdMs = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
+          DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
     }
 
     private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -462,11 +445,11 @@ public class DFSClient implements java.i
   public Conf getConf() {
     return dfsClientConf;
   }
-  
+
   Configuration getConfiguration() {
     return conf;
   }
-  
+
   /**
    * A map from file names to {@link DFSOutputStream} objects
    * that are currently being written by this client.
@@ -475,8 +458,6 @@ public class DFSClient implements java.i
   private final Map<String, DFSOutputStream> filesBeingWritten
       = new HashMap<String, DFSOutputStream>();
 
-  private final DomainSocketFactory domainSocketFactory;
-  
   /**
    * Same as this(NameNode.getAddress(conf), conf);
    * @see #DFSClient(InetSocketAddress, Configuration)
@@ -524,8 +505,6 @@ public class DFSClient implements java.i
     throws IOException {
     // Copy only the required DFSClient configuration
     this.dfsClientConf = new Conf(conf);
-    this.shouldUseLegacyBlockReaderLocal = 
-        this.dfsClientConf.useLegacyBlockReaderLocal;
     if (this.dfsClientConf.useLegacyBlockReaderLocal) {
       LOG.debug("Using legacy short-circuit local reads.");
     }
@@ -570,9 +549,6 @@ public class DFSClient implements java.i
       this.namenode = proxyInfo.getProxy();
     }
 
-    // read directly from the block file if configured.
-    this.domainSocketFactory = new DomainSocketFactory(dfsClientConf);
-
     String localInterfaces[] =
       conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
     localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
@@ -582,7 +558,6 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
     
-    this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
     Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
         null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
     Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
@@ -593,7 +568,9 @@ public class DFSClient implements java.i
         new CachingStrategy(readDropBehind, readahead);
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
-    this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
+    this.clientContext = ClientContext.get(
+        conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
+        dfsClientConf);
   }
   
   /**
@@ -798,10 +775,6 @@ public class DFSClient implements java.i
   
   /** Abort and release resources held.  Ignore all errors. */
   void abort() {
-    if (mmapManager != null) {
-      MMAP_MANAGER_FACTORY.unref(mmapManager);
-      mmapManager = null;
-    }
     clientRunning = false;
     closeAllFilesBeingWritten(true);
     try {
@@ -847,10 +820,6 @@ public class DFSClient implements java.i
    */
   @Override
   public synchronized void close() throws IOException {
-    if (mmapManager != null) {
-      MMAP_MANAGER_FACTORY.unref(mmapManager);
-      mmapManager = null;
-    }
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
       clientRunning = false;
@@ -2620,18 +2589,6 @@ public class DFSClient implements java.i
         + ", ugi=" + ugi + "]"; 
   }
 
-  public DomainSocketFactory getDomainSocketFactory() {
-    return domainSocketFactory;
-  }
-
-  public void disableLegacyBlockReaderLocal() {
-    shouldUseLegacyBlockReaderLocal = false;
-  }
-
-  public boolean useLegacyBlockReaderLocal() {
-    return shouldUseLegacyBlockReaderLocal;
-  }
-
   public CachingStrategy getDefaultReadCachingStrategy() {
     return defaultReadCachingStrategy;
   }
@@ -2640,8 +2597,29 @@ public class DFSClient implements java.i
     return defaultWriteCachingStrategy;
   }
 
-  @VisibleForTesting
-  public ClientMmapManager getMmapManager() {
-    return mmapManager;
+  public ClientContext getClientContext() {
+    return clientContext;
+  }
+
+  @Override // RemotePeerFactory
+  public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    Socket sock = null;
+    try {
+      sock = socketFactory.createSocket();
+      NetUtils.connect(sock, addr,
+        getRandomLocalInterfaceAddr(),
+        dfsClientConf.socketTimeout);
+      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
+          getDataEncryptionKey());
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        IOUtils.cleanup(LOG, peer);
+        IOUtils.closeSocket(sock);
+      }
+    }
   }
 }
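
The newConnectedPeer implementation above relies on a success-flag cleanup idiom: nothing is released on the normal return path, and the finally block tears down whatever was partially acquired only when the method failed before setting the flag. A generic standalone sketch of the same pattern (hypothetical names, not HDFS API):

    import java.io.Closeable;
    import java.io.IOException;
    import java.net.Socket;

    class CleanupOnFailureSketch {
      static Closeable connect(String host, int port) throws IOException {
        boolean success = false;
        Socket sock = null;
        try {
          sock = new Socket(host, port);    // may throw
          Closeable wrapped = wrap(sock);   // stand-in for peerFromSocketAndKey; may throw
          success = true;                   // only set once fully wired up
          return wrapped;
        } finally {
          if (!success && sock != null) {
            sock.close();                   // undo the partial acquisition
          }
        }
      }
      // Hypothetical stand-in for TcpPeerServer.peerFromSocketAndKey(...)
      static Closeable wrap(Socket s) { return s; }
    }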

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Feb 12 19:08:52 2014
@@ -59,6 +59,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = "dfs.client.cache.drop.behind.writes";
   public static final String  DFS_CLIENT_CACHE_DROP_BEHIND_READS = "dfs.client.cache.drop.behind.reads";
   public static final String  DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead";
+  public static final String  DFS_CLIENT_CONTEXT = "dfs.client.context";
+  public static final String  DFS_CLIENT_CONTEXT_DEFAULT = "default";
   public static final String  DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
   public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
   public static final String  DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
@@ -416,18 +418,20 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
-  public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 100;
+  public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 256;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
-  public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5000;
+  public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5 * 60 * 1000;
   public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
   public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
   public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
   public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
-  public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024;
+  public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 256;
   public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
-  public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT  = 15 * 60 * 1000;
-  public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.thread.runs.per.timeout";
-  public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT  = 4;
+  public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT  = 60 * 60 * 1000;
+  public static final String DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS = "dfs.client.mmap.retry.timeout.ms";
+  public static final long DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000;
+  public static final String DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS = "dfs.client.short.circuit.replica.stale.threshold.ms";
+  public static final long DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT = 30 * 60 * 1000;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
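
For reference, a hedged example of setting the keys added or re-defaulted by this commit (values here are illustrative, not tuning recommendations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "shared-ctx");    // new key (default "default")
    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 256);  // default now 256 (was 1024)
    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
        60 * 60 * 1000L);                                        // default now 1 hour (was 15 min)
    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
        5 * 60 * 1000L);                                         // new key
    conf.setLong(DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
        30 * 60 * 1000L);                                        // new key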


