hadoop-hdfs-commits mailing list archives

From: cmcc...@apache.org
Subject: svn commit: r1567728 [1/2] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project: ./ hadoop-hdfs/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ hadoop-hdfs/src/...
Date: Wed, 12 Feb 2014 19:24:47 GMT
Author: cmccabe
Date: Wed Feb 12 19:24:46 2014
New Revision: 1567728

URL: http://svn.apache.org/r1567728
Log:
HDFS-5810. Unify mmap cache and short-circuit file descriptor cache (cmccabe)
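
The heart of this change is the new ShortCircuitCache, which folds the old
FileInputStreamCache (cached file descriptors) and ClientMmapManager (cached
memory mappings) into a single cache keyed by block ID and block pool ID.
The lookup pattern, visible in the BlockReaderFactory diff below, is
fetchOrCreate(key, creator): on a miss the cache calls back into the supplied
ShortCircuitReplicaCreator to fetch descriptors from the DataNode.  What
follows is a minimal, self-contained sketch of that fetch-or-create shape
only; every name except fetchOrCreate is hypothetical, and the real class
additionally handles reference counting, eviction, and mmap lifetimes.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch of the fetch-or-create pattern that
    // ShortCircuitCache#fetchOrCreate uses in this patch.  Hypothetical
    // names; the real cache also ref-counts and evicts its replicas.
    class FetchOrCreateCacheSketch<K, V> {
      interface Creator<V> {
        V create();           // called on a cache miss; may return null on failure
      }

      private final Map<K, V> entries = new HashMap<K, V>();

      synchronized V fetchOrCreate(K key, Creator<V> creator) {
        V value = entries.get(key);
        if (value != null) {
          return value;       // hit: hand back the shared, cached entry
        }
        value = creator.create();
        if (value != null) {
          entries.put(key, value);
        }
        return value;
      }
    }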

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
      - copied unchanged from r1567720, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
      - copied unchanged from r1567720, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
      - copied unchanged from r1567720, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
      - copied unchanged from r1567720, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplicaInfo.java
      - copied unchanged from r1567720, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplicaInfo.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
      - copied unchanged from r1567720, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java
      - copied unchanged from r1567720, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java
Removed:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project:r1567720

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1567720

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Feb 12 19:24:46 2014
@@ -30,6 +30,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5929. Add blockpool % usage to HDFS federated nn page.
     (Siqi Li via suresh)
 
+    HDFS-5810. Unify mmap cache and short-circuit file descriptor cache
+    (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1567720

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Wed Feb 12 19:24:46 2014
@@ -23,7 +23,6 @@ import java.util.EnumSet;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
@@ -97,6 +96,5 @@ public interface BlockReader extends Byt
    * @return              The ClientMmap object, or null if mmap is not
    *                      supported.
    */
-  ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager);
+  ClientMmap getClientMmap(EnumSet<ReadOption> opts);
 }
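
With ClientMmapManager gone, the interface no longer threads a manager
through every call; each reader now owns its mmap source.  Implementations
that cannot memory-map simply return null, as the remote readers and
BlockReaderLocalLegacy do later in this patch.  A minimal hedged sketch of
that end of the contract (illustrative class name):

    import java.util.EnumSet;

    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.hdfs.client.ClientMmap;

    // Sketch: the simplified contract for readers with no mmap support,
    // mirroring what BlockReaderLocalLegacy returns in this patch.
    class NoMmapReaderSketch {
      public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
        return null;          // mmap not supported by this reader
      }
    }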

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Wed Feb 12 19:24:46 2014
@@ -24,218 +24,750 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 
 /** 
  * Utility class to create BlockReader implementations.
  */
 @InterfaceAudience.Private
-public class BlockReaderFactory {
+public class BlockReaderFactory implements ShortCircuitReplicaCreator {
+  static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
+
+  @VisibleForTesting
+  static ShortCircuitReplicaCreator
+      createShortCircuitReplicaInfoCallback = null;
+
+  private final DFSClient.Conf conf;
+
+  /**
+   * The file name, for logging and debugging purposes.
+   */
+  private String fileName;
+
+  /**
+   * The block ID and block pool ID to use.
+   */
+  private ExtendedBlock block;
+
+  /**
+   * The block token to use for security purposes.
+   */
+  private Token<BlockTokenIdentifier> token;
+
+  /**
+   * The offset within the block to start reading at.
+   */
+  private long startOffset;
+
+  /**
+   * If false, we won't try to verify the block checksum.
+   */
+  private boolean verifyChecksum;
+
+  /**
+   * The name of this client.
+   */
+  private String clientName; 
+
+  /**
+   * The DataNode we're talking to.
+   */
+  private DatanodeInfo datanode;
+
+  /**
+   * If false, we won't try short-circuit local reads.
+   */
+  private boolean allowShortCircuitLocalReads;
+
+  /**
+   * The ClientContext to use for things like the PeerCache.
+   */
+  private ClientContext clientContext;
+
+  /**
+   * Number of bytes to read.  -1 indicates no limit.
+   */
+  private long length = -1;
+
+  /**
+   * Caching strategy to use when reading the block.
+   */
+  private CachingStrategy cachingStrategy;
+
+  /**
+   * Socket address to use to connect to peer.
+   */
+  private InetSocketAddress inetSocketAddress;
+
+  /**
+   * Remote peer factory to use to create a peer, if needed.
+   */
+  private RemotePeerFactory remotePeerFactory;
+
+  /**
+   * UserGroupInformation to use for legacy block reader local objects, if needed.
+   */
+  private UserGroupInformation userGroupInformation;
+
+  /**
+   * Configuration to use for legacy block reader local objects, if needed.
+   */
+  private Configuration configuration;
+
+  /**
+   * Information about the domain socket path we should use to connect to the
+   * local peer-- or null if we haven't examined the local domain socket.
+   */
+  private DomainSocketFactory.PathInfo pathInfo;
+
+  /**
+   * The remaining number of times that we'll try to pull a socket out of the
+   * cache.
+   */
+  private int remainingCacheTries;
+
+  public BlockReaderFactory(DFSClient.Conf conf) {
+    this.conf = conf;
+    this.remainingCacheTries = conf.nCachedConnRetry;
+  }
+
+  public BlockReaderFactory setFileName(String fileName) {
+    this.fileName = fileName;
+    return this;
+  }
+
+  public BlockReaderFactory setBlock(ExtendedBlock block) {
+    this.block = block;
+    return this;
+  }
+
+  public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
+    this.token = token;
+    return this;
+  }
+
+  public BlockReaderFactory setStartOffset(long startOffset) {
+    this.startOffset = startOffset;
+    return this;
+  }
+
+  public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
+    this.verifyChecksum = verifyChecksum;
+    return this;
+  }
+
+  public BlockReaderFactory setClientName(String clientName) {
+    this.clientName = clientName;
+    return this;
+  }
+
+  public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
+    this.datanode = datanode;
+    return this;
+  }
+
+  public BlockReaderFactory setAllowShortCircuitLocalReads(
+      boolean allowShortCircuitLocalReads) {
+    this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
+    return this;
+  }
+
+  public BlockReaderFactory setClientCacheContext(
+      ClientContext clientContext) {
+    this.clientContext = clientContext;
+    return this;
+  }
+
+  public BlockReaderFactory setLength(long length) {
+    this.length = length;
+    return this;
+  }
+
+  public BlockReaderFactory setCachingStrategy(
+      CachingStrategy cachingStrategy) {
+    this.cachingStrategy = cachingStrategy;
+    return this;
+  }
+
+  public BlockReaderFactory setInetSocketAddress (
+      InetSocketAddress inetSocketAddress) {
+    this.inetSocketAddress = inetSocketAddress;
+    return this;
+  }
+
+  public BlockReaderFactory setUserGroupInformation(
+      UserGroupInformation userGroupInformation) {
+    this.userGroupInformation = userGroupInformation;
+    return this;
+  }
+
+  public BlockReaderFactory setRemotePeerFactory(
+      RemotePeerFactory remotePeerFactory) {
+    this.remotePeerFactory = remotePeerFactory;
+    return this;
+  }
+
+  public BlockReaderFactory setConfiguration(
+      Configuration configuration) {
+    this.configuration = configuration;
+    return this;
+  }
+
   /**
-   * Create a new BlockReader specifically to satisfy a read.
-   * This method also sends the OP_READ_BLOCK request.
-   * 
-   * @param conf the DFSClient configuration
-   * @param file  File location
-   * @param block  The block object
-   * @param blockToken  The block token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read, or -1 to read as many as
-   *             possible.
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   *                    Ignored except on the legacy BlockReader.
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name.  Used for log messages.
-   * @param peer  The peer
-   * @param datanodeID  The datanode that the Peer is connected to
-   * @param domainSocketFactory  The DomainSocketFactory to notify if the Peer
-   *                             is a DomainPeer which turns out to be faulty.
-   *                             If null, no factory will be notified in this
-   *                             case.
-   * @param allowShortCircuitLocalReads  True if short-circuit local reads
-   *                                     should be allowed.
-   * @return New BlockReader instance
-   */
-  public static BlockReader newBlockReader(DFSClient.Conf conf,
-                                     String file,
-                                     ExtendedBlock block, 
-                                     Token<BlockTokenIdentifier> blockToken,
-                                     long startOffset, long len,
-                                     boolean verifyChecksum,
-                                     String clientName,
-                                     Peer peer,
-                                     DatanodeID datanodeID,
-                                     DomainSocketFactory domSockFactory,
-                                     PeerCache peerCache,
-                                     FileInputStreamCache fisCache,
-                                     boolean allowShortCircuitLocalReads,
-                                     CachingStrategy cachingStrategy)
-  throws IOException {
-    peer.setReadTimeout(conf.socketTimeout);
-    peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
-
-    if (peer.getDomainSocket() != null) {
-      if (allowShortCircuitLocalReads && !conf.useLegacyBlockReaderLocal) {
-        // If this is a domain socket, and short-circuit local reads are 
-        // enabled, try to set up a BlockReaderLocal.
-        BlockReader reader = newShortCircuitBlockReader(conf, file,
-            block, blockToken, startOffset, len, peer, datanodeID,
-            domSockFactory, verifyChecksum, fisCache, cachingStrategy);
+   * Build a BlockReader with the given options.
+   *
+   * This function will do the best it can to create a block reader that meets
+   * all of our requirements.  We prefer short-circuit block readers
+   * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
+   * former avoid the overhead of socket communication.  If short-circuit is
+   * unavailable, our next fallback is data transfer over UNIX domain sockets,
+   * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
+   * work, we will try to create a remote block reader that operates over TCP
+   * sockets.
+   *
+   * There are a few caches that are important here.
+   *
+   * The ShortCircuitCache stores file descriptor objects which have been passed
+   * from the DataNode. 
+   *
+   * The DomainSocketFactory stores information about UNIX domain socket paths
+   * that we have not been able to use in the past, so that we don't waste time
+   * retrying them over and over.  (Like all the caches, it does have a timeout,
+   * though.)
+   *
+   * The PeerCache stores peers that we have used in the past.  If we can reuse
+   * one of these peers, we avoid the overhead of re-opening a socket.  However,
+   * if the socket has been timed out on the remote end, our attempt to reuse
+   * the socket may end with an IOException.  For that reason, we limit our
+   * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
+   * that, we create new sockets.  This avoids the problem where a thread tries
+   * to talk to a peer that it hasn't talked to in a while, and has to clean out
+   * every entry in a socket cache full of stale entries.
+   *
+   * @return The new BlockReader.  We will not return null.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  public BlockReader build() throws IOException {
+    BlockReader reader = null;
+
+    Preconditions.checkNotNull(configuration);
+    if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) {
+      if (clientContext.getUseLegacyBlockReaderLocal()) {
+        reader = getLegacyBlockReaderLocal();
         if (reader != null) {
-          // One we've constructed the short-circuit block reader, we don't
-          // need the socket any more.  So let's return it to the cache.
-          if (peerCache != null) {
-            peerCache.put(datanodeID, peer);
-          } else {
-            IOUtils.cleanup(null, peer);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": returning new legacy block reader local.");
+          }
+          return reader;
+        }
+      } else {
+        reader = getBlockReaderLocal();
+        if (reader != null) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": returning new block reader local.");
           }
           return reader;
         }
       }
-      // If this is a domain socket and we couldn't (or didn't want to) set
-      // up a BlockReaderLocal, check that we are allowed to pass data traffic
-      // over the socket before proceeding.
-      if (!conf.domainSocketDataTraffic) {
-        throw new IOException("Because we can't do short-circuit access, " +
-          "and data traffic over domain sockets is disabled, " +
-          "we cannot use this socket to talk to " + datanodeID);
+    }
+    if (conf.domainSocketDataTraffic) {
+      reader = getRemoteBlockReaderFromDomain();
+      if (reader != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": returning new remote block reader using " +
+              "UNIX domain socket on " + pathInfo.getPath());
+        }
+        return reader;
       }
     }
+    Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
+        "TCP reads were disabled for testing, but we failed to " +
+        "do a non-TCP read.");
+    return getRemoteBlockReaderFromTcp();
+  }
 
-    if (conf.useLegacyBlockReader) {
-      @SuppressWarnings("deprecation")
-      RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
-          block, blockToken, startOffset, len, conf.ioBufferSize,
-          verifyChecksum, clientName, peer, datanodeID, peerCache,
-          cachingStrategy);
-      return reader;
-    } else {
-      return RemoteBlockReader2.newBlockReader(
-          file, block, blockToken, startOffset, len,
-          verifyChecksum, clientName, peer, datanodeID, peerCache,
-          cachingStrategy);
+  /**
+   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+   * This block reader implements the path-based style of local reads
+   * first introduced in HDFS-2246.
+   */
+  private BlockReader getLegacyBlockReaderLocal() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
+    }
+    if (!DFSClient.isLocalAddress(inetSocketAddress)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
+            "the address " + inetSocketAddress + " is not local");
+      }
+      return null;
+    }
+    if (clientContext.getDisableLegacyBlockReaderLocal()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
+            "disableLegacyBlockReaderLocal is set.");
+      }
+      return null;
+    }
+    IOException ioe = null;
+    try {
+      return BlockReaderLocalLegacy.newBlockReader(conf,
+          userGroupInformation, configuration, fileName, block, token,
+          datanode, startOffset, length);
+    } catch (RemoteException remoteException) {
+      ioe = remoteException.unwrapRemoteException(
+                InvalidToken.class, AccessControlException.class);
+    } catch (IOException e) {
+      ioe = e;
+    }
+    if ((!(ioe instanceof AccessControlException)) &&
+        isSecurityException(ioe)) {
+      // Handle security exceptions.
+      // We do not handle AccessControlException here, since
+      // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
+      // that the user is not in dfs.block.local-path-access.user, a condition
+      // which requires us to disable legacy SCR.
+      throw ioe;
+    }
+    LOG.warn(this + ": error creating legacy BlockReaderLocal.  " +
+        "Disabling legacy local reads.", ioe);
+    clientContext.setDisableLegacyBlockReaderLocal();
+    return null;
+  }
+
+  private BlockReader getBlockReaderLocal() throws InvalidToken {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to construct a BlockReaderLocal " +
+          "for short-circuit reads.");
+    }
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory().
+                      getPathInfo(inetSocketAddress, conf);
+    }
+    if (!pathInfo.getPathState().getUsableForShortCircuit()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": " + pathInfo + " is not " +
+            "usable for short circuit; giving up on BlockReaderLocal.");
+      }
+      return null;
+    }
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    Key key = new Key(block.getBlockId(), block.getBlockPoolId());
+    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
+    InvalidToken exc = info.getInvalidTokenException();
+    if (exc != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": got InvalidToken exception while trying to " +
+            "construct BlockReaderLocal via " + pathInfo.getPath());
+      }
+      throw exc;
+    }
+    if (info.getReplica() == null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": failed to get ShortCircuitReplica.  " +
+            "Cannot construct BlockReaderLocal via " + pathInfo.getPath());
+      }
+      return null;
+    }
+    return new BlockReaderLocal.Builder(conf).
+        setFilename(fileName).
+        setBlock(block).
+        setStartOffset(startOffset).
+        setShortCircuitReplica(info.getReplica()).
+        setDatanodeID(datanode).
+        setVerifyChecksum(verifyChecksum).
+        setCachingStrategy(cachingStrategy).
+        build();
+  }
+
+  /**
+   * Fetch a pair of short-circuit block descriptors from a local DataNode.
+   *
+   * @return    Null if we could not communicate with the datanode,
+   *            a new ShortCircuitReplicaInfo object otherwise.
+   *            ShortCircuitReplicaInfo objects may contain either an InvalidToken
+   *            exception, or a ShortCircuitReplica object ready to use.
+   */
+  @Override
+  public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+    if (createShortCircuitReplicaInfoCallback != null) {
+      ShortCircuitReplicaInfo info =
+        createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
+      if (info != null) return info;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
+    }
+    BlockReaderPeer curPeer;
+    while (true) {
+      curPeer = nextDomainPeer();
+      if (curPeer == null) break;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      try {
+        ShortCircuitReplicaInfo info = requestFileDescriptors(peer);
+        clientContext.getPeerCache().put(datanode, peer);
+        return info;
+      } catch (IOException e) {
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached socket.
+          // These are considered less serious, because the socket may be stale.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(this + ": closing stale domain peer " + peer, e);
+          }
+          IOUtils.cleanup(LOG, peer);
+        } else {
+          // Handle an I/O error we got when using a newly created socket.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn(this + ": I/O error requesting file descriptors.  " + 
+              "Disabling domain socket " + peer.getDomainSocket(), e);
+          IOUtils.cleanup(LOG, peer);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      }
     }
+    return null;
   }
 
   /**
-   * Create a new short-circuit BlockReader.
-   * 
-   * Here, we ask the DataNode to pass us file descriptors over our
-   * DomainSocket.  If the DataNode declines to do so, we'll return null here;
-   * otherwise, we'll return the BlockReaderLocal.  If the DataNode declines,
-   * this function will inform the DomainSocketFactory that short-circuit local
-   * reads are disabled for this DataNode, so that we don't ask again.
-   * 
-   * @param conf               the configuration.
-   * @param file               the file name. Used in log messages.
-   * @param block              The block object.
-   * @param blockToken         The block token for security.
-   * @param startOffset        The read offset, relative to block head.
-   * @param len                The number of bytes to read, or -1 to read 
-   *                           as many as possible.
-   * @param peer               The peer to use.
-   * @param datanodeID         The datanode that the Peer is connected to.
-   * @param domSockFactory     The DomainSocketFactory to notify if the Peer
-   *                           is a DomainPeer which turns out to be faulty.
-   *                           If null, no factory will be notified in this
-   *                           case.
-   * @param verifyChecksum     True if we should verify the checksums.
-   *                           Note: even if this is true, when
-   *                           DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
-   *                           set or the block is mlocked, we will skip
-   *                           checksums.
-   *
-   * @return                   The BlockReaderLocal, or null if the
-   *                           DataNode declined to provide short-circuit
-   *                           access.
-   * @throws IOException       If there was a communication error.
-   */
-  private static BlockReaderLocal newShortCircuitBlockReader(
-      DFSClient.Conf conf, String file, ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken, long startOffset,
-      long len, Peer peer, DatanodeID datanodeID,
-      DomainSocketFactory domSockFactory, boolean verifyChecksum,
-      FileInputStreamCache fisCache,
-      CachingStrategy cachingStrategy) throws IOException {
+   * Request file descriptors from a DomainPeer.
+   *
+   * @return  A ShortCircuitReplicaInfo object if we could communicate with the
+   *          datanode; null, otherwise. 
+   * @throws  IOException If we encountered an I/O exception while communicating
+   *          with the datanode.
+   */
+  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer)
+        throws IOException {
     final DataOutputStream out =
-        new DataOutputStream(new BufferedOutputStream(
-          peer.getOutputStream()));
-    new Sender(out).requestShortCircuitFds(block, blockToken, 1);
-    DataInputStream in =
-        new DataInputStream(peer.getInputStream());
+        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+    new Sender(out).requestShortCircuitFds(block, token, 1);
+    DataInputStream in = new DataInputStream(peer.getInputStream());
     BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
         PBHelper.vintPrefixed(in));
     DomainSocket sock = peer.getDomainSocket();
     switch (resp.getStatus()) {
     case SUCCESS:
-      BlockReaderLocal reader = null;
       byte buf[] = new byte[1];
       FileInputStream fis[] = new FileInputStream[2];
       sock.recvFileInputStreams(fis, buf, 0, buf.length);
+      ShortCircuitReplica replica = null;
       try {
-        reader = new BlockReaderLocal.Builder(conf).
-            setFilename(file).
-            setBlock(block).
-            setStartOffset(startOffset).
-            setStreams(fis).
-            setDatanodeID(datanodeID).
-            setVerifyChecksum(verifyChecksum).
-            setBlockMetadataHeader(
-                BlockMetadataHeader.preadHeader(fis[1].getChannel())).
-            setFileInputStreamCache(fisCache).
-            setCachingStrategy(cachingStrategy).
-            build();
+        Key key = new Key(block.getBlockId(), block.getBlockPoolId());
+        replica = new ShortCircuitReplica(key, fis[0], fis[1],
+            clientContext.getShortCircuitCache(), Time.monotonicNow());
+      } catch (IOException e) {
+        // This indicates an error reading from disk, or a format error.  Since
+        // it's not a socket communication problem, we return null rather than
+        // throwing an exception.
+        LOG.warn(this + ": error creating ShortCircuitReplica.", e);
+        return null;
       } finally {
-        if (reader == null) {
+        if (replica == null) {
           IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
         }
       }
-      return reader;
+      return new ShortCircuitReplicaInfo(replica);
     case ERROR_UNSUPPORTED:
       if (!resp.hasShortCircuitAccessVersion()) {
-        DFSClient.LOG.warn("short-circuit read access is disabled for " +
-            "DataNode " + datanodeID + ".  reason: " + resp.getMessage());
-        domSockFactory.disableShortCircuitForPath(sock.getPath());
+        LOG.warn("short-circuit read access is disabled for " +
+            "DataNode " + datanode + ".  reason: " + resp.getMessage());
+        clientContext.getDomainSocketFactory()
+            .disableShortCircuitForPath(pathInfo.getPath());
       } else {
-        DFSClient.LOG.warn("short-circuit read access for the file " +
-            file + " is disabled for DataNode " + datanodeID +
+        LOG.warn("short-circuit read access for the file " +
+            fileName + " is disabled for DataNode " + datanode +
             ".  reason: " + resp.getMessage());
       }
       return null;
     case ERROR_ACCESS_TOKEN:
       String msg = "access control error while " +
           "attempting to set up short-circuit access to " +
-          file + resp.getMessage();
-      DFSClient.LOG.debug(msg);
-      throw new InvalidBlockTokenException(msg);
+          fileName + resp.getMessage();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(this + ":" + msg);
+      }
+      return new ShortCircuitReplicaInfo(new InvalidToken(msg));
     default:
-      DFSClient.LOG.warn("error while attempting to set up short-circuit " +
-          "access to " + file + ": " + resp.getMessage());
-      domSockFactory.disableShortCircuitForPath(sock.getPath());
+      LOG.warn(this + ": unknown response code " + resp.getStatus() + " while " +
+          "attempting to set up short-circuit access. " + resp.getMessage());
+      clientContext.getDomainSocketFactory()
+          .disableShortCircuitForPath(pathInfo.getPath());
       return null;
     }
   }
 
   /**
+   * Get a RemoteBlockReader that communicates over a UNIX domain socket.
+   *
+   * @return The new BlockReader, or null if we failed to create the block
+   * reader.
+   *
+   * @throws InvalidToken    If the block token was invalid.
+   * Potentially other security-related exceptions.
+   */
+  private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
+    if (pathInfo == null) {
+      pathInfo = clientContext.getDomainSocketFactory().
+                      getPathInfo(inetSocketAddress, conf);
+    }
+    if (!pathInfo.getPathState().getUsableForDataTransfer()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": not trying to create a remote block reader " +
+            "because the UNIX domain socket at " + pathInfo +
+            " is not usable.");
+      }
+      return null;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create a remote block reader from the " +
+          "UNIX domain socket at " + pathInfo.getPath());
+    }
+
+    while (true) {
+      BlockReaderPeer curPeer = nextDomainPeer();
+      if (curPeer == null) break;
+      DomainPeer peer = (DomainPeer)curPeer.peer;
+      BlockReader blockReader = null;
+      try {
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        IOUtils.cleanup(LOG, peer);
+        if (isSecurityException(ioe)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": got security exception while constructing " +
+                "a remote block reader from the unix domain socket at " +
+                pathInfo.getPath(), ioe);
+          }
+          throw ioe;
+        }
+        if (curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious, because the underlying socket may be stale.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closed potentially stale domain peer " + peer, ioe);
+          }
+        } else {
+          // Handle an I/O error we got when using a newly created domain peer.
+          // We temporarily disable the domain socket path for a few minutes in
+          // this case, to prevent wasting more time on it.
+          LOG.warn("I/O error constructing remote block reader.  Disabling " +
+              "domain socket " + peer.getDomainSocket(), ioe);
+          clientContext.getDomainSocketFactory()
+              .disableDomainSocketPath(pathInfo.getPath());
+          return null;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtils.cleanup(LOG, peer);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a RemoteBlockReader that communicates over a TCP socket.
+   *
+   * @return The new BlockReader.  We will not return null, but instead throw
+   *         an exception if this fails.
+   *
+   * @throws InvalidToken
+   *             If the block token was invalid.
+   *         InvalidEncryptionKeyException
+   *             If the encryption key was invalid.
+   *         Other IOException
+   *             If there was another problem.
+   */
+  private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": trying to create a remote block reader from a " +
+          "TCP socket");
+    }
+    BlockReader blockReader = null;
+    while (true) {
+      BlockReaderPeer curPeer = null;
+      Peer peer = null;
+      try {
+        curPeer = nextTcpPeer();
+        if (curPeer == null) break;
+        peer = curPeer.peer;
+        blockReader = getRemoteBlockReader(peer);
+        return blockReader;
+      } catch (IOException ioe) {
+        if (isSecurityException(ioe)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": got security exception while constructing " +
+                "a remote block reader from " + peer, ioe);
+          }
+          throw ioe;
+        }
+        if ((curPeer != null) && curPeer.fromCache) {
+          // Handle an I/O error we got when using a cached peer.  These are
+          // considered less serious, because the underlying socket may be
+          // stale.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closed potentially stale remote peer " + peer, ioe);
+          }
+        } else {
+          // Handle an I/O error we got when using a newly created peer.
+          LOG.warn("I/O error constructing remote block reader.", ioe);
+          throw ioe;
+        }
+      } finally {
+        if (blockReader == null) {
+          IOUtils.cleanup(LOG, peer);
+        }
+      }
+    }
+    return null;
+  }
+
+  private class BlockReaderPeer {
+    final Peer peer;
+    final boolean fromCache;
+    
+    BlockReaderPeer(Peer peer, boolean fromCache) {
+      this.peer = peer;
+      this.fromCache = fromCache;
+    }
+  }
+
+  /**
+   * Get the next DomainPeer-- either from the cache or by creating it.
+   *
+   * @return the next DomainPeer, or null if we could not construct one.
+   */
+  private BlockReaderPeer nextDomainPeer() {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, true);
+      if (peer != null) {
+        remainingCacheTries--;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("nextDomainPeer: reusing existing peer " + peer);
+        }
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    DomainSocket sock = clientContext.getDomainSocketFactory().
+        createSocket(pathInfo, conf.socketTimeout);
+    if (sock == null) return null;
+    return new BlockReaderPeer(new DomainPeer(sock), false);
+  }
+
+  /**
+   * Get the next TCP-based peer-- either from the cache or by creating it.
+   *
+   * @return the next Peer, or null if we could not construct one.
+   *
+   * @throws IOException  If there was an error while constructing the peer
+   *                      (such as an InvalidEncryptionKeyException)
+   */
+  private BlockReaderPeer nextTcpPeer() throws IOException {
+    if (remainingCacheTries > 0) {
+      Peer peer = clientContext.getPeerCache().get(datanode, false);
+      if (peer != null) {
+        remainingCacheTries--;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("nextTcpPeer: reusing existing peer " + peer);
+        }
+        return new BlockReaderPeer(peer, true);
+      }
+    }
+    try {
+      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
+      }
+      return new BlockReaderPeer(peer, false);
+    } catch (IOException e) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
+                  "connected to " + datanode);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Determine if an exception is security-related.
+   *
+   * We need to handle these exceptions differently than other IOExceptions.
+   * They don't indicate a communication problem.  Instead, they mean that there
+   * is some action the client needs to take, such as refetching block tokens,
+   * renewing encryption keys, etc.
+   *
+   * @param ioe    The exception
+   * @return       True only if the exception is security-related.
+   */
+  private static boolean isSecurityException(IOException ioe) {
+    return (ioe instanceof InvalidToken) ||
+            (ioe instanceof InvalidEncryptionKeyException) ||
+            (ioe instanceof InvalidBlockTokenException) ||
+            (ioe instanceof AccessControlException);
+  }
+
+  @SuppressWarnings("deprecation")
+  private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+    if (conf.useLegacyBlockReader) {
+      return RemoteBlockReader.newBlockReader(fileName,
+          block, token, startOffset, length, conf.ioBufferSize,
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy);
+    } else {
+      return RemoteBlockReader2.newBlockReader(
+          fileName, block, token, startOffset, length,
+          verifyChecksum, clientName, peer, datanode,
+          clientContext.getPeerCache(), cachingStrategy);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
+  }
+
+  /**
    * File name to print when accessing a block directly (from servlets)
    * @param s Address of the block location
    * @param poolId Block pool ID of the block
@@ -246,23 +778,4 @@ public class BlockReaderFactory {
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
-
-  /**
-   * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
-   * This block reader implements the path-based style of local reads
-   * first introduced in HDFS-2246.
-   */
-  static BlockReader getLegacyBlockReaderLocal(DFSClient dfsClient,
-      String src, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
-      long offsetIntoBlock) throws InvalidToken, IOException {
-    try {
-      final long length = blk.getNumBytes() - offsetIntoBlock;
-      return BlockReaderLocalLegacy.newBlockReader(dfsClient, src, blk,
-          accessToken, chosenNode, offsetIntoBlock, length);
-    } catch (RemoteException re) {
-      throw re.unwrapRemoteException(InvalidToken.class,
-          AccessControlException.class);
-    }
-  }
 }
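
For reference, the whole construction flow above is driven from build():
BlockReaderLocalLegacy first (if enabled), then BlockReaderLocal via the
ShortCircuitCache, then a domain-socket remote reader, then TCP.  A hedged
usage sketch follows, using only the setters added in this patch; every
argument is assumed to be supplied by an existing DFSClient/DFSInputStream,
and the literal values are placeholders.

    package org.apache.hadoop.hdfs;

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
    import org.apache.hadoop.security.token.Token;

    // Usage sketch only: shows the fluent construction path added by this
    // patch.  The real callers are DFSInputStream and friends.
    class BlockReaderBuildSketch {
      static BlockReader open(DFSClient.Conf conf, Configuration hadoopConf,
          ClientContext context, RemotePeerFactory peerFactory, String path,
          ExtendedBlock block, Token<BlockTokenIdentifier> token,
          DatanodeInfo datanode, InetSocketAddress addr,
          CachingStrategy strategy) throws IOException {
        return new BlockReaderFactory(conf).
            setFileName(path).
            setBlock(block).
            setBlockToken(token).
            setStartOffset(0).
            setLength(-1).                  // -1 = read as many bytes as possible
            setVerifyChecksum(true).
            setClientName("sketch").
            setDatanodeInfo(datanode).
            setAllowShortCircuitLocalReads(true).
            setClientCacheContext(context).
            setCachingStrategy(strategy).
            setInetSocketAddress(addr).
            setRemotePeerFactory(peerFactory).
            setConfiguration(hadoopConf).
            build();                        // local -> domain socket -> TCP fallback
      }
    }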

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Wed Feb 12 19:24:46 2014
@@ -28,8 +28,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
@@ -67,12 +68,10 @@ class BlockReaderLocal implements BlockR
     private boolean verifyChecksum;
     private int maxReadahead;
     private String filename;
-    private FileInputStream streams[];
+    private ShortCircuitReplica replica;
     private long dataPos;
     private DatanodeID datanodeID;
-    private FileInputStreamCache fisCache;
     private boolean mlocked;
-    private BlockMetadataHeader header;
     private ExtendedBlock block;
 
     public Builder(Conf conf) {
@@ -99,8 +98,8 @@ class BlockReaderLocal implements BlockR
       return this;
     }
 
-    public Builder setStreams(FileInputStream streams[]) {
-      this.streams = streams;
+    public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
+      this.replica = replica;
       return this;
     }
 
@@ -114,30 +113,18 @@ class BlockReaderLocal implements BlockR
       return this;
     }
 
-    public Builder setFileInputStreamCache(FileInputStreamCache fisCache) {
-      this.fisCache = fisCache;
-      return this;
-    }
-
     public Builder setMlocked(boolean mlocked) {
       this.mlocked = mlocked;
       return this;
     }
 
-    public Builder setBlockMetadataHeader(BlockMetadataHeader header) {
-      this.header = header;
-      return this;
-    }
-
     public Builder setBlock(ExtendedBlock block) {
       this.block = block;
       return this;
     }
 
     public BlockReaderLocal build() {
-      Preconditions.checkNotNull(streams);
-      Preconditions.checkArgument(streams.length == 2);
-      Preconditions.checkNotNull(header);
+      Preconditions.checkNotNull(replica);
       return new BlockReaderLocal(this);
     }
   }
@@ -147,7 +134,7 @@ class BlockReaderLocal implements BlockR
   /**
    * Pair of streams for this block.
    */
-  private final FileInputStream streams[];
+  private final ShortCircuitReplica replica;
 
   /**
    * The data FileChannel.
@@ -208,12 +195,6 @@ class BlockReaderLocal implements BlockR
   private int checksumSize;
 
   /**
-   * FileInputStream cache to return the streams to upon closing,
-   * or null if we should just close them unconditionally.
-   */
-  private final FileInputStreamCache fisCache;
-
-  /**
    * Maximum number of chunks to allocate.
    *
    * This is used to allocate dataBuf and checksumBuf, in the event that
@@ -257,20 +238,18 @@ class BlockReaderLocal implements BlockR
    */
   private ByteBuffer checksumBuf;
 
-  private boolean mmapDisabled = false;
-
   private BlockReaderLocal(Builder builder) {
-    this.streams = builder.streams;
-    this.dataIn = builder.streams[0].getChannel();
+    this.replica = builder.replica;
+    this.dataIn = replica.getDataStream().getChannel();
     this.dataPos = builder.dataPos;
-    this.checksumIn = builder.streams[1].getChannel();
-    this.checksum = builder.header.getChecksum();
+    this.checksumIn = replica.getMetaStream().getChannel();
+    BlockMetadataHeader header = builder.replica.getMetaHeader();
+    this.checksum = header.getChecksum();
     this.verifyChecksum = builder.verifyChecksum &&
         (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
     this.mlocked = new AtomicBoolean(builder.mlocked);
     this.filename = builder.filename;
     this.datanodeID = builder.datanodeID;
-    this.fisCache = builder.fisCache;
     this.block = builder.block;
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
     this.checksumSize = checksum.getChecksumSize();
@@ -642,20 +621,7 @@ class BlockReaderLocal implements BlockR
     if (LOG.isTraceEnabled()) {
       LOG.trace("close(filename=" + filename + ", block=" + block + ")");
     }
-    if (clientMmap != null) {
-      clientMmap.unref();
-      clientMmap = null;
-    }
-    if (fisCache != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("putting FileInputStream for " + filename +
-            " back into FileInputStreamCache");
-      }
-      fisCache.put(datanodeID, block, streams);
-    } else {
-      LOG.debug("closing FileInputStream for " + filename);
-      IOUtils.cleanup(LOG, dataIn, checksumIn);
-    }
+    replica.unref();
     freeDataBufIfExists();
     freeChecksumBufIfExists();
   }
@@ -683,8 +649,7 @@ class BlockReaderLocal implements BlockR
   }
 
   @Override
-  public synchronized ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     if ((!opts.contains(ReadOption.SKIP_CHECKSUMS)) &&
           verifyChecksum && (!mlocked.get())) {
       if (LOG.isTraceEnabled()) {
@@ -694,27 +659,7 @@ class BlockReaderLocal implements BlockR
       }
       return null;
     }
-    if (clientMmap == null) {
-      if (mmapDisabled) {
-        return null;
-      }
-      try {
-        clientMmap = mmapManager.fetch(datanodeID, block, streams[0]);
-        if (clientMmap == null) {
-          mmapDisabled = true;
-          return null;
-        }
-      } catch (InterruptedException e) {
-        LOG.error("Interrupted while setting up mmap for " + filename, e);
-        Thread.currentThread().interrupt();
-        return null;
-      } catch (IOException e) {
-        LOG.error("unable to set up mmap for " + filename, e);
-        mmapDisabled = true;
-        return null;
-      }
-    }
-    return clientMmap;
+    return replica.getOrCreateClientMmap();
   }
 
   /**
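
The net effect of this file's changes: BlockReaderLocal now borrows its data
and metadata channels from a ref-counted ShortCircuitReplica, and the mmap
path goes through replica.getOrCreateClientMmap() instead of a separate
ClientMmapManager.  From an application's point of view, the path that
exercises getClientMmap() is the zero-copy read API (see
TestEnhancedByteBufferAccess in the modified file list).  A hedged sketch,
assuming the standard HDFS-4949 enhanced byte-buffer API; the file path is a
placeholder.

    import java.nio.ByteBuffer;
    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class ZeroCopyReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/user/foo/bar.txt"));
        ElasticByteBufferPool pool = new ElasticByteBufferPool();
        try {
          // SKIP_CHECKSUMS lets getClientMmap() proceed even when the block
          // is not mlocked; see the check at the top of that method above.
          ByteBuffer buf = in.read(pool, 4096,
              EnumSet.of(ReadOption.SKIP_CHECKSUMS));
          if (buf != null) {
            System.out.println("read " + buf.remaining() + " bytes");
            in.releaseBuffer(buf);   // releases the underlying mmap reference
          }
        } finally {
          in.close();
        }
      }
    }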

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Wed Feb 12 19:24:46 2014
@@ -31,7 +31,6 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -175,19 +174,21 @@ class BlockReaderLocalLegacy implements 
   /**
    * The only way this object can be instantiated.
    */
-  static BlockReaderLocalLegacy newBlockReader(DFSClient dfsClient,
-      String file, ExtendedBlock blk, Token<BlockTokenIdentifier> token,
-      DatanodeInfo node, long startOffset, long length)
-      throws IOException {
-    final DFSClient.Conf conf = dfsClient.getConf();
-
+  static BlockReaderLocalLegacy newBlockReader(DFSClient.Conf conf,
+      UserGroupInformation userGroupInformation,
+      Configuration configuration, String file, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> token, DatanodeInfo node, 
+      long startOffset, long length) throws IOException {
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
         .getIpcPort());
     // check the cache first
     BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
     if (pathinfo == null) {
-      pathinfo = getBlockPathInfo(dfsClient.ugi, blk, node,
-          dfsClient.getConfiguration(), dfsClient.getHdfsTimeout(), token,
+      if (userGroupInformation == null) {
+        userGroupInformation = UserGroupInformation.getCurrentUser();
+      }
+      pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
+          configuration, conf.hdfsTimeout, token,
           conf.connectToDnViaHostname);
     }
 
@@ -708,8 +709,7 @@ class BlockReaderLocalLegacy implements 
   }
 
   @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
 }
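
BlockReaderLocalLegacy is the older, path-based local read (HDFS-2246): the
client asks the DataNode for the block file's path and opens it directly,
which is why it is gated by dfs.block.local-path-access.user and now takes a
UserGroupInformation and Configuration rather than file descriptors passed
over a domain socket.  Which reader build() ends up with is steered by
client configuration; a hedged sketch of the relevant keys, using the names
that appear in this patch plus the standard short-circuit keys.

    import org.apache.hadoop.conf.Configuration;

    // Client-side knobs that steer BlockReaderFactory#build().  Key names
    // for short-circuit reads and the domain socket path are the standard
    // HDFS client keys; exact defaults live in DFSConfigKeys and
    // hdfs-default.xml (both touched by this commit).
    public class ShortCircuitConfSketch {
      public static Configuration clientConf() {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        conf.set("dfs.domain.socket.path", "/var/run/hdfs/dn_socket");
        // Fall back to sending data over the domain socket if fd passing fails.
        conf.setBoolean("dfs.client.domain.socket.data.traffic", true);
        // Bound the number of attempts to reuse a cached socket (see build()).
        conf.setInt("dfs.client.cached.conn.retry", 3);
        // Clients with the same context string share one ClientContext
        // (PeerCache, ShortCircuitCache, DomainSocketFactory).
        conf.set("dfs.client.context", "default");
        return conf;
      }
    }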

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Feb 12 19:24:46 2014
@@ -56,6 +56,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
 
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -110,9 +112,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@@ -191,7 +194,7 @@ import com.google.common.net.InetAddress
  *
  ********************************************************/
 @InterfaceAudience.Private
-public class DFSClient implements java.io.Closeable {
+public class DFSClient implements java.io.Closeable, RemotePeerFactory {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
   static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
@@ -212,50 +215,13 @@ public class DFSClient implements java.i
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   private final String authority;
-  final PeerCache peerCache;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
-  private boolean shouldUseLegacyBlockReaderLocal;
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
-  private ClientMmapManager mmapManager;
+  private final ClientContext clientContext;
   
-  private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
-      new ClientMmapManagerFactory();
-
-  private static final class ClientMmapManagerFactory {
-    private ClientMmapManager mmapManager = null;
-    /**
-     * Tracks the number of users of mmapManager.
-     */
-    private int refcnt = 0;
-
-    synchronized ClientMmapManager get(Configuration conf) {
-      if (refcnt++ == 0) {
-        mmapManager = ClientMmapManager.fromConf(conf);
-      } else {
-        String mismatches = mmapManager.verifyConfigurationMatches(conf);
-        if (!mismatches.isEmpty()) {
-          LOG.warn("The ClientMmapManager settings you specified " +
-            "have been ignored because another thread created the " +
-            "ClientMmapManager first.  " + mismatches);
-        }
-      }
-      return mmapManager;
-    }
-    
-    synchronized void unref(ClientMmapManager mmapManager) {
-      if (this.mmapManager != mmapManager) {
-        throw new IllegalArgumentException();
-      }
-      if (--refcnt == 0) {
-        IOUtils.cleanup(LOG, mmapManager);
-        mmapManager = null;
-      }
-    }
-  }
-
   /**
    * DFSClient configuration 
    */
@@ -300,6 +266,11 @@ public class DFSClient implements java.i
     final boolean domainSocketDataTraffic;
     final int shortCircuitStreamsCacheSize;
     final long shortCircuitStreamsCacheExpiryMs; 
+    
+    final int shortCircuitMmapCacheSize;
+    final long shortCircuitMmapCacheExpiryMs;
+    final long shortCircuitMmapCacheRetryTimeout;
+    final long shortCircuitCacheStaleThresholdMs;
 
     public Conf(Configuration conf) {
       // The hdfsTimeout is currently the same as the ipc timeout 
@@ -416,6 +387,18 @@ public class DFSClient implements java.i
       shortCircuitStreamsCacheExpiryMs = conf.getLong(
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
           DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
+      shortCircuitMmapCacheSize = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
+          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
+      shortCircuitMmapCacheExpiryMs = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
+          DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
+      shortCircuitMmapCacheRetryTimeout = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
+          DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT);
+      shortCircuitCacheStaleThresholdMs = conf.getLong(
+          DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
+          DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
     }
 
     private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -464,11 +447,11 @@ public class DFSClient implements java.i
   public Conf getConf() {
     return dfsClientConf;
   }
-  
+
   Configuration getConfiguration() {
     return conf;
   }
-  
+
   /**
    * A map from file names to {@link DFSOutputStream} objects
    * that are currently being written by this client.
@@ -477,8 +460,6 @@ public class DFSClient implements java.i
   private final Map<String, DFSOutputStream> filesBeingWritten
       = new HashMap<String, DFSOutputStream>();
 
-  private final DomainSocketFactory domainSocketFactory;
-  
   /**
    * Same as this(NameNode.getAddress(conf), conf);
    * @see #DFSClient(InetSocketAddress, Configuration)
@@ -526,8 +507,6 @@ public class DFSClient implements java.i
     throws IOException {
     // Copy only the required DFSClient configuration
     this.dfsClientConf = new Conf(conf);
-    this.shouldUseLegacyBlockReaderLocal = 
-        this.dfsClientConf.useLegacyBlockReaderLocal;
     if (this.dfsClientConf.useLegacyBlockReaderLocal) {
       LOG.debug("Using legacy short-circuit local reads.");
     }
@@ -572,9 +551,6 @@ public class DFSClient implements java.i
       this.namenode = proxyInfo.getProxy();
     }
 
-    // read directly from the block file if configured.
-    this.domainSocketFactory = new DomainSocketFactory(dfsClientConf);
-
     String localInterfaces[] =
       conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
     localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
@@ -584,7 +560,6 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
     
-    this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
     Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
         null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
     Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
@@ -595,7 +570,9 @@ public class DFSClient implements java.i
         new CachingStrategy(readDropBehind, readahead);
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
-    this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
+    this.clientContext = ClientContext.get(
+        conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
+        dfsClientConf);
   }
   
   /**
@@ -800,10 +777,6 @@ public class DFSClient implements java.i
   
   /** Abort and release resources held.  Ignore all errors. */
   void abort() {
-    if (mmapManager != null) {
-      MMAP_MANAGER_FACTORY.unref(mmapManager);
-      mmapManager = null;
-    }
     clientRunning = false;
     closeAllFilesBeingWritten(true);
     try {
@@ -849,10 +822,6 @@ public class DFSClient implements java.i
    */
   @Override
   public synchronized void close() throws IOException {
-    if (mmapManager != null) {
-      MMAP_MANAGER_FACTORY.unref(mmapManager);
-      mmapManager = null;
-    }
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
       clientRunning = false;
@@ -2638,18 +2607,6 @@ public class DFSClient implements java.i
         + ", ugi=" + ugi + "]"; 
   }
 
-  public DomainSocketFactory getDomainSocketFactory() {
-    return domainSocketFactory;
-  }
-
-  public void disableLegacyBlockReaderLocal() {
-    shouldUseLegacyBlockReaderLocal = false;
-  }
-
-  public boolean useLegacyBlockReaderLocal() {
-    return shouldUseLegacyBlockReaderLocal;
-  }
-
   public CachingStrategy getDefaultReadCachingStrategy() {
     return defaultReadCachingStrategy;
   }
@@ -2658,8 +2615,29 @@ public class DFSClient implements java.i
     return defaultWriteCachingStrategy;
   }
 
-  @VisibleForTesting
-  public ClientMmapManager getMmapManager() {
-    return mmapManager;
+  public ClientContext getClientContext() {
+    return clientContext;
+  }
+
+  @Override // RemotePeerFactory
+  public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    Socket sock = null;
+    try {
+      sock = socketFactory.createSocket();
+      NetUtils.connect(sock, addr,
+        getRandomLocalInterfaceAddr(),
+        dfsClientConf.socketTimeout);
+      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
+          getDataEncryptionKey());
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        IOUtils.cleanup(LOG, peer);
+        IOUtils.closeSocket(sock);
+      }
+    }
   }
 }

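Two structural changes stand out in DFSClient. First, the per-client PeerCache, DomainSocketFactory, and refcounted ClientMmapManager are all replaced by a single ClientContext, looked up by the dfs.client.context name, so every client that uses the same name shares one set of caches. Second, the client itself now implements RemotePeerFactory, so block-reader code can ask it for a connected TCP peer instead of DFSInputStream building sockets directly. A minimal sketch of the named-registry idea, with illustrative names rather than the real ClientContext API:

import java.util.HashMap;
import java.util.Map;

// Illustrative registry keyed by the dfs.client.context string; in HDFS
// the shared object owns the peer, stream, and mmap caches.
class SharedContext {
  private static final Map<String, SharedContext> CONTEXTS =
      new HashMap<String, SharedContext>();

  final String name;

  private SharedContext(String name) {
    this.name = name;
  }

  static synchronized SharedContext get(String name) {
    SharedContext context = CONTEXTS.get(name);
    if (context == null) {
      context = new SharedContext(name);  // first user creates it
      CONTEXTS.put(name, context);
    }
    return context;                       // later users share it
  }
}

One consequence worth noting: two DFSClient instances configured with the same context name share caches, while a test that wants isolation can simply pick a unique name.
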
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Feb 12 19:24:46 2014
@@ -59,6 +59,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = "dfs.client.cache.drop.behind.writes";
   public static final String  DFS_CLIENT_CACHE_DROP_BEHIND_READS = "dfs.client.cache.drop.behind.reads";
   public static final String  DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead";
+  public static final String  DFS_CLIENT_CONTEXT = "dfs.client.context";
+  public static final String  DFS_CLIENT_CONTEXT_DEFAULT = "default";
   public static final String  DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
   public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
   public static final String  DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
@@ -416,18 +418,20 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
-  public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 100;
+  public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 256;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
-  public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5000;
+  public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5 * 60 * 1000;
   public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
   public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
   public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
   public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
-  public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024;
+  public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 256;
   public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
-  public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT  = 15 * 60 * 1000;
-  public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.thread.runs.per.timeout";
-  public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT  = 4;
+  public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT  = 60 * 60 * 1000;
+  public static final String DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS = "dfs.client.mmap.retry.timeout.ms";
+  public static final long DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000;
+  public static final String DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS = "dfs.client.short.circuit.replica.stale.threshold.ms";
+  public static final long DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT = 30 * 60 * 1000;
 
   // property for fsimage compression
   public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";

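The keys above are the knobs the unified cache reads. The short-circuit streams cache grows from 100 entries expiring after 5 seconds to 256 entries expiring after 5 minutes; the mmap cache shrinks from 1024 to 256 entries but its timeout rises from 15 minutes to an hour; the cache-cleaner frequency key (thread.runs.per.timeout) is dropped; and two new keys govern how long a failed mmap attempt is remembered and when a cached replica is considered stale. A hedged example of overriding them programmatically; the key strings match DFSConfigKeys, but the values are arbitrary illustrations, not recommendations:

import org.apache.hadoop.conf.Configuration;

public class ShortCircuitConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Name the shared client context (see DFS_CLIENT_CONTEXT above).
    conf.set("dfs.client.context", "example-context");
    // Short-circuit file descriptor cache.
    conf.setInt("dfs.client.read.shortcircuit.streams.cache.size", 256);
    conf.setLong("dfs.client.read.shortcircuit.streams.cache.expiry.ms",
        5 * 60 * 1000L);
    // Zero-copy mmap cache.
    conf.setInt("dfs.client.mmap.cache.size", 256);
    conf.setLong("dfs.client.mmap.cache.timeout.ms", 60 * 60 * 1000L);
    conf.setLong("dfs.client.mmap.retry.timeout.ms", 5 * 60 * 1000L);
    conf.setLong("dfs.client.short.circuit.replica.stale.threshold.ms",
        30 * 60 * 1000L);
    System.out.println("mmap cache size = "
        + conf.getInt("dfs.client.mmap.cache.size", -1));
  }
}
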
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Feb 12 19:24:46 2014
@@ -46,9 +46,6 @@ import org.apache.hadoop.fs.HasEnhancedB
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -82,7 +79,6 @@ implements ByteBufferReadable, CanSetDro
     HasEnhancedByteBufferAccess {
   @VisibleForTesting
   static boolean tcpReadsDisabledForTesting = false;
-  private final PeerCache peerCache;
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
@@ -190,8 +186,6 @@ implements ByteBufferReadable, CanSetDro
     private long totalZeroCopyBytesRead;
   }
   
-  private final FileInputStreamCache fileInputStreamCache;
-
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -223,10 +217,6 @@ implements ByteBufferReadable, CanSetDro
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
-    this.peerCache = dfsClient.peerCache;
-    this.fileInputStreamCache = new FileInputStreamCache(
-        dfsClient.getConf().shortCircuitStreamsCacheSize,
-        dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
     this.cachingStrategy =
         dfsClient.getDefaultReadCachingStrategy();
     openInfo();
@@ -572,18 +562,28 @@ implements ByteBufferReadable, CanSetDro
       try {
         ExtendedBlock blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-        blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
-            accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-            buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
+        blockReader = new BlockReaderFactory(dfsClient.getConf()).
+            setInetSocketAddress(targetAddr).
+            setRemotePeerFactory(dfsClient).
+            setDatanodeInfo(chosenNode).
+            setFileName(src).
+            setBlock(blk).
+            setBlockToken(accessToken).
+            setStartOffset(offsetIntoBlock).
+            setVerifyChecksum(verifyChecksum).
+            setClientName(dfsClient.clientName).
+            setLength(blk.getNumBytes() - offsetIntoBlock).
+            setCachingStrategy(cachingStrategy).
+            setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
+            setClientCacheContext(dfsClient.getClientContext()).
+            setUserGroupInformation(dfsClient.ugi).
+            setConfiguration(dfsClient.getConfiguration()).
+            build();
         if(connectFailedOnce) {
           DFSClient.LOG.info("Successfully connected to " + targetAddr +
                              " for " + blk);
         }
         return chosenNode;
-      } catch (AccessControlException ex) {
-        DFSClient.LOG.warn("Short circuit access failed " + ex);
-        dfsClient.disableLegacyBlockReaderLocal();
-        continue;
       } catch (IOException ex) {
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -635,7 +635,6 @@ implements ByteBufferReadable, CanSetDro
       blockReader = null;
     }
     super.close();
-    fileInputStreamCache.close();
     closed = true;
   }
 
@@ -932,9 +931,11 @@ implements ByteBufferReadable, CanSetDro
       // or fetchBlockAt(). Always get the latest list of locations at the 
       // start of the loop.
       CachingStrategy curCachingStrategy;
+      boolean allowShortCircuitLocalReads;
       synchronized (this) {
         block = getBlockAt(block.getStartOffset(), false);
         curCachingStrategy = cachingStrategy;
+        allowShortCircuitLocalReads = !shortCircuitForbidden();
       }
       DNAddrPair retval = chooseDataNode(block);
       DatanodeInfo chosenNode = retval.info;
@@ -943,11 +944,24 @@ implements ByteBufferReadable, CanSetDro
           
       try {
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-            
         int len = (int) (end - start + 1);
-        reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
-            blockToken, start, len, buffersize, verifyChecksum,
-            dfsClient.clientName, curCachingStrategy);
+        reader = new BlockReaderFactory(dfsClient.getConf()).
+            setInetSocketAddress(targetAddr).
+            setRemotePeerFactory(dfsClient).
+            setDatanodeInfo(chosenNode).
+            setFileName(src).
+            setBlock(block.getBlock()).
+            setBlockToken(blockToken).
+            setStartOffset(start).
+            setVerifyChecksum(verifyChecksum).
+            setClientName(dfsClient.clientName).
+            setLength(len).
+            setCachingStrategy(curCachingStrategy).
+            setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
+            setClientCacheContext(dfsClient.getClientContext()).
+            setUserGroupInformation(dfsClient.ugi).
+            setConfiguration(dfsClient.getConfiguration()).
+            build();
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -960,10 +974,6 @@ implements ByteBufferReadable, CanSetDro
                  e.getPos() + " from " + chosenNode);
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
-      } catch (AccessControlException ex) {
-        DFSClient.LOG.warn("Short circuit access failed " + ex);
-        dfsClient.disableLegacyBlockReaderLocal();
-        continue;
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -1022,194 +1032,6 @@ implements ByteBufferReadable, CanSetDro
     return false;
   }
 
-  private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
-    Peer peer = null;
-    boolean success = false;
-    Socket sock = null;
-    try {
-      sock = dfsClient.socketFactory.createSocket();
-      NetUtils.connect(sock, addr,
-        dfsClient.getRandomLocalInterfaceAddr(),
-        dfsClient.getConf().socketTimeout);
-      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
-          dfsClient.getDataEncryptionKey());
-      success = true;
-      return peer;
-    } finally {
-      if (!success) {
-        IOUtils.closeQuietly(peer);
-        IOUtils.closeQuietly(sock);
-      }
-    }
-  }
-
-  /**
-   * Retrieve a BlockReader suitable for reading.
-   * This method will reuse the cached connection to the DN if appropriate.
-   * Otherwise, it will create a new connection.
-   * Throwing an IOException from this method is basically equivalent to 
-   * declaring the DataNode bad, so we try to connect a lot of different ways
-   * before doing that.
-   *
-   * @param dnAddr  Address of the datanode
-   * @param chosenNode Chosen datanode information
-   * @param file  File location
-   * @param block  The Block object
-   * @param blockToken  The access token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @param CachingStrategy  caching strategy to use
-   * @return New BlockReader instance
-   */
-  protected BlockReader getBlockReader(InetSocketAddress dnAddr,
-                                       DatanodeInfo chosenNode,
-                                       String file,
-                                       ExtendedBlock block,
-                                       Token<BlockTokenIdentifier> blockToken,
-                                       long startOffset,
-                                       long len,
-                                       int bufferSize,
-                                       boolean verifyChecksum,
-                                       String clientName,
-                                       CachingStrategy curCachingStrategy)
-      throws IOException {
-    // Firstly, we check to see if we have cached any file descriptors for
-    // local blocks.  If so, we can just re-use those file descriptors.
-    FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
-    if (fis != null) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
-            "the FileInputStreamCache.");
-      }
-      return new BlockReaderLocal.Builder(dfsClient.getConf()).
-          setFilename(file).
-          setBlock(block).
-          setStartOffset(startOffset).
-          setStreams(fis).
-          setDatanodeID(chosenNode).
-          setVerifyChecksum(verifyChecksum).
-          setBlockMetadataHeader(BlockMetadataHeader.
-              preadHeader(fis[1].getChannel())).
-          setFileInputStreamCache(fileInputStreamCache).
-          setCachingStrategy(curCachingStrategy).
-          build();
-    }
-    
-    // If the legacy local block reader is enabled and we are reading a local
-    // block, try to create a BlockReaderLocalLegacy.  The legacy local block
-    // reader implements local reads in the style first introduced by HDFS-2246.
-    if ((dfsClient.useLegacyBlockReaderLocal()) &&
-        DFSClient.isLocalAddress(dnAddr) &&
-        (!shortCircuitForbidden())) {
-      try {
-        return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient,
-            clientName, block, blockToken, chosenNode, startOffset);
-      } catch (IOException e) {
-        DFSClient.LOG.warn("error creating legacy BlockReaderLocal.  " +
-            "Disabling legacy local reads.", e);
-        dfsClient.disableLegacyBlockReaderLocal();
-      }
-    }
-
-    // Look for cached domain peers.
-    int cacheTries = 0;
-    DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
-    BlockReader reader = null;
-    final int nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
-    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
-      Peer peer = peerCache.get(chosenNode, true);
-      if (peer == null) break;
-      try {
-        boolean allowShortCircuitLocalReads = dfsClient.getConf().
-            shortCircuitLocalReads && (!shortCircuitForbidden());
-        reader = BlockReaderFactory.newBlockReader(
-            dfsClient.getConf(), file, block, blockToken, startOffset,
-            len, verifyChecksum, clientName, peer, chosenNode, 
-            dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, curCachingStrategy);
-        return reader;
-      } catch (IOException ex) {
-        DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
-            "Closing stale " + peer, ex);
-      } finally {
-        if (reader == null) {
-          IOUtils.closeQuietly(peer);
-        }
-      }
-    }
-
-    // Try to create a DomainPeer.
-    DomainSocket domSock = dsFactory.create(dnAddr, this);
-    if (domSock != null) {
-      Peer peer = new DomainPeer(domSock);
-      try {
-        boolean allowShortCircuitLocalReads = dfsClient.getConf().
-            shortCircuitLocalReads && (!shortCircuitForbidden());
-        reader = BlockReaderFactory.newBlockReader(
-            dfsClient.getConf(), file, block, blockToken, startOffset,
-            len, verifyChecksum, clientName, peer, chosenNode,
-            dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, curCachingStrategy);
-        return reader;
-      } catch (IOException e) {
-        DFSClient.LOG.warn("failed to connect to " + domSock, e);
-      } finally {
-        if (reader == null) {
-         // If the Peer that we got the error from was a DomainPeer,
-         // mark the socket path as bad, so that newDataSocket will not try 
-         // to re-open this socket for a while.
-         dsFactory.disableDomainSocketPath(domSock.getPath());
-         IOUtils.closeQuietly(peer);
-        }
-      }
-    }
-
-    // Look for cached peers.
-    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
-      Peer peer = peerCache.get(chosenNode, false);
-      if (peer == null) break;
-      try {
-        reader = BlockReaderFactory.newBlockReader(
-            dfsClient.getConf(), file, block, blockToken, startOffset,
-            len, verifyChecksum, clientName, peer, chosenNode, 
-            dsFactory, peerCache, fileInputStreamCache, false,
-            curCachingStrategy);
-        return reader;
-      } catch (IOException ex) {
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
-          peer, ex);
-      } finally {
-        if (reader == null) {
-          IOUtils.closeQuietly(peer);
-        }
-      }
-    }
-    if (tcpReadsDisabledForTesting) {
-      throw new IOException("TCP reads are disabled.");
-    }
-    // Try to create a new remote peer.
-    Peer peer = newTcpPeer(dnAddr);
-    try {
-      reader = BlockReaderFactory.newBlockReader(dfsClient.getConf(), file,
-          block, blockToken, startOffset, len, verifyChecksum, clientName,
-          peer, chosenNode, dsFactory, peerCache, fileInputStreamCache, false,
-        curCachingStrategy);
-      return reader;
-    } catch (IOException ex) {
-      DFSClient.LOG.debug(
-          "Exception while getting block reader, closing stale " + peer, ex);
-      throw ex;
-    } finally {
-      if (reader == null) {
-        IOUtils.closeQuietly(peer);
-      }
-    }
-  }
-
-
   /**
    * Read bytes starting from the specified position.
    * 
@@ -1555,8 +1377,7 @@ implements ByteBufferReadable, CanSetDro
     long blockStartInFile = currentLocatedBlock.getStartOffset();
     long blockPos = curPos - blockStartInFile;
     long limit = blockPos + length;
-    ClientMmap clientMmap =
-        blockReader.getClientMmap(opts, dfsClient.getMmapManager());
+    ClientMmap clientMmap = blockReader.getClientMmap(opts);
     if (clientMmap == null) {
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
@@ -1565,17 +1386,25 @@ implements ByteBufferReadable, CanSetDro
       }
       return null;
     }
-    seek(pos + length);
-    ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
-    buffer.position((int)blockPos);
-    buffer.limit((int)limit);
-    clientMmap.ref();
-    extendedReadBuffers.put(buffer, clientMmap);
-    readStatistics.addZeroCopyBytes(length);
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
-          "offset " + curPos + " via the zero-copy read path.  " +
-          "blockEnd = " + blockEnd);
+    boolean success = false;
+    ByteBuffer buffer;
+    try {
+      seek(pos + length);
+      buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
+      buffer.position((int)blockPos);
+      buffer.limit((int)limit);
+      extendedReadBuffers.put(buffer, clientMmap);
+      readStatistics.addZeroCopyBytes(length);
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
+            "offset " + curPos + " via the zero-copy read path.  " +
+            "blockEnd = " + blockEnd);
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        clientMmap.unref();
+      }
     }
     return buffer;
   }
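
The rewritten tail of readZeroCopy above enforces a strict ownership rule: the mmap reference is released on any path that fails to publish the buffer to the caller, using the same success-flag idiom as DFSClient.newConnectedPeer earlier in this patch. A generic sketch of that discipline (RefCounted and RefGuard are illustrative, not HDFS types):

import java.util.concurrent.Callable;

// Assumed interface standing in for ClientMmap's reference counting.
interface RefCounted {
  void unref();
}

final class RefGuard {
  private RefGuard() {}

  // Run body; if it completes, the caller keeps the reference that was
  // taken up front, otherwise release it before the exception propagates.
  static <T> T publishOrRelease(RefCounted resource, Callable<T> body)
      throws Exception {
    boolean success = false;
    try {
      T result = body.call();
      success = true;
      return result;
    } finally {
      if (!success) {
        resource.unref();   // error path: drop the reference we held
      }
    }
  }
}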

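The other large DFSInputStream change is that the roughly 190-line getBlockReader fallback ladder (cached file descriptors, legacy local reader, cached domain peers, a fresh domain socket, cached TCP peers, and finally a new TCP peer) moves wholesale into BlockReaderFactory, which callers drive through chained setters as in the two hunks above. A toy sketch of that fluent-builder shape, with invented fields standing in for the real ones:

// Toy fluent builder mirroring the call shape used in the patch; the
// fields are placeholders, not BlockReaderFactory's actual API.
class ReaderBuilder {
  private String fileName;
  private long startOffset;
  private long length;
  private boolean verifyChecksum;

  ReaderBuilder setFileName(String fileName) {
    this.fileName = fileName;
    return this;               // returning this enables chaining
  }

  ReaderBuilder setStartOffset(long startOffset) {
    this.startOffset = startOffset;
    return this;
  }

  ReaderBuilder setLength(long length) {
    this.length = length;
    return this;
  }

  ReaderBuilder setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
    return this;
  }

  String build() {
    // The real build() walks the fallback chain and returns a
    // BlockReader; a String is enough to show the mechanics here.
    return fileName + "@" + startOffset + "+" + length
        + (verifyChecksum ? " (checksummed)" : "");
  }
}

Usage mirrors the diff: new ReaderBuilder().setFileName(src).setStartOffset(0).setLength(1024).setVerifyChecksum(true).build().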

