hadoop-hdfs-commits mailing list archives

From: cmcc...@apache.org
Subject: svn commit: r1567720 [2/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/server/common/ src/main/java/org/apache/had...
Date: Wed, 12 Feb 2014 19:08:53 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Feb 12 19:08:52 2014
@@ -46,9 +46,6 @@ import org.apache.hadoop.fs.HasEnhancedB
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -82,7 +79,6 @@ implements ByteBufferReadable, CanSetDro
     HasEnhancedByteBufferAccess {
   @VisibleForTesting
   static boolean tcpReadsDisabledForTesting = false;
-  private final PeerCache peerCache;
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
@@ -190,8 +186,6 @@ implements ByteBufferReadable, CanSetDro
     private long totalZeroCopyBytesRead;
   }
   
-  private final FileInputStreamCache fileInputStreamCache;
-
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -223,10 +217,6 @@ implements ByteBufferReadable, CanSetDro
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
-    this.peerCache = dfsClient.peerCache;
-    this.fileInputStreamCache = new FileInputStreamCache(
-        dfsClient.getConf().shortCircuitStreamsCacheSize,
-        dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
     this.cachingStrategy =
         dfsClient.getDefaultReadCachingStrategy();
     openInfo();
@@ -572,18 +562,28 @@ implements ByteBufferReadable, CanSetDro
       try {
         ExtendedBlock blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-        blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
-            accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-            buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
+        blockReader = new BlockReaderFactory(dfsClient.getConf()).
+            setInetSocketAddress(targetAddr).
+            setRemotePeerFactory(dfsClient).
+            setDatanodeInfo(chosenNode).
+            setFileName(src).
+            setBlock(blk).
+            setBlockToken(accessToken).
+            setStartOffset(offsetIntoBlock).
+            setVerifyChecksum(verifyChecksum).
+            setClientName(dfsClient.clientName).
+            setLength(blk.getNumBytes() - offsetIntoBlock).
+            setCachingStrategy(cachingStrategy).
+            setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
+            setClientCacheContext(dfsClient.getClientContext()).
+            setUserGroupInformation(dfsClient.ugi).
+            setConfiguration(dfsClient.getConfiguration()).
+            build();
         if(connectFailedOnce) {
           DFSClient.LOG.info("Successfully connected to " + targetAddr +
                              " for " + blk);
         }
         return chosenNode;
-      } catch (AccessControlException ex) {
-        DFSClient.LOG.warn("Short circuit access failed " + ex);
-        dfsClient.disableLegacyBlockReaderLocal();
-        continue;
       } catch (IOException ex) {
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -635,7 +635,6 @@ implements ByteBufferReadable, CanSetDro
       blockReader = null;
     }
     super.close();
-    fileInputStreamCache.close();
     closed = true;
   }
 
@@ -932,9 +931,11 @@ implements ByteBufferReadable, CanSetDro
       // or fetchBlockAt(). Always get the latest list of locations at the 
       // start of the loop.
       CachingStrategy curCachingStrategy;
+      boolean allowShortCircuitLocalReads;
       synchronized (this) {
         block = getBlockAt(block.getStartOffset(), false);
         curCachingStrategy = cachingStrategy;
+        allowShortCircuitLocalReads = !shortCircuitForbidden();
       }
       DNAddrPair retval = chooseDataNode(block);
       DatanodeInfo chosenNode = retval.info;
@@ -943,11 +944,24 @@ implements ByteBufferReadable, CanSetDro
           
       try {
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-            
         int len = (int) (end - start + 1);
-        reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
-            blockToken, start, len, buffersize, verifyChecksum,
-            dfsClient.clientName, curCachingStrategy);
+        reader = new BlockReaderFactory(dfsClient.getConf()).
+            setInetSocketAddress(targetAddr).
+            setRemotePeerFactory(dfsClient).
+            setDatanodeInfo(chosenNode).
+            setFileName(src).
+            setBlock(block.getBlock()).
+            setBlockToken(blockToken).
+            setStartOffset(start).
+            setVerifyChecksum(verifyChecksum).
+            setClientName(dfsClient.clientName).
+            setLength(len).
+            setCachingStrategy(curCachingStrategy).
+            setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
+            setClientCacheContext(dfsClient.getClientContext()).
+            setUserGroupInformation(dfsClient.ugi).
+            setConfiguration(dfsClient.getConfiguration()).
+            build();
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -960,10 +974,6 @@ implements ByteBufferReadable, CanSetDro
                  e.getPos() + " from " + chosenNode);
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
-      } catch (AccessControlException ex) {
-        DFSClient.LOG.warn("Short circuit access failed " + ex);
-        dfsClient.disableLegacyBlockReaderLocal();
-        continue;
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -1022,194 +1032,6 @@ implements ByteBufferReadable, CanSetDro
     return false;
   }
 
-  private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
-    Peer peer = null;
-    boolean success = false;
-    Socket sock = null;
-    try {
-      sock = dfsClient.socketFactory.createSocket();
-      NetUtils.connect(sock, addr,
-        dfsClient.getRandomLocalInterfaceAddr(),
-        dfsClient.getConf().socketTimeout);
-      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
-          dfsClient.getDataEncryptionKey());
-      success = true;
-      return peer;
-    } finally {
-      if (!success) {
-        IOUtils.closeQuietly(peer);
-        IOUtils.closeQuietly(sock);
-      }
-    }
-  }
-
-  /**
-   * Retrieve a BlockReader suitable for reading.
-   * This method will reuse the cached connection to the DN if appropriate.
-   * Otherwise, it will create a new connection.
-   * Throwing an IOException from this method is basically equivalent to 
-   * declaring the DataNode bad, so we try to connect a lot of different ways
-   * before doing that.
-   *
-   * @param dnAddr  Address of the datanode
-   * @param chosenNode Chosen datanode information
-   * @param file  File location
-   * @param block  The Block object
-   * @param blockToken  The access token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @param curCachingStrategy  caching strategy to use
-   * @return New BlockReader instance
-   */
-  protected BlockReader getBlockReader(InetSocketAddress dnAddr,
-                                       DatanodeInfo chosenNode,
-                                       String file,
-                                       ExtendedBlock block,
-                                       Token<BlockTokenIdentifier> blockToken,
-                                       long startOffset,
-                                       long len,
-                                       int bufferSize,
-                                       boolean verifyChecksum,
-                                       String clientName,
-                                       CachingStrategy curCachingStrategy)
-      throws IOException {
-    // Firstly, we check to see if we have cached any file descriptors for
-    // local blocks.  If so, we can just re-use those file descriptors.
-    FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
-    if (fis != null) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
-            "the FileInputStreamCache.");
-      }
-      return new BlockReaderLocal.Builder(dfsClient.getConf()).
-          setFilename(file).
-          setBlock(block).
-          setStartOffset(startOffset).
-          setStreams(fis).
-          setDatanodeID(chosenNode).
-          setVerifyChecksum(verifyChecksum).
-          setBlockMetadataHeader(BlockMetadataHeader.
-              preadHeader(fis[1].getChannel())).
-          setFileInputStreamCache(fileInputStreamCache).
-          setCachingStrategy(curCachingStrategy).
-          build();
-    }
-    
-    // If the legacy local block reader is enabled and we are reading a local
-    // block, try to create a BlockReaderLocalLegacy.  The legacy local block
-    // reader implements local reads in the style first introduced by HDFS-2246.
-    if ((dfsClient.useLegacyBlockReaderLocal()) &&
-        DFSClient.isLocalAddress(dnAddr) &&
-        (!shortCircuitForbidden())) {
-      try {
-        return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient,
-            clientName, block, blockToken, chosenNode, startOffset);
-      } catch (IOException e) {
-        DFSClient.LOG.warn("error creating legacy BlockReaderLocal.  " +
-            "Disabling legacy local reads.", e);
-        dfsClient.disableLegacyBlockReaderLocal();
-      }
-    }
-
-    // Look for cached domain peers.
-    int cacheTries = 0;
-    DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
-    BlockReader reader = null;
-    final int nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
-    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
-      Peer peer = peerCache.get(chosenNode, true);
-      if (peer == null) break;
-      try {
-        boolean allowShortCircuitLocalReads = dfsClient.getConf().
-            shortCircuitLocalReads && (!shortCircuitForbidden());
-        reader = BlockReaderFactory.newBlockReader(
-            dfsClient.getConf(), file, block, blockToken, startOffset,
-            len, verifyChecksum, clientName, peer, chosenNode, 
-            dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, curCachingStrategy);
-        return reader;
-      } catch (IOException ex) {
-        DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
-            "Closing stale " + peer, ex);
-      } finally {
-        if (reader == null) {
-          IOUtils.closeQuietly(peer);
-        }
-      }
-    }
-
-    // Try to create a DomainPeer.
-    DomainSocket domSock = dsFactory.create(dnAddr, this);
-    if (domSock != null) {
-      Peer peer = new DomainPeer(domSock);
-      try {
-        boolean allowShortCircuitLocalReads = dfsClient.getConf().
-            shortCircuitLocalReads && (!shortCircuitForbidden());
-        reader = BlockReaderFactory.newBlockReader(
-            dfsClient.getConf(), file, block, blockToken, startOffset,
-            len, verifyChecksum, clientName, peer, chosenNode,
-            dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, curCachingStrategy);
-        return reader;
-      } catch (IOException e) {
-        DFSClient.LOG.warn("failed to connect to " + domSock, e);
-      } finally {
-        if (reader == null) {
-         // If the Peer that we got the error from was a DomainPeer,
-         // mark the socket path as bad, so that newDataSocket will not try 
-         // to re-open this socket for a while.
-         dsFactory.disableDomainSocketPath(domSock.getPath());
-         IOUtils.closeQuietly(peer);
-        }
-      }
-    }
-
-    // Look for cached peers.
-    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
-      Peer peer = peerCache.get(chosenNode, false);
-      if (peer == null) break;
-      try {
-        reader = BlockReaderFactory.newBlockReader(
-            dfsClient.getConf(), file, block, blockToken, startOffset,
-            len, verifyChecksum, clientName, peer, chosenNode, 
-            dsFactory, peerCache, fileInputStreamCache, false,
-            curCachingStrategy);
-        return reader;
-      } catch (IOException ex) {
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
-          peer, ex);
-      } finally {
-        if (reader == null) {
-          IOUtils.closeQuietly(peer);
-        }
-      }
-    }
-    if (tcpReadsDisabledForTesting) {
-      throw new IOException("TCP reads are disabled.");
-    }
-    // Try to create a new remote peer.
-    Peer peer = newTcpPeer(dnAddr);
-    try {
-      reader = BlockReaderFactory.newBlockReader(dfsClient.getConf(), file,
-          block, blockToken, startOffset, len, verifyChecksum, clientName,
-          peer, chosenNode, dsFactory, peerCache, fileInputStreamCache, false,
-          curCachingStrategy);
-      return reader;
-    } catch (IOException ex) {
-      DFSClient.LOG.debug(
-          "Exception while getting block reader, closing stale " + peer, ex);
-      throw ex;
-    } finally {
-      if (reader == null) {
-        IOUtils.closeQuietly(peer);
-      }
-    }
-  }
-
-
   /**
    * Read bytes starting from the specified position.
    * 
@@ -1555,8 +1377,7 @@ implements ByteBufferReadable, CanSetDro
     long blockStartInFile = currentLocatedBlock.getStartOffset();
     long blockPos = curPos - blockStartInFile;
     long limit = blockPos + length;
-    ClientMmap clientMmap =
-        blockReader.getClientMmap(opts, dfsClient.getMmapManager());
+    ClientMmap clientMmap = blockReader.getClientMmap(opts);
     if (clientMmap == null) {
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
@@ -1565,17 +1386,25 @@ implements ByteBufferReadable, CanSetDro
       }
       return null;
     }
-    seek(pos + length);
-    ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
-    buffer.position((int)blockPos);
-    buffer.limit((int)limit);
-    clientMmap.ref();
-    extendedReadBuffers.put(buffer, clientMmap);
-    readStatistics.addZeroCopyBytes(length);
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
-          "offset " + curPos + " via the zero-copy read path.  " +
-          "blockEnd = " + blockEnd);
+    boolean success = false;
+    ByteBuffer buffer;
+    try {
+      seek(pos + length);
+      buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
+      buffer.position((int)blockPos);
+      buffer.limit((int)limit);
+      extendedReadBuffers.put(buffer, clientMmap);
+      readStatistics.addZeroCopyBytes(length);
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
+            "offset " + curPos + " via the zero-copy read path.  " +
+            "blockEnd = " + blockEnd);
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        clientMmap.unref();
+      }
     }
     return buffer;
   }
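
The two hunks above replace calls to the removed getBlockReader() helper with
the new fluent BlockReaderFactory. For reference, here is a consolidated,
annotated sketch of that builder usage; it is illustrative only, and every
name in it comes from the hunks above (DFSInputStream fields plus the
factory's setters).

    // Illustrative sketch of the new construction path.  BlockReaderFactory
    // now owns the fallback chain (short-circuit local read, domain socket,
    // cached peer, new TCP peer) that getBlockReader() used to implement
    // inline in DFSInputStream.
    BlockReader reader = new BlockReaderFactory(dfsClient.getConf()).
        setInetSocketAddress(targetAddr).       // DataNode address to contact
        setRemotePeerFactory(dfsClient).        // DFSClient can mint TCP peers
        setDatanodeInfo(chosenNode).
        setFileName(src).
        setBlock(blk).
        setBlockToken(accessToken).             // security token for the block
        setStartOffset(offsetIntoBlock).
        setLength(blk.getNumBytes() - offsetIntoBlock).
        setVerifyChecksum(verifyChecksum).
        setClientName(dfsClient.clientName).
        setCachingStrategy(cachingStrategy).
        setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
        setClientCacheContext(dfsClient.getClientContext()). // shared caches
        setUserGroupInformation(dfsClient.ugi).
        setConfiguration(dfsClient.getConfiguration()).
        build();

Note that the per-stream peerCache and fileInputStreamCache fields are gone;
the client cache context passed via setClientCacheContext() is what now gives
the factory access to shared caches.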

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java Wed Feb 12 19:08:52 2014
@@ -27,29 +27,71 @@ import org.apache.hadoop.HadoopIllegalAr
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.net.unix.DomainSocket;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
 class DomainSocketFactory {
   private static final Log LOG = BlockReaderLocal.LOG;
-  private final Conf conf;
 
-  enum PathStatus {
-    UNUSABLE,
-    SHORT_CIRCUIT_DISABLED,
+  public enum PathState {
+    UNUSABLE(false, false),
+    SHORT_CIRCUIT_DISABLED(true, false),
+    VALID(true, true);
+
+    PathState(boolean usableForDataTransfer, boolean usableForShortCircuit) {
+      this.usableForDataTransfer = usableForDataTransfer;
+      this.usableForShortCircuit = usableForShortCircuit;
+    }
+
+    public boolean getUsableForDataTransfer() {
+      return usableForDataTransfer;
+    }
+
+    public boolean getUsableForShortCircuit() {
+      return usableForShortCircuit;
+    }
+
+    private final boolean usableForDataTransfer;
+    private final boolean usableForShortCircuit;
+  }
+
+  public static class PathInfo {
+    private final static PathInfo NOT_CONFIGURED =
+          new PathInfo("", PathState.UNUSABLE);
+
+    final private String path;
+    final private PathState state;
+
+    PathInfo(String path, PathState state) {
+      this.path = path;
+      this.state = state;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public PathState getPathState() {
+      return state;
+    }
+    
+    @Override
+    public String toString() {
+      return new StringBuilder().append("PathInfo{path=").append(path).
+          append(", state=").append(state).append("}").toString();
+    }
   }
 
   /**
    * Information about domain socket paths.
    */
-  Cache<String, PathStatus> pathInfo =
+  Cache<String, PathState> pathMap =
       CacheBuilder.newBuilder()
       .expireAfterWrite(10, TimeUnit.MINUTES)
       .build();
 
   public DomainSocketFactory(Conf conf) {
-    this.conf = conf;
-
     final String feature;
     if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) {
       feature = "The short-circuit local reads feature";
@@ -75,51 +117,46 @@ class DomainSocketFactory {
   }
 
   /**
-   * Create a DomainSocket.
-   * 
-   * @param addr        The address of the DataNode
-   * @param stream      The DFSInputStream the socket will be created for.
+   * Get information about a domain socket path.
+   *
+   * @param addr         The inet address to use.
+   * @param conf         The client configuration.
    *
-   * @return            null if the socket could not be created; the
-   *                    socket otherwise.  If there was an error while
-   *                    creating the socket, we will add the socket path
-   *                    to our list of failed domain socket paths.
+   * @return             Information about the socket path.
    */
-  DomainSocket create(InetSocketAddress addr, DFSInputStream stream) {
+  public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) {
     // If there is no domain socket path configured, we can't use domain
     // sockets.
-    if (conf.domainSocketPath.isEmpty()) return null;
+    if (conf.domainSocketPath.isEmpty()) return PathInfo.NOT_CONFIGURED;
     // If we can't do anything with the domain socket, don't create it.
     if (!conf.domainSocketDataTraffic &&
         (!conf.shortCircuitLocalReads || conf.useLegacyBlockReaderLocal)) {
-      return null;
+      return PathInfo.NOT_CONFIGURED;
     }
-    // UNIX domain sockets can only be used to talk to local peers
-    if (!DFSClient.isLocalAddress(addr)) return null;
     // If the DomainSocket code is not loaded, we can't create
     // DomainSocket objects.
-    if (DomainSocket.getLoadingFailureReason() != null) return null;
+    if (DomainSocket.getLoadingFailureReason() != null) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+    // UNIX domain sockets can only be used to talk to local peers
+    if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
     String escapedPath = DomainSocket.
         getEffectivePath(conf.domainSocketPath, addr.getPort());
-    PathStatus info = pathInfo.getIfPresent(escapedPath);
-    if (info == PathStatus.UNUSABLE) {
-      // We tried to connect to this domain socket before, and it was totally
-      // unusable.
-      return null;
-    }
-    if ((!conf.domainSocketDataTraffic) &&
-        ((info == PathStatus.SHORT_CIRCUIT_DISABLED) || 
-            stream.shortCircuitForbidden())) {
-      // If we don't want to pass data over domain sockets, and we don't want
-      // to pass file descriptors over them either, we have no use for domain
-      // sockets.
-      return null;
+    PathState status = pathMap.getIfPresent(escapedPath);
+    if (status == null) {
+      return new PathInfo(escapedPath, PathState.VALID);
+    } else {
+      return new PathInfo(escapedPath, status);
     }
+  }
+
+  public DomainSocket createSocket(PathInfo info, int socketTimeout) {
+    Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE);
     boolean success = false;
     DomainSocket sock = null;
     try {
-      sock = DomainSocket.connect(escapedPath);
-      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, conf.socketTimeout);
+      sock = DomainSocket.connect(info.getPath());
+      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout);
       success = true;
     } catch (IOException e) {
       LOG.warn("error creating DomainSocket", e);
@@ -129,7 +166,7 @@ class DomainSocketFactory {
         if (sock != null) {
           IOUtils.closeQuietly(sock);
         }
-        pathInfo.put(escapedPath, PathStatus.UNUSABLE);
+        pathMap.put(info.getPath(), PathState.UNUSABLE);
         sock = null;
       }
     }
@@ -137,10 +174,10 @@ class DomainSocketFactory {
   }
 
   public void disableShortCircuitForPath(String path) {
-    pathInfo.put(path, PathStatus.SHORT_CIRCUIT_DISABLED);
+    pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED);
   }
 
   public void disableDomainSocketPath(String path) {
-    pathInfo.put(path, PathStatus.UNUSABLE);
+    pathMap.put(path, PathState.UNUSABLE);
   }
 }
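
With this change the factory no longer decides by itself whether a given
DFSInputStream may use short-circuit reads; callers first classify the
configured path, then connect. A minimal sketch of the new two-step call
sequence (the getDomainSocketFactory() accessor and the socketTimeout field
both appear in the DFSInputStream code removed above):

    // Illustrative sketch of the two-step API.
    DomainSocketFactory factory = dfsClient.getDomainSocketFactory();
    DomainSocketFactory.PathInfo info =
        factory.getPathInfo(dnAddr, dfsClient.getConf());
    if (info.getPathState().getUsableForDataTransfer()) {
      DomainSocket sock =
          factory.createSocket(info, dfsClient.getConf().socketTimeout);
      if (sock == null) {
        // The connect failed.  createSocket() has already recorded the path
        // as PathState.UNUSABLE in pathMap, so it will be skipped until the
        // entry expires (ten minutes, per expireAfterWrite above).
      }
    }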

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java Wed Feb 12 19:08:52 2014
@@ -89,42 +89,19 @@ class PeerCache {
     LinkedListMultimap.create();
   private final int capacity;
   private final long expiryPeriod;
-  private static PeerCache instance = null;
   
-  @VisibleForTesting
-  PeerCache(int c, long e) {
+  public PeerCache(int c, long e) {
     this.capacity = c;
     this.expiryPeriod = e;
 
     if (capacity == 0 ) {
       LOG.info("SocketCache disabled.");
-    }
-    else if (expiryPeriod == 0) {
+    } else if (expiryPeriod == 0) {
       throw new IllegalStateException("Cannot initialize expiryPeriod to " +
-         expiryPeriod + "when cache is enabled.");
+         expiryPeriod + " when cache is enabled.");
     }
   }
  
-  public static synchronized PeerCache getInstance(int c, long e) {
-    // capacity is only initialized once
-    if (instance == null) {
-      instance = new PeerCache(c, e);
-    } else { //already initialized once
-      if (instance.capacity != c || instance.expiryPeriod != e) {
-        LOG.info("capacity and expiry periods already set to " +
-          instance.capacity + " and " + instance.expiryPeriod +
-          " respectively. Cannot set it to " + c + " and " + e);
-      }
-    }
-
-    return instance;
-  }
-
-  @VisibleForTesting
-  public static synchronized void setInstance(int c, long e) {
-    instance = new PeerCache(c, e);
-  }
-
   private boolean isDaemonStarted() {
     return (daemon == null)? false: true;
   }
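
Since PeerCache is no longer a process-wide singleton, each client context
constructs its own instance. A minimal sketch, assuming the socket-cache
settings already present in DFSClient.Conf (the exact field names here are
an assumption):

    // Illustrative: one cache per client context instead of one per JVM.
    // A capacity of 0 disables caching; a non-zero capacity with a zero
    // expiry period makes the constructor throw IllegalStateException.
    PeerCache peerCache = new PeerCache(
        conf.socketCacheCapacity,   // assumed Conf field name
        conf.socketCacheExpiry);    // assumed Conf field name

This also removes the surprising getInstance() behavior in which a second
caller passing different capacity or expiry values was silently ignored.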

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Wed Feb 12 19:08:52 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FSInputCheck
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -492,8 +491,7 @@ public class RemoteBlockReader extends F
   }
 
   @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Wed Feb 12 19:08:52 2014
@@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -457,8 +456,7 @@ public class RemoteBlockReader2  impleme
   }
 
   @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
 }

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java?rev=1567720&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java Wed Feb 12 19:08:52 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public interface RemotePeerFactory {
+  /**
+   * @param addr          The address to connect to.
+   * 
+   * @return              A new Peer connected to the address.
+   *
+   * @throws IOException  If there was an error connecting or creating 
+   *                      the remote socket, encrypted stream, etc.
+   */
+  Peer newConnectedPeer(InetSocketAddress addr) throws IOException;
+}
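
DFSClient is the intended implementor of this interface; the builder hunks
above wire it in via setRemotePeerFactory(dfsClient). A plausible
implementation, sketched here by transplanting the newTcpPeer() helper that
this commit removes from DFSInputStream (the actual DFSClient change is in
another part of this patch):

    // Illustrative sketch; mirrors the removed DFSInputStream#newTcpPeer.
    @Override
    public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
      Peer peer = null;
      boolean success = false;
      Socket sock = null;
      try {
        sock = socketFactory.createSocket();
        NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(),
            getConf().socketTimeout);
        peer = TcpPeerServer.peerFromSocketAndKey(sock,
            getDataEncryptionKey());
        success = true;
        return peer;
      } finally {
        if (!success) {
          // Avoid leaking the half-open connection on failure.
          IOUtils.closeQuietly(peer);
          IOUtils.closeQuietly(sock);
        }
      }
    }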

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java Wed Feb 12 19:08:52 2014
@@ -17,24 +17,14 @@
  */
 package org.apache.hadoop.hdfs.client;
 
-import java.io.FileInputStream;
-
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.io.nativeio.NativeIO;
 
-import java.io.IOException;
-import java.lang.ref.WeakReference;
 import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel.MapMode;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * A memory-mapped region used by an HDFS client.
  * 
@@ -46,111 +36,46 @@ public class ClientMmap {
   static final Log LOG = LogFactory.getLog(ClientMmap.class);
   
   /**
-   * A reference to the manager of this mmap.
-   * 
-   * This is only a weak reference to help minimize the damage done by
-   * code which leaks references accidentally.
+   * A reference to the block replica which this mmap relates to.
    */
-  private final WeakReference<ClientMmapManager> manager;
+  private final ShortCircuitReplica replica;
   
   /**
-   * The actual mapped memory region.
+   * The java ByteBuffer object.
    */
   private final MappedByteBuffer map;
-  
-  /**
-   * A reference count tracking how many threads are using this object.
-   */
-  private final AtomicInteger refCount = new AtomicInteger(1);
 
   /**
-   * Block pertaining to this mmap
+   * Reference count of this ClientMmap object.
    */
-  private final ExtendedBlock block;
-  
-  /**
-   * The DataNode where this mmap came from.
-   */
-  private final DatanodeID datanodeID;
-
-  /**
-   * The monotonic time when this mmap was last evictable.
-   */
-  private long lastEvictableTimeNs;
-
-  public static ClientMmap load(ClientMmapManager manager, FileInputStream in, 
-      ExtendedBlock block, DatanodeID datanodeID) 
-          throws IOException {
-    MappedByteBuffer map =
-        in.getChannel().map(MapMode.READ_ONLY, 0,
-            in.getChannel().size());
-    return new ClientMmap(manager, map, block, datanodeID);
-  }
+  private final AtomicInteger refCount = new AtomicInteger(1);
 
-  private ClientMmap(ClientMmapManager manager, MappedByteBuffer map, 
-        ExtendedBlock block, DatanodeID datanodeID) 
-            throws IOException {
-    this.manager = new WeakReference<ClientMmapManager>(manager);
+  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map) {
+    this.replica = replica;
     this.map = map;
-    this.block = block;
-    this.datanodeID = datanodeID;
-    this.lastEvictableTimeNs = 0;
   }
 
   /**
-   * Decrement the reference count on this object.
-   * Should be called with the ClientMmapManager lock held.
+   * Increment the reference count.
+   *
+   * @return   The new reference count.
    */
-  public void unref() {
-    int count = refCount.decrementAndGet();
-    if (count < 0) {
-      throw new IllegalArgumentException("can't decrement the " +
-          "reference count on this ClientMmap lower than 0.");
-    } else if (count == 0) {
-      ClientMmapManager man = manager.get();
-      if (man == null) {
-        unmap();
-      } else {
-        man.makeEvictable(this);
-      }
-    }
+  void ref() {
+    refCount.addAndGet(1);
   }
 
   /**
-   * Increment the reference count on this object.
+   * Decrement the reference count.
    *
-   * @return     The new reference count.
+   * The parent replica gets unreferenced each time the reference count 
+   * of this object goes to 0.
    */
-  public int ref() {
-    return refCount.getAndIncrement();
-  }
-
-  @VisibleForTesting
-  public ExtendedBlock getBlock() {
-    return block;
-  }
-
-  DatanodeID getDatanodeID() {
-    return datanodeID;
+  public void unref() {
+    refCount.addAndGet(-1);
+    replica.unref();
   }
 
   public MappedByteBuffer getMappedByteBuffer() {
     return map;
   }
-
-  public void setLastEvictableTimeNs(long lastEvictableTimeNs) {
-    this.lastEvictableTimeNs = lastEvictableTimeNs;
-  }
-
-  public long getLastEvictableTimeNs() {
-    return this.lastEvictableTimeNs;
-  }
-
-  /**
-   * Unmap the memory region.
-   */
-  void unmap() {
-    assert(refCount.get() == 0);
-    NativeIO.POSIX.munmap(map);
-  }
-}
+}
\ No newline at end of file
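
ClientMmap's reference counting now simply forwards to the owning
ShortCircuitReplica: the replica stays referenced, and therefore alive in
the ShortCircuitCache, as long as any mmap handed out from it is in use.
A minimal caller-side sketch, using the one-argument getClientMmap() from
the reader hunks above:

    // Illustrative: a caller that is done with the mapping right away.
    // (DFSInputStream#readZeroCopy instead stashes the buffer in
    // extendedReadBuffers and unrefs later, when the buffer is released.)
    ClientMmap mmap = blockReader.getClientMmap(opts);
    if (mmap != null) {
      try {
        ByteBuffer buf = mmap.getMappedByteBuffer().asReadOnlyBuffer();
        // ... read from buf ...
      } finally {
        mmap.unref();  // drops this reference and unrefs the parent replica
      }
    }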

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java?rev=1567720&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java Wed Feb 12 19:08:52 2014
@@ -0,0 +1,880 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.Closeable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Waitable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The ShortCircuitCache tracks things which the client needs to access
+ * HDFS block files via short-circuit.
+ *
+ * These things include: memory-mapped regions, file descriptors, and shared
+ * memory areas for communicating with the DataNode.
+ */
+@InterfaceAudience.Private
+public class ShortCircuitCache implements Closeable {
+  public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
+
+  /**
+   * Expiry thread which makes sure that the file descriptors get closed
+   * after a while.
+   */
+  private class CacheCleaner implements Runnable, Closeable {
+    private ScheduledFuture<?> future;
+
+    /**
+     * Run the CacheCleaner thread.
+     *
+     * Whenever a thread requests a ShortCircuitReplica object, we will make
+     * sure it gets one.  That ShortCircuitReplica object can then be re-used
+     * when another thread requests a ShortCircuitReplica object for the same
+     * block.  So in that sense, there is no maximum size to the cache.
+     *
+     * However, when a ShortCircuitReplica object is unreferenced by the
+     * thread(s) that are using it, it becomes evictable.  There are two
+     * separate eviction lists-- one for mmaped objects, and another for
+     * non-mmaped objects.  We do this in order to avoid having the regular
+     * files kick the mmaped files out of the cache too quickly.  Reusing
+     * an already-existing mmap gives a huge performance boost, since the
+     * page table entries don't have to be re-populated.  Both the mmap
+     * and non-mmap evictable lists have maximum sizes and maximum lifespans.
+     */
+    @Override
+    public void run() {
+      ShortCircuitCache.this.lock.lock();
+      try {
+        if (ShortCircuitCache.this.closed) return;
+        long curMs = Time.monotonicNow();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(this + ": cache cleaner running at " + curMs);
+        }
+
+        int numDemoted = demoteOldEvictableMmaped(curMs);
+        int numPurged = 0;
+        Long evictionTimeNs = Long.valueOf(0);
+        while (true) {
+          Entry<Long, ShortCircuitReplica> entry = 
+              evictable.ceilingEntry(evictionTimeNs);
+          if (entry == null) break;
+          evictionTimeNs = entry.getKey();
+          long evictionTimeMs = 
+              TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
+          if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break;
+          ShortCircuitReplica replica = entry.getValue();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("CacheCleaner: purging " + replica + ": " + 
+                  StringUtils.getStackTrace(Thread.currentThread()));
+          }
+          purge(replica);
+          numPurged++;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(this + ": finishing cache cleaner run started at " +
+            curMs + ".  Demoted " + numDemoted + " mmapped replicas; " +
+            "purged " + numPurged + " replicas.");
+        }
+      } finally {
+        ShortCircuitCache.this.lock.unlock();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (future != null) {
+        future.cancel(false);
+      }
+    }
+
+    public void setFuture(ScheduledFuture<?> future) {
+      this.future = future;
+    }
+
+    /**
+     * Get the rate at which this cleaner thread should be scheduled.
+     *
+     * We do this by taking the minimum expiration time and dividing by 4.
+     *
+     * @return the rate in milliseconds at which this thread should be
+     *         scheduled.
+     */
+    public long getRateInMs() {
+      long minLifespanMs =
+          Math.min(maxNonMmappedEvictableLifespanMs,
+              maxEvictableMmapedLifespanMs);
+      long sampleTimeMs = minLifespanMs / 4;
+      return (sampleTimeMs < 1) ? 1 : sampleTimeMs;
+    }
+  }
+
+  public interface ShortCircuitReplicaCreator {
+    /**
+     * Attempt to create a ShortCircuitReplica object.
+     *
+     * This callback will be made without holding any locks.
+     *
+     * @return a non-null ShortCircuitReplicaInfo object.
+     */
+    ShortCircuitReplicaInfo createShortCircuitReplicaInfo();
+  }
+
+  /**
+   * Lock protecting the cache.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * The executor service that runs the cacheCleaner.
+   */
+  private final ScheduledThreadPoolExecutor executor
+      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+          setDaemon(true).setNameFormat("ShortCircuitCache Cleaner").
+          build());
+
+  /**
+   * A map containing all ShortCircuitReplicaInfo objects, organized by Key.
+   * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken
+   * exception.
+   */
+  private final HashMap<Key, Waitable<ShortCircuitReplicaInfo>>
+      replicaInfoMap = new HashMap<Key, Waitable<ShortCircuitReplicaInfo>>();
+
+  /**
+   * The CacheCleaner.  We don't create this and schedule it until it becomes
+   * necessary.
+   */
+  private CacheCleaner cacheCleaner;
+
+  /**
+   * Tree of evictable elements.
+   *
+   * Maps (unique) insertion time in nanoseconds to the element.
+   */
+  private final TreeMap<Long, ShortCircuitReplica> evictable =
+      new TreeMap<Long, ShortCircuitReplica>();
+
+  /**
+   * Maximum total size of the cache, including both mmapped and
+   * non-mmapped elements.
+   */
+  private int maxTotalSize;
+
+  /**
+   * Non-mmaped elements older than this will be closed.
+   */
+  private long maxNonMmappedEvictableLifespanMs;
+
+  /**
+   * Tree of mmaped evictable elements.
+   *
+   * Maps (unique) insertion time in nanoseconds to the element.
+   */
+  private final TreeMap<Long, ShortCircuitReplica> evictableMmapped =
+      new TreeMap<Long, ShortCircuitReplica>();
+
+  /**
+   * Maximum number of mmaped evictable elements.
+   */
+  private int maxEvictableMmapedSize;
+
+  /**
+   * Mmaped elements older than this will be closed.
+   */
+  private final long maxEvictableMmapedLifespanMs;
+
+  /**
+   * The minimum number of milliseconds we'll wait after an unsuccessful
+   * mmap attempt before trying again.
+   */
+  private final long mmapRetryTimeoutMs;
+
+  /**
+   * How long we will keep replicas in the cache before declaring them
+   * to be stale.
+   */
+  private final long staleThresholdMs;
+
+  /**
+   * True if the ShortCircuitCache is closed.
+   */
+  private boolean closed = false;
+
+  /**
+   * Number of existing mmaps associated with this cache.
+   */
+  private int outstandingMmapCount = 0;
+
+  /**
+   * Create a {@link ShortCircuitCache} object from a {@link Configuration}
+   */
+  public static ShortCircuitCache fromConf(Configuration conf) {
+    return new ShortCircuitCache(
+        conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
+            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
+        conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
+            DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT),
+        conf.getLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
+            DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT),
+        conf.getLong(DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
+            DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT),
+        conf.getLong(DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
+            DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT));
+  }
+
+  public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
+      int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
+      long mmapRetryTimeoutMs, long staleThresholdMs) {
+    Preconditions.checkArgument(maxTotalSize >= 0);
+    this.maxTotalSize = maxTotalSize;
+    Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0);
+    this.maxNonMmappedEvictableLifespanMs = maxNonMmappedEvictableLifespanMs;
+    Preconditions.checkArgument(maxEvictableMmapedSize >= 0);
+    this.maxEvictableMmapedSize = maxEvictableMmapedSize;
+    Preconditions.checkArgument(maxEvictableMmapedLifespanMs >= 0);
+    this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs;
+    this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
+    this.staleThresholdMs = staleThresholdMs;
+  }
+
+  public long getMmapRetryTimeoutMs() {
+    return mmapRetryTimeoutMs;
+  }
+
+  public long getStaleThresholdMs() {
+    return staleThresholdMs;
+  }
+
+  /**
+   * Increment the reference count of a replica, and remove it from any free
+   * list it may be in.
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param replica      The replica we're removing.
+   */
+  private void ref(ShortCircuitReplica replica) {
+    lock.lock();
+    try {
+      Preconditions.checkArgument(replica.refCount > 0,
+          "can't ref " + replica + " because its refCount reached " +
+          replica.refCount);
+      Long evictableTimeNs = replica.getEvictableTimeNs();
+      replica.refCount++;
+      if (evictableTimeNs != null) {
+        String removedFrom = removeEvictable(replica);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": " + removedFrom +
+              " no longer contains " + replica + ".  refCount " +
+              (replica.refCount - 1) + " -> " + replica.refCount +
+              StringUtils.getStackTrace(Thread.currentThread()));
+
+        }
+      } else if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": replica  refCount " +
+            (replica.refCount - 1) + " -> " + replica.refCount +
+            StringUtils.getStackTrace(Thread.currentThread()));
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Unreference a replica.
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param replica   The replica being unreferenced.
+   */
+  void unref(ShortCircuitReplica replica) {
+    lock.lock();
+    try {
+      String addedString = "";
+      int newRefCount = --replica.refCount;
+      if (newRefCount == 0) {
+        // Close replica, since there are no remaining references to it.
+        Preconditions.checkArgument(replica.purged,
+            "Replica " + replica + " reached a refCount of 0 without " +
+            "being purged");
+        replica.close();
+      } else if (newRefCount == 1) {
+        Preconditions.checkState(null == replica.getEvictableTimeNs(),
+            "Replica " + replica + " had a refCount higher than 1, " +
+              "but was still evictable (evictableTimeNs = " +
+                replica.getEvictableTimeNs() + ")");
+        if (!replica.purged) {
+          // Add the replica to the end of an eviction list.
+          // Eviction lists are sorted by time.
+          if (replica.hasMmap()) {
+            insertEvictable(System.nanoTime(), replica, evictableMmapped);
+            addedString = "added to evictableMmapped, ";
+          } else {
+            insertEvictable(System.nanoTime(), replica, evictable);
+            addedString = "added to evictable, ";
+          }
+          trimEvictionMaps();
+        }
+      } else {
+        Preconditions.checkArgument(replica.refCount >= 0,
+            "replica's refCount went negative (refCount = " +
+            replica.refCount + " for " + replica + ")");
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": unref replica " + replica +
+            ": " + addedString + " refCount " +
+            (newRefCount + 1) + " -> " + newRefCount +
+            StringUtils.getStackTrace(Thread.currentThread()));
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Demote old evictable mmaps into the regular eviction map.
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param now   Current time in monotonic milliseconds.
+   * @return      Number of replicas demoted.
+   */
+  private int demoteOldEvictableMmaped(long now) {
+    int numDemoted = 0;
+    boolean needMoreSpace = false;
+    Long evictionTimeNs = Long.valueOf(0);
+
+    while (true) {
+      Entry<Long, ShortCircuitReplica> entry = 
+          evictableMmapped.ceilingEntry(evictionTimeNs);
+      if (entry == null) break;
+      evictionTimeNs = entry.getKey();
+      long evictionTimeMs = 
+          TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
+      if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) {
+        if (evictableMmapped.size() < maxEvictableMmapedSize) {
+          break;
+        }
+        needMoreSpace = true;
+      }
+      ShortCircuitReplica replica = entry.getValue();
+      if (LOG.isTraceEnabled()) {
+        String rationale = needMoreSpace ? "because we need more space" : 
+            "because it's too old";
+        LOG.trace("demoteOldEvictable: demoting " + replica + ": " +
+            rationale + ": " +
+            StringUtils.getStackTrace(Thread.currentThread()));
+      }
+      removeEvictable(replica, evictableMmapped);
+      munmap(replica);
+      insertEvictable(evictionTimeNs, replica, evictable);
+      numDemoted++;
+    }
+    return numDemoted;
+  }
+
+  /**
+   * Trim the eviction lists.
+   */
+  private void trimEvictionMaps() {
+    long now = Time.monotonicNow();
+    demoteOldEvictableMmaped(now);
+
+    while (true) {
+      long evictableSize = evictable.size();
+      long evictableMmappedSize = evictableMmapped.size();
+      if (evictableSize + evictableMmappedSize <= maxTotalSize) {
+        return;
+      }
+      ShortCircuitReplica replica;
+      if (evictableSize == 0) {
+       replica = evictableMmapped.firstEntry().getValue();
+      } else {
+       replica = evictable.firstEntry().getValue();
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": trimEvictionMaps is purging " +
+          StringUtils.getStackTrace(Thread.currentThread()));
+      }
+      purge(replica);
+    }
+  }
+
+  /**
+   * Munmap a replica, updating outstandingMmapCount.
+   *
+   * @param replica  The replica to munmap.
+   */
+  private void munmap(ShortCircuitReplica replica) {
+    replica.munmap();
+    outstandingMmapCount--;
+  }
+
+  /**
+   * Remove a replica from an evictable map.
+   *
+   * @param replica   The replica to remove.
+   * @return          The map it was removed from.
+   */
+  private String removeEvictable(ShortCircuitReplica replica) {
+    if (replica.hasMmap()) {
+      removeEvictable(replica, evictableMmapped);
+      return "evictableMmapped";
+    } else {
+      removeEvictable(replica, evictable);
+      return "evictable";
+    }
+  }
+
+  /**
+   * Remove a replica from an evictable map.
+   *
+   * @param replica   The replica to remove.
+   * @param map       The map to remove it from.
+   */
+  private void removeEvictable(ShortCircuitReplica replica,
+      TreeMap<Long, ShortCircuitReplica> map) {
+    Long evictableTimeNs = replica.getEvictableTimeNs();
+    Preconditions.checkNotNull(evictableTimeNs);
+    ShortCircuitReplica removed = map.remove(evictableTimeNs);
+    Preconditions.checkState(removed == replica,
+        "failed to make " + replica + " unevictable");
+    replica.setEvictableTimeNs(null);
+  }
+
+  /**
+   * Insert a replica into an evictable map.
+   *
+   * If an element already exists with this eviction time, we add a nanosecond
+   * to it until we find an unused key.
+   *
+   * @param evictionTimeNs   The eviction time in absolute nanoseconds.
+   * @param replica          The replica to insert.
+   * @param map              The map to insert it into.
+   */
+  private void insertEvictable(Long evictionTimeNs,
+      ShortCircuitReplica replica, TreeMap<Long, ShortCircuitReplica> map) {
+    while (map.containsKey(evictionTimeNs)) {
+      evictionTimeNs++;
+    }
+    Preconditions.checkState(null == replica.getEvictableTimeNs());
+    Long time = Long.valueOf(evictionTimeNs);
+    replica.setEvictableTimeNs(time);
+    map.put(time, replica);
+  }
+
+  /**
+   * Purge a replica from the cache.
+   *
+   * This doesn't necessarily close the replica, since there may be
+   * outstanding references to it.  However, it does mean the cache won't
+   * hand it out to anyone after this.
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param replica   The replica being removed.
+   */
+  private void purge(ShortCircuitReplica replica) {
+    boolean removedFromInfoMap = false;
+    String evictionMapName = null;
+    Preconditions.checkArgument(!replica.purged);
+    replica.purged = true;
+    Waitable<ShortCircuitReplicaInfo> val = replicaInfoMap.get(replica.key);
+    if (val != null) {
+      ShortCircuitReplicaInfo info = val.getVal();
+      if ((info != null) && (info.getReplica() == replica)) {
+        replicaInfoMap.remove(replica.key);
+        removedFromInfoMap = true;
+      }
+    }
+    Long evictableTimeNs = replica.getEvictableTimeNs();
+    if (evictableTimeNs != null) {
+      evictionMapName = removeEvictable(replica);
+    }
+    if (LOG.isTraceEnabled()) {
+      StringBuilder builder = new StringBuilder();
+      builder.append(this).append(": ").append(": removed ").
+          append(replica).append(" from the cache.");
+      if (removedFromInfoMap) {
+        builder.append("  Removed from the replicaInfoMap.");
+      }
+      if (evictionMapName != null) {
+        builder.append("  Removed from ").append(evictionMapName);
+      }
+      LOG.trace(builder.toString());
+    }
+    unref(replica);
+  }
+
+  /**
+   * Fetch or create a replica.
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param key          Key to use for lookup.
+   * @param creator      Replica creator callback.  Will be called without
+   *                     the cache lock being held.
+   *
+   * @return             Null if no replica could be found or created.
+   *                     The replica, otherwise.
+   */
+  public ShortCircuitReplicaInfo fetchOrCreate(Key key,
+      ShortCircuitReplicaCreator creator) {
+    Waitable<ShortCircuitReplicaInfo> newWaitable = null;
+    lock.lock();
+    try {
+      ShortCircuitReplicaInfo info = null;
+      do {
+        if (closed) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": can't fetchOrCreate " + key +
+                " because the cache is closed.");
+          }
+          return null;
+        }
+        Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
+        if (waitable != null) {
+          try {
+            info = fetch(key, waitable);
+          } catch (RetriableException e) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(this + ": retrying " + e.getMessage());
+            }
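+            // Note: this continue re-tests the always-false do/while
+            // condition, so control exits the loop and falls through to
+            // the creation path below.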
+            continue;
+          }
+        }
+      } while (false);
+      if (info != null) return info;
+      // We need to load the replica ourselves.
+      newWaitable = new Waitable<ShortCircuitReplicaInfo>(lock.newCondition());
+      replicaInfoMap.put(key, newWaitable);
+    } finally {
+      lock.unlock();
+    }
+    return create(key, creator, newWaitable);
+  }
+
+  /**
+   * Fetch an existing ReplicaInfo object.
+   *
+   * @param key       The key that we're using.
+   * @param waitable  The waitable object to wait on.
+   * @return          The existing ReplicaInfo object, or null if there is
+   *                  none.
+   *
+   * @throws RetriableException   If the caller needs to retry.
+   */
+  private ShortCircuitReplicaInfo fetch(Key key,
+      Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
+    // Another thread is already in the process of loading this
+    // ShortCircuitReplica.  So we simply wait for it to complete.
+    ShortCircuitReplicaInfo info;
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": found waitable for " + key);
+      }
+      info = waitable.await();
+    } catch (InterruptedException e) {
+      LOG.info(this + ": interrupted while waiting for " + key);
+      Thread.currentThread().interrupt();
+      throw new RetriableException("interrupted");
+    }
+    if (info.getInvalidTokenException() != null) {
+      LOG.warn(this + ": could not get " + key + " due to InvalidToken " +
+            "exception.", info.getInvalidTokenException());
+      return info;
+    }
+    ShortCircuitReplica replica = info.getReplica();
+    if (replica == null) {
+      LOG.warn(this + ": failed to get " + key);
+      return info;
+    }
+    if (replica.purged) {
+      // Ignore replicas that have already been purged from the cache.
+      throw new RetriableException("Ignoring purged replica " +
+          replica + ".  Retrying.");
+    }
+    // Check if the replica is stale before using it.
+    // If it is, purge it and retry.
+    if (replica.isStale()) {
+      LOG.info(this + ": got stale replica " + replica + ".  Removing " +
+          "this replica from the replicaInfoMap and retrying.");
+      // Remove the cache's reference to the replica.  This may or may not
+      // trigger a close.
+      purge(replica);
+      throw new RetriableException("ignoring stale replica " + replica);
+    }
+    ref(replica);
+    return info;
+  }
+
+  private ShortCircuitReplicaInfo create(Key key,
+      ShortCircuitReplicaCreator creator,
+      Waitable<ShortCircuitReplicaInfo> newWaitable) {
+    // Handle loading a new replica.
+    ShortCircuitReplicaInfo info = null;
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": loading " + key);
+      }
+      info = creator.createShortCircuitReplicaInfo();
+    } catch (RuntimeException e) {
+      LOG.warn(this + ": failed to load " + key, e);
+    }
+    if (info == null) info = new ShortCircuitReplicaInfo();
+    lock.lock();
+    try {
+      if (info.getReplica() != null) {
+        // On success, make sure the cache cleaner thread is running.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": successfully loaded " + info.getReplica());
+        }
+        startCacheCleanerThreadIfNeeded();
+        // Note: new ShortCircuitReplicas start with a refCount of 2,
+        // indicating that both this cache and whoever requested the 
+        // creation of the replica hold a reference.  So we don't need
+        // to increment the reference count here.
+      } else {
+        // On failure, remove the waitable from the replicaInfoMap.
+        Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key);
+        if (waitableInMap == newWaitable) replicaInfoMap.remove(key);
+        if (info.getInvalidTokenException() != null) {
+          LOG.warn(this + ": could not load " + key + " due to InvalidToken " +
+              "exception.", info.getInvalidTokenException());
+        } else {
+          LOG.warn(this + ": failed to load " + key);
+        }
+      }
+      newWaitable.provide(info);
+    } finally {
+      lock.unlock();
+    }
+    return info;
+  }
+
+  private void startCacheCleanerThreadIfNeeded() {
+    if (cacheCleaner == null) {
+      cacheCleaner = new CacheCleaner();
+      long rateMs = cacheCleaner.getRateInMs();
+      ScheduledFuture<?> future =
+          executor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
+              TimeUnit.MILLISECONDS);
+      cacheCleaner.setFuture(future);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(this + ": starting cache cleaner thread which will run " +
+          "every " + rateMs + " ms");
+      }
+    }
+  }
+
+  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica) {
+    Condition newCond;
+    lock.lock();
+    try {
+      while (replica.mmapData != null) {
+        if (replica.mmapData instanceof ClientMmap) {
+          ref(replica);
+          ClientMmap clientMmap = (ClientMmap)replica.mmapData;
+          clientMmap.ref();
+          return clientMmap;
+        } else if (replica.mmapData instanceof Long) {
+          long lastAttemptTimeMs = (Long)replica.mmapData;
+          long delta = Time.monotonicNow() - lastAttemptTimeMs;
+          if (delta < staleThresholdMs) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace(this + ": can't create client mmap for " +
+                  replica + " because we failed to " +
+                  "create one just " + delta + "ms ago.");
+            }
+            return null;
+          }
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": retrying client mmap for " + replica +
+                ", " + delta + " ms after the previous failure.");
+          }
+        } else if (replica.mmapData instanceof Condition) {
+          Condition cond = (Condition)replica.mmapData;
+          cond.awaitUninterruptibly();
+        } else {
+          Preconditions.checkState(false, "invalid mmapData type " +
+              replica.mmapData.getClass().getName());
+        }
+      }
+      newCond = lock.newCondition();
+      replica.mmapData = newCond;
+    } finally {
+      lock.unlock();
+    }
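+    // Map the file outside the lock, since FileChannel.map can block on
+    // disk I/O.  Concurrent callers see the Condition installed above and
+    // wait for us to finish.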
+    MappedByteBuffer map = replica.loadMmapInternal();
+    lock.lock();
+    try {
+      if (map == null) {
+        replica.mmapData = Long.valueOf(Time.monotonicNow());
+        newCond.signalAll();
+        return null;
+      } else {
+        ClientMmap clientMmap = new ClientMmap(replica, map);
+        outstandingMmapCount++;
+        replica.mmapData = clientMmap;
+        ref(replica);
+        newCond.signalAll();
+        return clientMmap;
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Close the cache and free all associated resources.
+   */
+  public void close() {
+    lock.lock();
+    try {
+      if (closed) return;
+      closed = true;
+      LOG.info(this + ": closing");
+      maxNonMmappedEvictableLifespanMs = 0;
+      maxEvictableMmapedSize = 0;
+      // Close and join cacheCleaner thread.
+      IOUtils.cleanup(LOG, cacheCleaner);
+      // Purge all replicas.
+      while (true) {
+        Entry<Long, ShortCircuitReplica> entry = evictable.firstEntry();
+        if (entry == null) break;
+        purge(entry.getValue());
+      }
+      while (true) {
+        Entry<Long, ShortCircuitReplica> entry = evictableMmapped.firstEntry();
+        if (entry == null) break;
+        purge(entry.getValue());
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @VisibleForTesting // ONLY for testing
+  public interface CacheVisitor {
+    void visit(int numOutstandingMmaps,
+        Map<Key, ShortCircuitReplica> replicas,
+        Map<Key, InvalidToken> failedLoads,
+        Map<Long, ShortCircuitReplica> evictable,
+        Map<Long, ShortCircuitReplica> evictableMmapped);
+  }
+
+  @VisibleForTesting // ONLY for testing
+  public void accept(CacheVisitor visitor) {
+    lock.lock();
+    try {
+      Map<Key, ShortCircuitReplica> replicas =
+          new HashMap<Key, ShortCircuitReplica>();
+      Map<Key, InvalidToken> failedLoads =
+          new HashMap<Key, InvalidToken>();
+      for (Entry<Key, Waitable<ShortCircuitReplicaInfo>> entry :
+            replicaInfoMap.entrySet()) {
+        Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue();
+        if (waitable.hasVal()) {
+          if (waitable.getVal().getReplica() != null) {
+            replicas.put(entry.getKey(), waitable.getVal().getReplica());
+          } else {
+            // The exception may be null here, indicating a failed load that
+            // isn't the result of an invalid block token.
+            failedLoads.put(entry.getKey(),
+                waitable.getVal().getInvalidTokenException());
+          }
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("visiting ").append(visitor.getClass().getName()).
+            append("with outstandingMmapCount=").append(outstandingMmapCount).
+            append(", replicas=");
+        String prefix = "";
+        for (Entry<Key, ShortCircuitReplica> entry : replicas.entrySet()) {
+          builder.append(prefix).append(entry.getValue());
+          prefix = ",";
+        }
+        prefix = "";
+        builder.append(", failedLoads=");
+        for (Entry<Key, InvalidToken> entry : failedLoads.entrySet()) {
+          builder.append(prefix).append(entry.getValue());
+          prefix = ",";
+        }
+        prefix = "";
+        builder.append(", evictable=");
+        for (Entry<Long, ShortCircuitReplica> entry : evictable.entrySet()) {
+          builder.append(prefix).append(entry.getKey()).
+              append(":").append(entry.getValue());
+          prefix = ",";
+        }
+        prefix = "";
+        builder.append(", evictableMmapped=");
+        for (Entry<Long, ShortCircuitReplica> entry : evictableMmapped.entrySet()) {
+          builder.append(prefix).append(entry.getKey()).
+              append(":").append(entry.getValue());
+          prefix = ",";
+        }
+        LOG.debug(builder.toString());
+      }
+      visitor.visit(outstandingMmapCount, replicas, failedLoads,
+            evictable, evictableMmapped);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "ShortCircuitCache(0x" +
+        Integer.toHexString(System.identityHashCode(this)) + ")";
+  }
+}
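
A note on the eviction maps above: they are TreeMaps keyed by an absolute
nanosecond timestamp, and insertEvictable resolves key collisions by bumping
the timestamp forward one nanosecond at a time. A minimal self-contained
sketch of that keying strategy (class and method names here are illustrative,
not part of the patch):

import java.util.TreeMap;

public class EvictionMapSketch {
  private final TreeMap<Long, String> evictable = new TreeMap<Long, String>();

  // Mirrors insertEvictable: nudge the key forward until it is unused,
  // so two entries that become evictable in the same nanosecond never
  // overwrite each other.
  void insert(long evictionTimeNs, String replica) {
    while (evictable.containsKey(evictionTimeNs)) {
      evictionTimeNs++;
    }
    evictable.put(evictionTimeNs, replica);
  }

  // Mirrors what the cache cleaner does: evict everything whose
  // eviction time is in the past, oldest first.
  void evictExpired(long nowNs) {
    while (!evictable.isEmpty() && evictable.firstKey() <= nowNs) {
      System.out.println("evicting " + evictable.remove(evictable.firstKey()));
    }
  }

  public static void main(String[] args) {
    EvictionMapSketch m = new EvictionMapSketch();
    m.insert(100L, "replicaA");
    m.insert(100L, "replicaB");  // same timestamp: stored under key 101
    m.evictExpired(150L);        // evicts both, replicaA first
  }
}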
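The fetchOrCreate/fetch/create trio implements a load-once ("single-flight")
pattern: the first caller for a key installs a Waitable and runs the creator
outside the lock, while concurrent callers for the same key block until that
load completes. A rough standalone sketch of the same idea using plain JDK
types, with ConcurrentHashMap and FutureTask standing in for the patch's
replicaInfoMap and Waitable:

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class SingleFlightSketch<K, V> {
  private final ConcurrentMap<K, FutureTask<V>> inFlight =
      new ConcurrentHashMap<K, FutureTask<V>>();

  // The first caller for a key runs the loader itself; concurrent
  // callers for the same key block in get() and share the result.
  public V fetchOrCreate(K key, Callable<V> loader)
      throws InterruptedException, ExecutionException {
    FutureTask<V> task = new FutureTask<V>(loader);
    FutureTask<V> existing = inFlight.putIfAbsent(key, task);
    if (existing == null) {
      task.run();            // we won the race: load the value ourselves
      return task.get();
    }
    return existing.get();   // someone else is loading: wait for them
  }
}

Unlike this sketch, the patch also removes the map entry when a load fails
(and when a replica goes stale), so that a later caller can retry.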

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java?rev=1567720&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java Wed Feb 12 19:08:52 2014
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * A ShortCircuitReplica object contains file descriptors for a block that
+ * we are reading via short-circuit local reads.
+ *
+ * The file descriptors can be shared between multiple threads because
+ * all the operations we perform are stateless; i.e., we use pread
+ * instead of read, to avoid using the shared position state.
+ */
+@InterfaceAudience.Private
+public class ShortCircuitReplica {
+  public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
+
+  /**
+   * Immutable class which identifies a ShortCircuitReplica object.
+   */
+  public static final class Key {
+    public Key(long blockId, String bpId) {
+      this.blockId = blockId;
+      this.bpId = bpId;
+    }
+
+    public long getBlockId() {
+      return this.blockId;
+    }
+
+    public String getBlockPoolId() {
+      return this.bpId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if ((o == null) || (o.getClass() != this.getClass())) {
+        return false;
+      }
+      Key other = (Key)o;
+      return new EqualsBuilder().
+          append(blockId, other.blockId).
+          append(bpId, other.bpId).
+          isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().
+          append(this.blockId).
+          append(this.bpId).
+          toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().append(blockId).
+          append("_").append(bpId).toString();
+    }
+
+    /**
+     * The block ID that this Key identifies.
+     */
+    private final long blockId;
+
+    /**
+     * The block pool ID that this Key identifies.
+     */
+    private final String bpId;
+  }
+
+  /**
+   * Identifies this ShortCircuitReplica object.
+   */
+  final Key key;
+
+  /**
+   * The block data input stream.
+   */
+  private final FileInputStream dataStream;
+
+  /**
+   * The block metadata input stream.
+   *
+   * TODO: make this nullable if the file has no checksums on disk.
+   */
+  private final FileInputStream metaStream;
+
+  /**
+   * Block metadata header.
+   */
+  private final BlockMetadataHeader metaHeader;
+
+  /**
+   * The cache we belong to.
+   */
+  private final ShortCircuitCache cache;
+
+  /**
+   * Monotonic time at which the replica was created.
+   */
+  private final long creationTimeMs;
+
+  /**
+   * Current mmap state.  One of: null (no mmap and no load in progress),
+   * a Condition (a load is in progress), a Long (the monotonic time in ms
+   * of the last failed load attempt), or a ClientMmap (the loaded mmap).
+   *
+   * Protected by the cache lock.
+   */
+  Object mmapData;
+
+  /**
+   * True if this replica has been purged from the cache; false otherwise.
+   *
+   * Protected by the cache lock.
+   */
+  boolean purged = false;
+
+  /**
+   * Number of external references to this replica.  Replicas are referenced
+   * by the cache, BlockReaderLocal instances, and by ClientMmap instances.
+   * The number starts at 2 because when we create a replica, it is referenced
+   * by both the cache and the requester.
+   *
+   * Protected by the cache lock.
+   */
+  int refCount = 2;
+
+  /**
+   * The monotonic time in nanoseconds at which the replica became evictable, or
+   * null if it is not evictable.
+   *
+   * Protected by the cache lock.
+   */
+  private Long evictableTimeNs = null;
+
+  public ShortCircuitReplica(Key key,
+      FileInputStream dataStream, FileInputStream metaStream,
+      ShortCircuitCache cache, long creationTimeMs) throws IOException {
+    this.key = key;
+    this.dataStream = dataStream;
+    this.metaStream = metaStream;
+    this.metaHeader =
+          BlockMetadataHeader.preadHeader(metaStream.getChannel());
+    if (metaHeader.getVersion() != 1) {
+      throw new IOException("invalid metadata header version " +
+          metaHeader.getVersion() + ".  Can only handle version 1.");
+    }
+    this.cache = cache;
+    this.creationTimeMs = creationTimeMs;
+  }
+
+  /**
+   * Decrement the reference count.
+   */
+  public void unref() {
+    cache.unref(this);
+  }
+
+  /**
+   * Check if the replica is stale.
+   *
+   * Must be called with the cache lock held.
+   */
+  boolean isStale() {
+    long deltaMs = Time.monotonicNow() - creationTimeMs;
+    long staleThresholdMs = cache.getStaleThresholdMs();
+    if (deltaMs > staleThresholdMs) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + " is stale because it's " + deltaMs +
+            " ms old, and staleThresholdMs = " + staleThresholdMs);
+      }
+      return true;
+    } else {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + " is not stale because it's only " + deltaMs +
+            " ms old, and staleThresholdMs = " + staleThresholdMs);
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Check if the replica has an associated mmap that has been fully loaded.
+   *
+   * Must be called with the cache lock held.
+   */
+  @VisibleForTesting
+  public boolean hasMmap() {
+    return ((mmapData != null) && (mmapData instanceof ClientMmap));
+  }
+
+  /**
+   * Free the mmap associated with this replica.
+   *
+   * Must be called with the cache lock held.
+   */
+  void munmap() {
+    ClientMmap clientMmap = (ClientMmap)mmapData;
+    NativeIO.POSIX.munmap(clientMmap.getMappedByteBuffer());
+    mmapData = null;
+  }
+
+  /**
+   * Close the replica.
+   *
+   * Must be called after there are no more references to the replica in the
+   * cache or elsewhere.
+   */
+  void close() {
+    Preconditions.checkState(refCount == 0,
+        "tried to close replica with refCount " + refCount + ": " + this);
+    Preconditions.checkState(purged,
+        "tried to close unpurged replica " + this);
+    if (hasMmap()) munmap();
+    IOUtils.cleanup(LOG, dataStream, metaStream);
+  }
+
+  public FileInputStream getDataStream() {
+    return dataStream;
+  }
+
+  public FileInputStream getMetaStream() {
+    return metaStream;
+  }
+
+  public BlockMetadataHeader getMetaHeader() {
+    return metaHeader;
+  }
+
+  public Key getKey() {
+    return key;
+  }
+
+  public ClientMmap getOrCreateClientMmap() {
+    return cache.getOrCreateClientMmap(this);
+  }
+
+  MappedByteBuffer loadMmapInternal() {
+    try {
+      FileChannel channel = dataStream.getChannel();
+      return channel.map(MapMode.READ_ONLY, 0, channel.size());
+    } catch (IOException e) {
+      LOG.warn(this + ": mmap error", e);
+      return null;
+    } catch (RuntimeException e) {
+      LOG.warn(this + ": mmap error", e);
+      return null;
+    }
+  }
+
+  /**
+   * Get the evictable time in nanoseconds.
+   *
+   * Note: you must hold the cache lock to call this function.
+   *
+   * @return the evictable time in nanoseconds.
+   */
+  public Long getEvictableTimeNs() {
+    return evictableTimeNs;
+  }
+
+  /**
+   * Set the evictable time in nanoseconds.
+   *
+   * Note: you must hold the cache lock to call this function.
+   *
+   * @param evictableTimeNs   The evictable time in nanoseconds, or null
+   *                          to set no evictable time.
+   */
+  void setEvictableTimeNs(Long evictableTimeNs) {
+    this.evictableTimeNs = evictableTimeNs;
+  }
+
+  /**
+   * Convert the replica to a string for debugging purposes.
+   * Note that we can't take the lock here.
+   */
+  @Override
+  public String toString() {
+    return new StringBuilder().append("ShortCircuitReplica{").
+        append("key=").append(key).
+        append(", metaHeader.version=").append(metaHeader.getVersion()).
+        append(", metaHeader.checksum=").append(metaHeader.getChecksum()).
+        append(", ident=").append("0x").
+          append(Integer.toHexString(System.identityHashCode(this))).
+        append(", creationTimeMs=").append(creationTimeMs).
+        append("}").toString();
+  }
+}
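
The class comment above hinges on positional reads: FileChannel.read(dst,
position) (pread) never touches the channel's shared position, which is what
makes sharing one set of descriptors across threads safe. A small sketch of
the two access paths a replica supports, a positional read and the read-only
mmap done by loadMmapInternal (the file path is illustrative):

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

public class PreadVsMmapSketch {
  public static void main(String[] args) throws IOException {
    FileInputStream fis = new FileInputStream("/tmp/example-block");
    try {
      FileChannel channel = fis.getChannel();

      // Positional read (pread): safe from many threads at once because
      // it does not read or update the channel's shared position.
      ByteBuffer buf = ByteBuffer.allocate(512);
      int n = channel.read(buf, 4096L);  // up to 512 bytes at offset 4096
      System.out.println("pread returned " + n + " bytes");

      // Read-only mmap of the whole file, as loadMmapInternal does.
      MappedByteBuffer map = channel.map(MapMode.READ_ONLY, 0, channel.size());
      System.out.println("mapped " + map.capacity() + " bytes");
    } finally {
      fis.close();
    }
  }
}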

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplicaInfo.java?rev=1567720&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplicaInfo.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplicaInfo.java Wed Feb 12 19:08:52 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+
+public final class ShortCircuitReplicaInfo {
+  private final ShortCircuitReplica replica;
+  private final InvalidToken exc; 
+
+  public ShortCircuitReplicaInfo() {
+    this.replica = null;
+    this.exc = null;
+  }
+
+  public ShortCircuitReplicaInfo(ShortCircuitReplica replica) {
+    this.replica = replica;
+    this.exc = null;
+  }
+
+  public ShortCircuitReplicaInfo(InvalidToken exc) {
+    this.replica = null;
+    this.exc = exc;
+  }
+
+  public ShortCircuitReplica getReplica() {
+    return replica;
+  }
+
+  public InvalidToken getInvalidTokenException() {
+    return exc; 
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    String prefix = "";
+    builder.append("ShortCircuitReplicaInfo{");
+    if (replica != null) {
+      builder.append(prefix).append(replica);
+      prefix = ", ";
+    }
+    if (exc != null) {
+      builder.append(prefix).append(exc);
+      prefix = ", ";
+    }
+    builder.append("}");
+    return builder.toString();
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Feb 12 19:08:52 2014
@@ -27,8 +27,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -225,44 +228,67 @@ public class JspHelper {
   public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
-      JspWriter out, Configuration conf, DFSClient.Conf dfsConf,
-      DataEncryptionKey encryptionKey)
+      JspWriter out, final Configuration conf, DFSClient.Conf dfsConf,
+      final DataEncryptionKey encryptionKey)
           throws IOException {
     if (chunkSizeToView == 0) return;
-    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-    s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
-    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-      
     int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
       
-      // Use the block name for file name. 
-    String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
-    BlockReader blockReader = BlockReaderFactory.newBlockReader(dfsConf, file,
-        new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead,  true,
-        "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
-        new DatanodeID(addr.getAddress().getHostAddress(),
-            addr.getHostName(), poolId, addr.getPort(), 0, 0, 0), null,
-            null, null, false, CachingStrategy.newDefaultStrategy());
-        
+    BlockReader blockReader = new BlockReaderFactory(dfsConf).
+      setInetSocketAddress(addr).
+      setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
+      setFileName(BlockReaderFactory.getFileName(addr, poolId, blockId)).
+      setBlockToken(blockToken).
+      setStartOffset(offsetIntoBlock).
+      setLength(amtToRead).
+      setVerifyChecksum(true).
+      setClientName("JspHelper").
+      setClientCacheContext(ClientContext.getFromConf(conf)).
+      setDatanodeInfo(new DatanodeInfo(
+          new DatanodeID(addr.getAddress().getHostAddress(),
+              addr.getHostName(), poolId, addr.getPort(), 0, 0, 0))).
+      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+      setConfiguration(conf).
+      setRemotePeerFactory(new RemotePeerFactory() {
+        @Override
+        public Peer newConnectedPeer(InetSocketAddress addr)
+            throws IOException {
+          Peer peer = null;
+          Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+          try {
+            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+            peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey);
+          } finally {
+            if (peer == null) {
+              IOUtils.closeSocket(sock);
+            }
+          }
+          return peer;
+        }
+      }).
+      build();
+
     final byte[] buf = new byte[amtToRead];
-    int readOffset = 0;
-    int retries = 2;
-    while ( amtToRead > 0 ) {
-      int numRead = amtToRead;
-      try {
-        blockReader.readFully(buf, readOffset, amtToRead);
-      }
-      catch (IOException e) {
-        retries--;
-        if (retries == 0)
-          throw new IOException("Could not read data from datanode");
-        continue;
+    try {
+      int readOffset = 0;
+      int retries = 2;
+      while (amtToRead > 0) {
+        int numRead = amtToRead;
+        try {
+          blockReader.readFully(buf, readOffset, amtToRead);
+        } catch (IOException e) {
+          retries--;
+          if (retries == 0)
+            throw new IOException("Could not read data from datanode", e);
+          continue;
+        }
+        amtToRead -= numRead;
+        readOffset += numRead;
       }
-      amtToRead -= numRead;
-      readOffset += numRead;
+    } finally {
+      blockReader.close();
     }
-    blockReader.close();
     out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8)));
   }
 

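The anonymous RemotePeerFactory above uses a close-on-failure idiom: the
socket is closed in the finally block unless it was successfully wrapped in a
Peer. The same shape in isolation, reduced to plain sockets (names are
illustrative):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class CloseOnFailureSketch {
  // Returns a connected, configured socket; on any failure along the
  // way (connect timeout, setSoTimeout, ...) the socket is closed
  // rather than leaked, matching newConnectedPeer above.
  static Socket connect(InetSocketAddress addr, int timeoutMs)
      throws IOException {
    boolean success = false;
    Socket sock = new Socket();
    try {
      sock.connect(addr, timeoutMs);
      sock.setSoTimeout(timeoutMs);
      success = true;
      return sock;
    } finally {
      if (!success) {
        sock.close();
      }
    }
  }
}
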
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Wed Feb 12 19:08:52 2014
@@ -34,6 +34,8 @@ import org.apache.hadoop.util.DataChecks
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 
 /**
@@ -55,7 +57,8 @@ public class BlockMetadataHeader {
   private short version;
   private DataChecksum checksum = null;
     
-  BlockMetadataHeader(short version, DataChecksum checksum) {
+  @VisibleForTesting
+  public BlockMetadataHeader(short version, DataChecksum checksum) {
     this.checksum = checksum;
     this.version = version;
   }
@@ -148,7 +151,8 @@ public class BlockMetadataHeader {
    * @return 
    * @throws IOException
    */
-  private static void writeHeader(DataOutputStream out, 
+  @VisibleForTesting
+  public static void writeHeader(DataOutputStream out, 
                                   BlockMetadataHeader header) 
                                   throws IOException {
     out.writeShort(header.getVersion());


