hadoop-hdfs-commits mailing list archives

From cmcc...@apache.org
Subject svn commit: r1573433 [1/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/ap...
Date Mon, 03 Mar 2014 03:58:38 GMT
Author: cmccabe
Date: Mon Mar  3 03:58:37 2014
New Revision: 1573433

URL: http://svn.apache.org/r1573433
Log:
HDFS-5950. The DFSClient and DataNode should use shared memory segments to communicate short-circuit information (cmccabe)

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java
      - copied, changed from r1573432, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitSharedMemorySegment.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShm.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java
      - copied, changed from r1573432, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitSharedMemorySegment.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitSharedMemorySegment.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitSharedMemorySegment.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Mar  3 03:58:37 2014
@@ -370,6 +370,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-4200. Reduce the size of synchronized sections in PacketResponder.
     (suresh)
 
+    HDFS-5950. The DFSClient and DataNode should use shared memory segments to
+    communicate short-circuit information. (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Mon Mar  3 03:58:37 2014
@@ -23,7 +23,6 @@ import java.util.EnumSet;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
  * A BlockReader is responsible for reading a single block

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Mon Mar  3 03:58:37 2014
@@ -24,6 +24,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -32,6 +33,8 @@ import org.apache.hadoop.hdfs.client.Sho
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -410,7 +413,6 @@ public class BlockReaderFactory implemen
         setBlock(block).
         setStartOffset(startOffset).
         setShortCircuitReplica(info.getReplica()).
-        setDatanodeID(datanode).
         setVerifyChecksum(verifyChecksum).
         setCachingStrategy(cachingStrategy).
         build();
@@ -438,12 +440,31 @@ public class BlockReaderFactory implemen
     while (true) {
       curPeer = nextDomainPeer();
       if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
+      Slot slot = null;
+      ShortCircuitCache cache = clientContext.getShortCircuitCache();
       try {
-        ShortCircuitReplicaInfo info = requestFileDescriptors(peer);
+        MutableBoolean usedPeer = new MutableBoolean(false);
+        slot = cache.allocShmSlot(datanode, peer, usedPeer,
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
+            clientName);
+        if (usedPeer.booleanValue()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": allocShmSlot used up our previous socket " +
+              peer.getDomainSocket() + ".  Allocating a new one...");
+          }
+          curPeer = nextDomainPeer();
+          if (curPeer == null) break;
+          peer = (DomainPeer)curPeer.peer;
+        }
+        ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
         clientContext.getPeerCache().put(datanode, peer);
         return info;
       } catch (IOException e) {
+        if (slot != null) {
+          cache.freeSlot(slot);
+        }
         if (curPeer.fromCache) {
           // Handle an I/O error we got when using a cached socket.
           // These are considered less serious, because the socket may be stale.
@@ -470,16 +491,22 @@ public class BlockReaderFactory implemen
   /**
    * Request file descriptors from a DomainPeer.
    *
+   * @param peer   The peer to use for communication.
+   * @param slot   If non-null, the shared memory slot to associate with the 
+   *               new ShortCircuitReplica.
+   * 
    * @return  A ShortCircuitReplica object if we could communicate with the
    *          datanode; null, otherwise. 
    * @throws  IOException If we encountered an I/O exception while communicating
    *          with the datanode.
    */
-  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer)
-        throws IOException {
+  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
+          Slot slot) throws IOException {
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
-    new Sender(out).requestShortCircuitFds(block, token, 1);
+    SlotId slotId = slot == null ? null : slot.getSlotId();
+    new Sender(out).requestShortCircuitFds(block, token, slotId, 1);
     DataInputStream in = new DataInputStream(peer.getInputStream());
     BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
         PBHelper.vintPrefixed(in));
@@ -491,9 +518,10 @@ public class BlockReaderFactory implemen
       sock.recvFileInputStreams(fis, buf, 0, buf.length);
       ShortCircuitReplica replica = null;
       try {
-        ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-        replica = new ShortCircuitReplica(key, fis[0], fis[1],
-            clientContext.getShortCircuitCache(), Time.monotonicNow());
+        ExtendedBlockId key =
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+        replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
+            Time.monotonicNow(), slot);
       } catch (IOException e) {
         // This indicates an error reading from disk, or a format error.  Since
         // it's not a socket communication problem, we return null rather than
@@ -527,8 +555,9 @@ public class BlockReaderFactory implemen
       }
       return new ShortCircuitReplicaInfo(new InvalidToken(msg));
     default:
-      LOG.warn(this + "unknown response code " + resp.getStatus() + " while " +
-          "attempting to set up short-circuit access. " + resp.getMessage());
+      LOG.warn(this + ": unknown response code " + resp.getStatus() +
+          " while attempting to set up short-circuit access. " +
+          resp.getMessage());
       clientContext.getDomainSocketFactory()
           .disableShortCircuitForPath(pathInfo.getPath());
       return null;
@@ -565,6 +594,7 @@ public class BlockReaderFactory implemen
     while (true) {
       BlockReaderPeer curPeer = nextDomainPeer();
       if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
       BlockReader blockReader = null;
       try {
@@ -630,6 +660,7 @@ public class BlockReaderFactory implemen
       try {
         curPeer = nextTcpPeer();
         if (curPeer == null) break;
+        if (curPeer.fromCache) remainingCacheTries--;
         peer = curPeer.peer;
         blockReader = getRemoteBlockReader(peer);
         return blockReader;
@@ -662,7 +693,7 @@ public class BlockReaderFactory implemen
     return null;
   }
 
-  private static class BlockReaderPeer {
+  public static class BlockReaderPeer {
     final Peer peer;
     final boolean fromCache;
     
@@ -681,7 +712,6 @@ public class BlockReaderFactory implemen
     if (remainingCacheTries > 0) {
       Peer peer = clientContext.getPeerCache().get(datanode, true);
       if (peer != null) {
-        remainingCacheTries--;
         if (LOG.isTraceEnabled()) {
           LOG.trace("nextDomainPeer: reusing existing peer " + peer);
         }
@@ -706,7 +736,6 @@ public class BlockReaderFactory implemen
     if (remainingCacheTries > 0) {
       Peer peer = clientContext.getPeerCache().get(datanode, false);
       if (peer != null) {
-        remainingCacheTries--;
         if (LOG.isTraceEnabled()) {
           LOG.trace("nextTcpPeer: reusing existing peer " + peer);
         }

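The hunk above changes the local block reader setup so that a shared-memory slot is allocated before the file descriptor request goes to the DataNode, and the slot is returned to the cache if that request fails; on success, ownership of the slot passes to the new ShortCircuitReplica. A condensed sketch of that allocate-then-roll-back shape (the Pool and Resource types below are illustrative stand-ins, not the committed API):

    import java.io.IOException;

    class SlotRollbackSketch {
      interface Resource {}
      interface Pool {
        Resource alloc();        // stands in for ShortCircuitCache#allocShmSlot
        void free(Resource r);   // stands in for ShortCircuitCache#freeSlot
      }

      static Object requestWithSlot(Pool pool) throws IOException {
        Resource slot = pool.alloc();  // may be null when shared memory is unavailable
        try {
          // On success, the replica takes ownership of the slot.
          return requestFileDescriptors(slot);
        } catch (IOException e) {
          if (slot != null) {
            pool.free(slot);           // failed round trip: give the slot back
          }
          throw e;
        }
      }

      static Object requestFileDescriptors(Resource slot) throws IOException {
        return new Object();           // placeholder for the DataNode round trip
      }
    }
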
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Mon Mar  3 03:58:37 2014
@@ -17,26 +17,21 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.EnumSet;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -70,8 +65,6 @@ class BlockReaderLocal implements BlockR
     private String filename;
     private ShortCircuitReplica replica;
     private long dataPos;
-    private DatanodeID datanodeID;
-    private boolean mlocked;
     private ExtendedBlock block;
 
     public Builder(Conf conf) {
@@ -108,16 +101,6 @@ class BlockReaderLocal implements BlockR
       return this;
     }
 
-    public Builder setDatanodeID(DatanodeID datanodeID) {
-      this.datanodeID = datanodeID;
-      return this;
-    }
-
-    public Builder setMlocked(boolean mlocked) {
-      this.mlocked = mlocked;
-      return this;
-    }
-
     public Builder setBlock(ExtendedBlock block) {
       this.block = block;
       return this;
@@ -165,19 +148,9 @@ class BlockReaderLocal implements BlockR
   private final boolean verifyChecksum;
 
   /**
-   * If true, this block is mlocked on the DataNode.
-   */
-  private final AtomicBoolean mlocked;
-
-  /**
    * Name of the block, for logging purposes.
    */
   private final String filename;
-
-  /**
-   * DataNode which contained this block.
-   */
-  private final DatanodeID datanodeID;
   
   /**
    * Block ID and Block Pool ID.
@@ -220,8 +193,6 @@ class BlockReaderLocal implements BlockR
    */
   private int maxReadaheadLength;
 
-  private ClientMmap clientMmap;
-
   /**
    * Buffers data starting at the current dataPos and extending on
    * for dataBuf.limit().
@@ -247,9 +218,7 @@ class BlockReaderLocal implements BlockR
     this.checksum = header.getChecksum();
     this.verifyChecksum = builder.verifyChecksum &&
         (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
-    this.mlocked = new AtomicBoolean(builder.mlocked);
     this.filename = builder.filename;
-    this.datanodeID = builder.datanodeID;
     this.block = builder.block;
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
     this.checksumSize = checksum.getChecksumSize();
@@ -380,42 +349,55 @@ class BlockReaderLocal implements BlockR
     return total;
   }
 
-  private boolean getCanSkipChecksum() {
-    return (!verifyChecksum) || mlocked.get();
+  private boolean createNoChecksumContext() {
+    if (verifyChecksum) {
+      return replica.addNoChecksumAnchor();
+    } else {
+      return true;
+    }
   }
-  
+
+  private void releaseNoChecksumContext() {
+    if (verifyChecksum) {
+      replica.removeNoChecksumAnchor();
+    }
+  }
+
   @Override
   public synchronized int read(ByteBuffer buf) throws IOException {
-    boolean canSkipChecksum = getCanSkipChecksum();
-    
-    String traceString = null;
-    if (LOG.isTraceEnabled()) {
-      traceString = new StringBuilder().
-          append("read(").
-          append("buf.remaining=").append(buf.remaining()).
-          append(", block=").append(block).
-          append(", filename=").append(filename).
-          append(", canSkipChecksum=").append(canSkipChecksum).
-          append(")").toString();
-      LOG.info(traceString + ": starting");
-    }
-    int nRead;
+    boolean canSkipChecksum = createNoChecksumContext();
     try {
-      if (canSkipChecksum && zeroReadaheadRequested) {
-        nRead = readWithoutBounceBuffer(buf);
-      } else {
-        nRead = readWithBounceBuffer(buf, canSkipChecksum);
+      String traceString = null;
+      if (LOG.isTraceEnabled()) {
+        traceString = new StringBuilder().
+            append("read(").
+            append("buf.remaining=").append(buf.remaining()).
+            append(", block=").append(block).
+            append(", filename=").append(filename).
+            append(", canSkipChecksum=").append(canSkipChecksum).
+            append(")").toString();
+        LOG.info(traceString + ": starting");
+      }
+      int nRead;
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(buf);
+        } else {
+          nRead = readWithBounceBuffer(buf, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.info(traceString + ": I/O error", e);
+        }
+        throw e;
       }
-    } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.info(traceString + ": I/O error", e);
+        LOG.info(traceString + ": returning " + nRead);
       }
-      throw e;
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.info(traceString + ": returning " + nRead);
+      return nRead;
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
     }
-    return nRead;
   }
 
   private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
@@ -531,34 +513,38 @@ class BlockReaderLocal implements BlockR
   @Override
   public synchronized int read(byte[] arr, int off, int len)
         throws IOException {
-    boolean canSkipChecksum = getCanSkipChecksum();
-    String traceString = null;
-    if (LOG.isTraceEnabled()) {
-      traceString = new StringBuilder().
-          append("read(arr.length=").append(arr.length).
-          append(", off=").append(off).
-          append(", len=").append(len).
-          append(", filename=").append(filename).
-          append(", block=").append(block).
-          append(", canSkipChecksum=").append(canSkipChecksum).
-          append(")").toString();
-      LOG.trace(traceString + ": starting");
-    }
+    boolean canSkipChecksum = createNoChecksumContext();
     int nRead;
     try {
-      if (canSkipChecksum && zeroReadaheadRequested) {
-        nRead = readWithoutBounceBuffer(arr, off, len);
-      } else {
-        nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+      String traceString = null;
+      if (LOG.isTraceEnabled()) {
+        traceString = new StringBuilder().
+            append("read(arr.length=").append(arr.length).
+            append(", off=").append(off).
+            append(", len=").append(len).
+            append(", filename=").append(filename).
+            append(", block=").append(block).
+            append(", canSkipChecksum=").append(canSkipChecksum).
+            append(")").toString();
+        LOG.trace(traceString + ": starting");
+      }
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(arr, off, len);
+        } else {
+          nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(traceString + ": I/O error", e);
+        }
+        throw e;
       }
-    } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace(traceString + ": I/O error", e);
+        LOG.trace(traceString + ": returning " + nRead);
       }
-      throw e;
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(traceString + ": returning " + nRead);
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
     }
     return nRead;
   }
@@ -648,28 +634,45 @@ class BlockReaderLocal implements BlockR
     return true;
   }
 
+  /**
+   * Get or create a memory map for this replica.
+   * 
+   * There are two kinds of ClientMmap objects we could fetch here: one that 
+   * will always read pre-checksummed data, and one that may read data that
+   * hasn't been checksummed.
+   *
+   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
+   * the anchor count on the shared memory slot.  This will tell the DataNode
+   * not to munlock the block until this ClientMmap is closed.
+   * If we fetch the latter, we don't bother with anchoring.
+   *
+   * @param opts     The options to use, such as SKIP_CHECKSUMS.
+   * 
+   * @return         null on failure; the ClientMmap otherwise.
+   */
   @Override
   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    if ((!opts.contains(ReadOption.SKIP_CHECKSUMS)) &&
-          verifyChecksum && (!mlocked.get())) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("can't get an mmap for " + block + " of " + filename + 
-            " since SKIP_CHECKSUMS was not given, " +
-            "we aren't skipping checksums, and the block is not mlocked.");
+    boolean anchor = verifyChecksum &&
+        (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
+    if (anchor) {
+      if (!createNoChecksumContext()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("can't get an mmap for " + block + " of " + filename + 
+              " since SKIP_CHECKSUMS was not given, " +
+              "we aren't skipping checksums, and the block is not mlocked.");
+        }
+        return null;
       }
-      return null;
     }
-    return replica.getOrCreateClientMmap();
-  }
-
-  /**
-   * Set the mlocked state of the BlockReader.
-   * This method does NOT need to be synchronized because mlocked is atomic.
-   *
-   * @param mlocked  the new mlocked state of the BlockReader.
-   */
-  public void setMlocked(boolean mlocked) {
-    this.mlocked.set(mlocked);
+    ClientMmap clientMmap = null;
+    try {
+      clientMmap = replica.getOrCreateClientMmap(anchor);
+    } finally {
+      if ((clientMmap == null) && anchor) {
+        releaseNoChecksumContext();
+      }
+    }
+    return clientMmap;
   }
   
   @VisibleForTesting
@@ -681,4 +684,22 @@ class BlockReaderLocal implements BlockR
   int getMaxReadaheadLength() {
     return this.maxReadaheadLength;
   }
+  
+  /**
+   * Make the replica anchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceAnchorable() {
+    replica.getSlot().makeAnchorable();
+  }
+
+  /**
+   * Make the replica unanchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceUnanchorable() {
+    replica.getSlot().makeUnanchorable();
+  }
 }

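createNoChecksumContext() and releaseNoChecksumContext() above form an acquire/release pair: when checksums are being verified, the reader may still skip them, but only while it holds a no-checksum anchor on the replica, which the DataNode honors by keeping the block mlocked. The read paths wrap the fast path accordingly; reduced to its essentials (a sketch of the shape, not the committed method):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Condensed shape of BlockReaderLocal#read(ByteBuffer) after this change;
    // the abstract methods stand in for the real ones in the hunk above.
    abstract class NoChecksumContextSketch {
      boolean zeroReadaheadRequested;

      abstract boolean createNoChecksumContext();   // take a no-checksum anchor
      abstract void releaseNoChecksumContext();     // drop it again
      abstract int readWithoutBounceBuffer(ByteBuffer buf) throws IOException;
      abstract int readWithBounceBuffer(ByteBuffer buf, boolean canSkipChecksum)
          throws IOException;

      int read(ByteBuffer buf) throws IOException {
        boolean canSkipChecksum = createNoChecksumContext();
        try {
          return (canSkipChecksum && zeroReadaheadRequested)
              ? readWithoutBounceBuffer(buf)
              : readWithBounceBuffer(buf, canSkipChecksum);
        } finally {
          if (canSkipChecksum) releaseNoChecksumContext(); // always undo the anchor
        }
      }
    }
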
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java Mon Mar  3 03:58:37 2014
@@ -99,7 +99,8 @@ public class ClientContext {
         conf.shortCircuitMmapCacheSize,
         conf.shortCircuitMmapCacheExpiryMs,
         conf.shortCircuitMmapCacheRetryTimeout,
-        conf.shortCircuitCacheStaleThresholdMs);
+        conf.shortCircuitCacheStaleThresholdMs,
+        conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
     this.peerCache =
           new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
     this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
@@ -129,7 +130,9 @@ public class ClientContext {
       append(", useLegacyBlockReaderLocal = ").
       append(conf.useLegacyBlockReaderLocal).
       append(", domainSocketDataTraffic = ").
-      append(conf.domainSocketDataTraffic);
+      append(conf.domainSocketDataTraffic).
+      append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
+      append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
 
     return builder.toString();
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Mon Mar  3 03:58:37 2014
@@ -277,6 +277,7 @@ public class DFSClient implements java.i
     final boolean domainSocketDataTraffic;
     final int shortCircuitStreamsCacheSize;
     final long shortCircuitStreamsCacheExpiryMs; 
+    final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
     
     final int shortCircuitMmapCacheSize;
     final long shortCircuitMmapCacheExpiryMs;
@@ -409,6 +410,9 @@ public class DFSClient implements java.i
       shortCircuitCacheStaleThresholdMs = conf.getLong(
           DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
           DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
+      shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
+          DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
+          DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
     }
 
     private DataChecksum.Type getChecksumType(Configuration conf) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Mar  3 03:58:37 2014
@@ -469,6 +469,10 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup";
   public static final String  DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file";
   public static final String  DFS_DATANODE_USER_NAME_KEY = "dfs.datanode.kerberos.principal";
+  public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH = "dfs.datanode.shared.file.descriptor.path";
+  public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT = "/dev/shm";
+  public static final String  DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
+  public static final int     DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000;
   public static final String  DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
   public static final String  DFS_NAMENODE_USER_NAME_KEY = "dfs.namenode.kerberos.principal";
   public static final String  DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.namenode.kerberos.internal.spnego.principal";

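hdfs-default.xml is also touched by this patch (see the file list above). A minimal sketch of setting the two new keys programmatically, using the constants added in this hunk (the values shown are just the defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class ShmConfSketch {
      static Configuration withShmSettings() {
        Configuration conf = new Configuration();
        // Directory in which the DataNode creates its shared memory segments:
        conf.set(DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH,
            "/dev/shm");
        // How often, in milliseconds, the shared-memory watcher thread checks
        // whether it has been interrupted:
        conf.setInt(
            DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
            60000);
        return conf;
      }
    }
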
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Mar  3 03:58:37 2014
@@ -39,6 +39,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
@@ -1630,7 +1631,7 @@ implements ByteBufferReadable, CanSetDro
       success = true;
     } finally {
       if (!success) {
-        clientMmap.unref();
+        IOUtils.closeQuietly(clientMmap);
       }
     }
     return buffer;
@@ -1644,7 +1645,7 @@ implements ByteBufferReadable, CanSetDro
           "that was not created by this stream, " + buffer);
     }
     if (val instanceof ClientMmap) {
-      ((ClientMmap)val).unref();
+      IOUtils.closeQuietly((ClientMmap)val);
     } else if (val instanceof ByteBufferPool) {
       ((ByteBufferPool)val).putBuffer(buffer);
     }

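IOUtils.closeQuietly() here is the Commons IO helper imported at the top of the hunk; using it implies that ClientMmap is now a Closeable (the import added to ClientMmap.java at the end of this message points the same way). Hand-rolled, the call amounts to:

    import java.io.Closeable;
    import java.io.IOException;

    class CloseQuietlySketch {
      // Equivalent of org.apache.commons.io.IOUtils.closeQuietly(Closeable):
      // release the resource, ignoring null references and close-time errors.
      static void closeQuietly(Closeable c) {
        if (c == null) {
          return;
        }
        try {
          c.close();
        } catch (IOException e) {
          // deliberately ignored
        }
      }
    }
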
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java Mon Mar  3 03:58:37 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * An immutable key which identifies a block.
@@ -34,6 +35,10 @@ final public class ExtendedBlockId {
    */
   private final String bpId;
 
+  public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) {
+    return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+  }
+  
   public ExtendedBlockId(long blockId, String bpId) {
     this.blockId = blockId;
     this.bpId = bpId;

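The new factory is a one-line convenience over the existing two-argument constructor; the BlockReaderFactory hunk above still spells the conversion out by hand. A usage sketch:

    import org.apache.hadoop.hdfs.ExtendedBlockId;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    class ExtendedBlockIdSketch {
      static ExtendedBlockId keyFor(ExtendedBlock block) {
        // Equivalent to:
        //   new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId())
        return ExtendedBlockId.fromExtendedBlock(block);
      }
    }
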
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java Mon Mar  3 03:58:37 2014
@@ -20,9 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.security.UserGroupInformation;
 
 public interface RemotePeerFactory {
   /**

Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java (from r1573432, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitSharedMemorySegment.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitSharedMemorySegment.java&r1=1573432&r2=1573433&rev=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitSharedMemorySegment.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java Mon Mar  3 03:58:37 2014
@@ -15,47 +15,214 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.client;
+package org.apache.hadoop.hdfs;
 
-import java.io.Closeable;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
 
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
-import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.primitives.Ints;
 
 import sun.misc.Unsafe;
 
-public class ShortCircuitSharedMemorySegment implements Closeable {
-  private static final Log LOG =
-    LogFactory.getLog(ShortCircuitSharedMemorySegment.class);
+/**
+ * A shared memory segment used to implement short-circuit reads.
+ */
+public class ShortCircuitShm {
+  private static final Log LOG = LogFactory.getLog(ShortCircuitShm.class);
 
-  private static final int BYTES_PER_SLOT = 64;
+  protected static final int BYTES_PER_SLOT = 64;
 
-  private static final Unsafe unsafe;
+  private static final Unsafe unsafe = safetyDance();
 
-  static {
-    Unsafe theUnsafe = null;
+  private static Unsafe safetyDance() {
     try {
       Field f = Unsafe.class.getDeclaredField("theUnsafe");
       f.setAccessible(true);
-      theUnsafe = (Unsafe)f.get(null);
+      return (Unsafe)f.get(null);
     } catch (Throwable e) {
       LOG.error("failed to load misc.Unsafe", e);
     }
-    unsafe = theUnsafe;
+    return null;
+  }
+
+  /**
+   * Calculate the usable size of a shared memory segment.
+   * We round down to a multiple of the slot size and do some validation.
+   *
+   * @param stream The stream we're using.
+   * @return       The usable size of the shared memory segment.
+   */
+  private static int getUsableLength(FileInputStream stream)
+      throws IOException {
+    int intSize = Ints.checkedCast(stream.getChannel().size());
+    int slots = intSize / BYTES_PER_SLOT;
+    if (slots == 0) {
+      throw new IOException("size of shared memory segment was " +
+          intSize + ", but that is not enough to hold even one slot.");
+    }
+    return slots * BYTES_PER_SLOT;
   }
 
   /**
+   * Identifies a DfsClientShm.
+   */
+  public static class ShmId implements Comparable<ShmId> {
+    private static final Random random = new Random();
+    private final long hi;
+    private final long lo;
+
+    /**
+     * Generate a random ShmId.
+     * 
+     * We generate ShmIds randomly to prevent a malicious client from
+     * successfully guessing one and using that to interfere with another
+     * client.
+     */
+    public static ShmId createRandom() {
+      return new ShmId(random.nextLong(), random.nextLong());
+    }
+
+    public ShmId(long hi, long lo) {
+      this.hi = hi;
+      this.lo = lo;
+    }
+    
+    public long getHi() {
+      return hi;
+    }
+    
+    public long getLo() {
+      return lo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if ((o == null) || (o.getClass() != this.getClass())) {
+        return false;
+      }
+      ShmId other = (ShmId)o;
+      return new EqualsBuilder().
+          append(hi, other.hi).
+          append(lo, other.lo).
+          isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().
+          append(this.hi).
+          append(this.lo).
+          toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%016x%016x", hi, lo);
+    }
+
+    @Override
+    public int compareTo(ShmId other) {
+      return ComparisonChain.start().
+          compare(hi, other.hi).
+          compare(lo, other.lo).
+          result();
+    }
+  };
+
+  /**
+   * Uniquely identifies a slot.
+   */
+  public static class SlotId {
+    private final ShmId shmId;
+    private final int slotIdx;
+    
+    public SlotId(ShmId shmId, int slotIdx) {
+      this.shmId = shmId;
+      this.slotIdx = slotIdx;
+    }
+
+    public ShmId getShmId() {
+      return shmId;
+    }
+
+    public int getSlotIdx() {
+      return slotIdx;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if ((o == null) || (o.getClass() != this.getClass())) {
+        return false;
+      }
+      SlotId other = (SlotId)o;
+      return new EqualsBuilder().
+          append(shmId, other.shmId).
+          append(slotIdx, other.slotIdx).
+          isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().
+          append(this.shmId).
+          append(this.slotIdx).
+          toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("SlotId(%s:%d)", shmId.toString(), slotIdx);
+    }
+  }
+
+  public class SlotIterator implements Iterator<Slot> {
+    int slotIdx = -1;
+
+    @Override
+    public boolean hasNext() {
+      synchronized (ShortCircuitShm.this) {
+        return allocatedSlots.nextSetBit(slotIdx + 1) != -1;
+      }
+    }
+
+    @Override
+    public Slot next() {
+      synchronized (ShortCircuitShm.this) {
+        int nextSlotIdx = allocatedSlots.nextSetBit(slotIdx + 1);
+        if (nextSlotIdx == -1) {
+          throw new NoSuchElementException();
+        }
+        slotIdx = nextSlotIdx;
+        return slots[nextSlotIdx];
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("SlotIterator " +
+          "doesn't support removal");
+    }
+  }
+  
+  /**
    * A slot containing information about a replica.
    *
    * The format is:
@@ -69,53 +236,134 @@ public class ShortCircuitSharedMemorySeg
    * Little-endian versus big-endian is not relevant here since both the client
    * and the server reside on the same computer and use the same orientation.
    */
-  public class Slot implements Closeable {
+  public class Slot {
     /**
-     * Flag indicating that the slot is in use.
+     * Flag indicating that the slot is valid.  
+     * 
+     * The DFSClient sets this flag when it allocates a new slot within one of
+     * its shared memory regions.
+     * 
+     * The DataNode clears this flag when the replica associated with this slot
+     * is no longer valid.  The client itself also clears this flag when it
+     * believes that the DataNode is no longer using this slot to communicate.
      */
-    private static final long SLOT_IN_USE_FLAG =    1L<<63;
+    private static final long VALID_FLAG =          1L<<63;
 
     /**
      * Flag indicating that the slot can be anchored.
      */
     private static final long ANCHORABLE_FLAG =     1L<<62;
 
-    private long slotAddress;
+    /**
+     * The slot address in memory.
+     */
+    private final long slotAddress;
 
-    Slot(long slotAddress) {
+    /**
+     * BlockId of the block this slot is used for.
+     */
+    private final ExtendedBlockId blockId;
+
+    Slot(long slotAddress, ExtendedBlockId blockId) {
       this.slotAddress = slotAddress;
+      this.blockId = blockId;
     }
 
     /**
-     * Make a given slot anchorable.
+     * Get the short-circuit memory segment associated with this Slot.
+     *
+     * @return      The enclosing short-circuit memory segment.
      */
-    public void makeAnchorable() {
-      Preconditions.checkState(slotAddress != 0,
-          "Called makeAnchorable on a slot that was closed.");
+    public ShortCircuitShm getShm() {
+      return ShortCircuitShm.this;
+    }
+
+    /**
+     * Get the ExtendedBlockId associated with this slot.
+     *
+     * @return      The ExtendedBlockId of this slot.
+     */
+    public ExtendedBlockId getBlockId() {
+      return blockId;
+    }
+
+    /**
+     * Get the SlotId of this slot, containing both shmId and slotIdx.
+     *
+     * @return      The SlotId of this slot.
+     */
+    public SlotId getSlotId() {
+      return new SlotId(getShmId(), getSlotIdx());
+    }
+
+    /**
+     * Get the Slot index.
+     *
+     * @return      The index of this slot.
+     */
+    public int getSlotIdx() {
+      return Ints.checkedCast(
+          (slotAddress - baseAddress) / BYTES_PER_SLOT);
+    }
+
+    private boolean isSet(long flag) {
+      long prev = unsafe.getLongVolatile(null, this.slotAddress);
+      return (prev & flag) != 0;
+    }
+
+    private void setFlag(long flag) {
       long prev;
       do {
         prev = unsafe.getLongVolatile(null, this.slotAddress);
-        if ((prev & ANCHORABLE_FLAG) != 0) {
+        if ((prev & flag) != 0) {
           return;
         }
       } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev | ANCHORABLE_FLAG));
+                  prev, prev | flag));
     }
 
-    /**
-     * Make a given slot unanchorable.
-     */
-    public void makeUnanchorable() {
-      Preconditions.checkState(slotAddress != 0,
-          "Called makeUnanchorable on a slot that was closed.");
+    private void clearFlag(long flag) {
       long prev;
       do {
         prev = unsafe.getLongVolatile(null, this.slotAddress);
-        if ((prev & ANCHORABLE_FLAG) == 0) {
+        if ((prev & flag) == 0) {
           return;
         }
       } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, prev & (~ANCHORABLE_FLAG)));
+                  prev, prev & (~flag)));
+    }
+    
+    public boolean isValid() {
+      return isSet(VALID_FLAG);
+    }
+
+    public void makeValid() {
+      setFlag(VALID_FLAG);
+    }
+
+    public void makeInvalid() {
+      clearFlag(VALID_FLAG);
+    }
+
+    public boolean isAnchorable() {
+      return isSet(ANCHORABLE_FLAG);
+    }
+
+    public void makeAnchorable() {
+      setFlag(ANCHORABLE_FLAG);
+    }
+
+    public void makeUnanchorable() {
+      clearFlag(ANCHORABLE_FLAG);
+    }
+
+    public boolean isAnchored() {
+      long prev = unsafe.getLongVolatile(null, this.slotAddress);
+      if ((prev & VALID_FLAG) == 0) {
+        // Slot is no longer valid.
+        return false;
+      }
+      return ((prev & 0x7fffffff) != 0);
     }
 
     /**
@@ -130,14 +378,18 @@ public class ShortCircuitSharedMemorySeg
       long prev;
       do {
         prev = unsafe.getLongVolatile(null, this.slotAddress);
-        if ((prev & 0x7fffffff) == 0x7fffffff) {
-          // Too many other threads have anchored the slot (2 billion?)
+        if ((prev & VALID_FLAG) == 0) {
+          // Slot is no longer valid.
           return false;
         }
         if ((prev & ANCHORABLE_FLAG) == 0) {
           // Slot can't be anchored right now.
           return false;
         }
+        if ((prev & 0x7fffffff) == 0x7fffffff) {
+          // Too many other threads have anchored the slot (2 billion?)
+          return false;
+        }
       } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
                   prev, prev + 1));
       return true;
@@ -157,146 +409,222 @@ public class ShortCircuitSharedMemorySeg
                   prev, prev - 1));
     }
 
-    /**
-     * @return      The index of this slot.
-     */
-    public int getIndex() {
-      Preconditions.checkState(slotAddress != 0);
-      return Ints.checkedCast(
-          (slotAddress - baseAddress) / BYTES_PER_SLOT);
-    }
-
     @Override
-    public void close() throws IOException {
-      if (slotAddress == 0) return;
-      long prev;
-      do {
-        prev = unsafe.getLongVolatile(null, this.slotAddress);
-        Preconditions.checkState((prev & SLOT_IN_USE_FLAG) != 0,
-            "tried to close slot that wasn't open");
-      } while (!unsafe.compareAndSwapLong(null, this.slotAddress,
-                  prev, 0));
-      slotAddress = 0;
-      if (ShortCircuitSharedMemorySegment.this.refCount.unreference()) {
-        ShortCircuitSharedMemorySegment.this.free();
-      }
+    public String toString() {
+      return "Slot(slotIdx=" + getSlotIdx() + ", shm=" + getShm() + ")";
     }
   }
 
   /**
-   * The stream that we're going to use to create this shared memory segment.
-   *
-   * Although this is a FileInputStream, we are going to assume that the
-   * underlying file descriptor is writable as well as readable.
-   * It would be more appropriate to use a RandomAccessFile here, but that class
-   * does not have any public accessor which returns a FileDescriptor, unlike
-   * FileInputStream.
+   * ID for this SharedMemorySegment.
+   */
+  private final ShmId shmId;
+
+  /**
+   * The base address of the memory-mapped file.
    */
-  private final FileInputStream stream;
+  private final long baseAddress;
 
   /**
-   * Length of the shared memory segment.
+   * The mmapped length of the shared memory segment
    */
-  private final int length;
+  private final int mmappedLength;
 
   /**
-   * The base address of the memory-mapped file.
+   * The slots associated with this shared memory segment.
+   * slot[i] contains the slot at offset i * BYTES_PER_SLOT,
+   * or null if that slot is not allocated.
    */
-  private final long baseAddress;
+  private final Slot slots[];
 
   /**
-   * Reference count and 'closed' status.
+   * A bitset where each bit represents a slot which is in use.
    */
-  private final CloseableReferenceCount refCount = new CloseableReferenceCount();
+  private final BitSet allocatedSlots;
 
-  public ShortCircuitSharedMemorySegment(FileInputStream stream)
+  /**
+   * Create the ShortCircuitShm.
+   * 
+   * @param shmId       The ID to use.
+   * @param stream      The stream that we're going to use to create this 
+   *                    shared memory segment.
+   *                    
+   *                    Although this is a FileInputStream, we are going to
+   *                    assume that the underlying file descriptor is writable
+   *                    as well as readable. It would be more appropriate to use
+   *                    a RandomAccessFile here, but that class does not have
+   *                    any public accessor which returns a FileDescriptor,
+   *                    unlike FileInputStream.
+   */
+  public ShortCircuitShm(ShmId shmId, FileInputStream stream)
         throws IOException {
     if (!NativeIO.isAvailable()) {
       throw new UnsupportedOperationException("NativeIO is not available.");
     }
     if (Shell.WINDOWS) {
       throw new UnsupportedOperationException(
-          "ShortCircuitSharedMemorySegment is not yet implemented " +
-          "for Windows.");
+          "DfsClientShm is not yet implemented for Windows.");
     }
     if (unsafe == null) {
       throw new UnsupportedOperationException(
-          "can't use ShortCircuitSharedMemorySegment because we failed to " +
+          "can't use DfsClientShm because we failed to " +
           "load misc.Unsafe.");
     }
-    this.refCount.reference();
-    this.stream = stream;
-    this.length = getEffectiveLength(stream);
-    this.baseAddress = POSIX.mmap(this.stream.getFD(), 
-      POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, this.length);
+    this.shmId = shmId;
+    this.mmappedLength = getUsableLength(stream);
+    this.baseAddress = POSIX.mmap(stream.getFD(), 
+        POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
+    this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
+    this.allocatedSlots = new BitSet(slots.length);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("creating " + this.getClass().getSimpleName() +
+          "(shmId=" + shmId +
+          ", mmappedLength=" + mmappedLength +
+          ", baseAddress=" + String.format("%x", baseAddress) +
+          ", slots.length=" + slots.length + ")");
+    }
   }
 
+  public final ShmId getShmId() {
+    return shmId;
+  }
+  
   /**
-   * Calculate the effective usable size of the shared memory segment.
-   * We round down to a multiple of the slot size and do some validation.
+   * Determine if this shared memory object is empty.
    *
-   * @param stream The stream we're using.
-   * @return       The effective usable size of the shared memory segment.
+   * @return    True if the shared memory object is empty.
    */
-  private static int getEffectiveLength(FileInputStream stream)
-      throws IOException {
-    int intSize = Ints.checkedCast(stream.getChannel().size());
-    int slots = intSize / BYTES_PER_SLOT;
-    Preconditions.checkState(slots > 0, "size of shared memory segment was " +
-        intSize + ", but that is not enough to hold even one slot.");
-    return slots * BYTES_PER_SLOT;
+  synchronized final public boolean isEmpty() {
+    return allocatedSlots.nextSetBit(0) == -1;
   }
 
-  private boolean allocateSlot(long address) {
-    long prev;
-    do {
-      prev = unsafe.getLongVolatile(null, address);
-      if ((prev & Slot.SLOT_IN_USE_FLAG) != 0) {
-        return false;
-      }
-    } while (!unsafe.compareAndSwapLong(null, address,
-                prev, prev | Slot.SLOT_IN_USE_FLAG));
-    return true;
+  /**
+   * Determine if this shared memory object is full.
+   *
+   * @return    True if the shared memory object is full.
+   */
+  synchronized final public boolean isFull() {
+    return allocatedSlots.nextClearBit(0) >= slots.length;
   }
 
   /**
-   * Allocate a new Slot in this shared memory segment.
+   * Calculate the base address of a slot.
    *
-   * @return        A newly allocated Slot, or null if there were no available
-   *                slots.
+   * @param slotIdx   Index of the slot.
+   * @return          The base address of the slot.
    */
-  public Slot allocateNextSlot() throws IOException {
-    ShortCircuitSharedMemorySegment.this.refCount.reference();
-    Slot slot = null;
-    try {
-      final int numSlots = length / BYTES_PER_SLOT;
-      for (int i = 0; i < numSlots; i++) {
-        long address = this.baseAddress + (i * BYTES_PER_SLOT);
-        if (allocateSlot(address)) {
-          slot = new Slot(address);
-          break;
-        }
-      }
-    } finally {
-      if (slot == null) {
-        if (refCount.unreference()) {
-          free();
-        }
-      }
+  private final long calculateSlotAddress(int slotIdx) {
+    return this.baseAddress + (slotIdx * BYTES_PER_SLOT);
+  }
+
+  /**
+   * Allocate a new slot and register it.
+   *
+   * This function chooses an empty slot, initializes it, and then returns
+   * the relevant Slot object.
+   *
+   * @return    The new slot.
+   */
+  synchronized public final Slot allocAndRegisterSlot(
+      ExtendedBlockId blockId) {
+    int idx = allocatedSlots.nextClearBit(0);
+    if (idx >= slots.length) {
+      throw new RuntimeException(this + ": no more slots are available.");
+    }
+    allocatedSlots.set(idx, true);
+    Slot slot = new Slot(calculateSlotAddress(idx), blockId);
+    slot.makeValid();
+    slots[idx] = slot;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" +
+          allocatedSlots + StringUtils.getStackTrace(Thread.currentThread()));
     }
     return slot;
   }
 
-  @Override
-  public void close() throws IOException {
-    refCount.setClosed();
-    if (refCount.unreference()) {
-      free();
+  synchronized public final Slot getSlot(int slotIdx)
+      throws InvalidRequestException {
+    if (!allocatedSlots.get(slotIdx)) {
+      throw new InvalidRequestException(this + ": slot " + slotIdx +
+          " does not exist.");
+    }
+    return slots[slotIdx];
+  }
+
+  /**
+   * Register a slot.
+   *
+   * This function looks at a slot which has already been initialized (by
+   * another process), and registers it with us.  Then, it returns the 
+   * relevant Slot object.
+   *
+   * @return    The slot.
+   *
+   * @throws InvalidRequestException
+   *            If the slot index we're trying to allocate has not been
+   *            initialized, or is already in use.
+   */
+  public final synchronized Slot registerSlot(int slotIdx,
+      ExtendedBlockId blockId) throws InvalidRequestException {
+    if (allocatedSlots.get(slotIdx)) {
+      throw new InvalidRequestException(this + ": slot " + slotIdx +
+          " is already in use.");
+    }
+    Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
+    if (!slot.isValid()) {
+      throw new InvalidRequestException(this + ": slot " + slotIdx +
+          " has not been allocated.");
+    }
+    slots[slotIdx] = slot;
+    allocatedSlots.set(slotIdx, true);
+    if (LOG.isTraceEnabled()) {
+      //LOG.trace(this + ": registerSlot " + slotIdx);
+      LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots +
+                  StringUtils.getStackTrace(Thread.currentThread()));
+    }
+    return slot;
+  }
+
+  /**
+   * Unregisters a slot.
+   * 
+   * This doesn't alter the contents of the slot.  It just means that we
+   * will no longer track the slot as allocated on our side.
+   *
+   * @param slotIdx  Index of the slot to unregister.
+   */
+  public final synchronized void unregisterSlot(int slotIdx) {
+    Preconditions.checkState(allocatedSlots.get(slotIdx),
+        "tried to unregister slot " + slotIdx + ", which was not registered.");
+    allocatedSlots.set(slotIdx, false);
+    slots[slotIdx] = null;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + ": unregisterSlot " + slotIdx);
     }
   }
+  
+  /**
+   * Iterate over all allocated slots.
+   * 
+   * Note that this method isn't safe if slots are allocated or freed
+   * concurrently; callers must supply their own synchronization.
+   *
+   * @return        The slot iterator.
+   */
+  public SlotIterator slotIterator() {
+    return new SlotIterator();
+  }
 
-  void free() throws IOException {
-    IOUtils.cleanup(LOG, stream);
-    POSIX.munmap(baseAddress, length);
+  public void free() {
+    try {
+      POSIX.munmap(baseAddress, mmappedLength);
+    } catch (IOException e) {
+      LOG.warn(this + ": failed to munmap", e);
+    }
+    LOG.trace(this + ": freed");
+  }
+  
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + "(" + shmId + ")";
   }
 }
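
To make the new slot lifecycle concrete, here is a minimal sketch (not part of this patch) of two processes cooperating on one segment. It assumes 'creator' and 'attacher' are ShortCircuitShm instances backed by the same file, in the style of TestShortCircuitShm, and that the slot index travels between the processes out-of-band; the helper name demoSlotLifecycle is hypothetical.

    import org.apache.hadoop.fs.InvalidRequestException;
    import org.apache.hadoop.hdfs.ExtendedBlockId;
    import org.apache.hadoop.hdfs.ShortCircuitShm;
    import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;

    static void demoSlotLifecycle(ShortCircuitShm creator,
        ShortCircuitShm attacher, ExtendedBlockId blockId)
        throws InvalidRequestException {
      // The creating process picks the lowest free index, marks the slot
      // valid, and starts tracking it.
      Slot created = creator.allocAndRegisterSlot(blockId);
      int idx = created.getSlotIdx();

      // A second process that learns the index (e.g. over a domain socket)
      // registers the already-initialized slot instead of allocating one.
      Slot attached = attacher.registerSlot(idx, blockId);

      // unregisterSlot stops local tracking without altering the slot's
      // contents, so the other mapping is unaffected.
      attacher.unregisterSlot(idx);
      creator.unregisterSlot(idx);
    }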

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java Mon Mar  3 03:58:37 2014
@@ -19,26 +19,23 @@ package org.apache.hadoop.hdfs.client;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
+import java.io.Closeable;
 import java.nio.MappedByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * A memory-mapped region used by an HDFS client.
- * 
- * This class includes a reference count and some other information used by
- * ClientMmapManager to track and cache mmaps.
+ * A reference to a memory-mapped region used by an HDFS client.
  */
 @InterfaceAudience.Private
-public class ClientMmap {
+public class ClientMmap implements Closeable {
   static final Log LOG = LogFactory.getLog(ClientMmap.class);
   
   /**
    * A reference to the block replica which this mmap relates to.
    */
-  private final ShortCircuitReplica replica;
+  private ShortCircuitReplica replica;
   
   /**
    * The java ByteBuffer object.
@@ -46,33 +43,30 @@ public class ClientMmap {
   private final MappedByteBuffer map;
 
   /**
-   * Reference count of this ClientMmap object.
+   * Whether or not this ClientMmap anchors the replica into memory while
+   * it exists.  Closing an anchored ClientMmap unanchors the replica.
    */
-  private final AtomicInteger refCount = new AtomicInteger(1);
+  private final boolean anchored;
 
-  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map) {
+  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map,
+      boolean anchored) {
     this.replica = replica;
     this.map = map;
+    this.anchored = anchored;
   }
 
   /**
-   * Increment the reference count.
-   *
-   * @return   The new reference count.
+   * Close the ClientMmap object, unanchoring the replica if it was
+   * anchored and dropping our reference to it.  Closing is idempotent.
    */
-  void ref() {
-    refCount.addAndGet(1);
-  }
-
-  /**
-   * Decrement the reference count.
-   *
-   * The parent replica gets unreferenced each time the reference count 
-   * of this object goes to 0.
-   */
-  public void unref() {
-    refCount.addAndGet(-1);
-    replica.unref();
+  @Override
+  public void close() {
+    if (replica != null) {
+      if (anchored) {
+        replica.removeNoChecksumAnchor();
+      }
+      replica.unref();
+    }
+    replica = null;
   }
 
   public MappedByteBuffer getMappedByteBuffer() {
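
With ClientMmap now implementing Closeable (replacing the old ref/unref pair), a caller can scope the mapping with try-with-resources. A hypothetical sketch follows; acquireMmap() is a placeholder for however the read path actually obtains the ClientMmap from its ShortCircuitReplica, not a real API in this patch.

    import java.nio.MappedByteBuffer;

    // Placeholder acquisition helper, invented for this sketch.
    static ClientMmap acquireMmap() { throw new UnsupportedOperationException(); }

    static void readViaMmap() {
      try (ClientMmap mmap = acquireMmap()) {
        MappedByteBuffer buf = mmap.getMappedByteBuffer();
        // ... read from buf; while an anchored mmap is open, the replica
        // keeps its no-checksum anchor ...
      }
      // close() has now run exactly once: it removed the no-checksum anchor
      // (if this mmap was anchored) and dropped the reference to the replica.
    }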

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShm.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShm.java?rev=1573433&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShm.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShm.java Mon Mar  3 03:58:37 2014
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.ShortCircuitShm;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager.EndpointShmManager;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * DfsClientShm is a subclass of ShortCircuitShm which is used by the
+ * DFSClient.
+ * When the UNIX domain socket associated with this shared memory segment
+ * closes unexpectedly, we mark the slots inside this segment as stale.
+ * ShortCircuitReplica objects that contain stale slots are themselves stale,
+ * and will not be used to service new reads or mmap operations.
+ * However, in-progress read and mmap operations may continue to completion.
+ * Once the last slot is deallocated, the segment can be safely munmapped.
+ */
+public class DfsClientShm extends ShortCircuitShm
+    implements DomainSocketWatcher.Handler {
+  /**
+   * The EndpointShmManager associated with this shared memory segment.
+   */
+  private final EndpointShmManager manager;
+
+  /**
+   * The UNIX domain socket associated with this DfsClientShm.
+   * We rely on the DomainSocketWatcher to close the socket associated with
+   * this DomainPeer when necessary.
+   */
+  private final DomainPeer peer;
+
+  /**
+   * True if this shared memory segment has lost its connection to the
+   * DataNode.
+   *
+   * {@link DfsClientShm#handle} sets this to true.
+   */
+  private boolean stale = false;
+
+  DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
+      DomainPeer peer) throws IOException {
+    super(shmId, stream);
+    this.manager = manager;
+    this.peer = peer;
+  }
+
+  public EndpointShmManager getEndpointShmManager() {
+    return manager;
+  }
+
+  public DomainPeer getPeer() {
+    return peer;
+  }
+
+  /**
+   * Determine if the shared memory segment is stale.
+   *
+   * This must be called with the DfsClientShmManager lock held.
+   *
+   * @return   True if the shared memory segment is stale.
+   */
+  public synchronized boolean isStale() {
+    return stale;
+  }
+
+  /**
+   * Handle the closure of the UNIX domain socket associated with this shared
+   * memory segment by marking this segment as stale.
+   *
+   * If there are no slots associated with this shared memory segment, it will
+   * be freed immediately in this function.
+   */
+  @Override
+  public boolean handle(DomainSocket sock) {
+    manager.unregisterShm(getShmId());
+    synchronized (this) {
+      Preconditions.checkState(!stale);
+      stale = true;
+      boolean hadSlots = false;
+      for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
+        Slot slot = iter.next();
+        slot.makeInvalid();
+        hadSlots = true;
+      }
+      if (!hadSlots) {
+        free();
+      }
+    }
+    return true;
+  }
+}
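
The staleness rules in the class comment reduce to a simple predicate. A hypothetical helper (not in this patch) showing what a caller would check before serving a new read from a cached slot:

    import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;

    // Sketch only.  DfsClientShm#handle marks every slot invalid when the
    // DataNode connection drops, so both checks fail once a segment is stale.
    static boolean slotUsableForNewReads(Slot slot) {
      DfsClientShm shm = (DfsClientShm) slot.getShm();
      // isStale() is documented to require the DfsClientShmManager lock;
      // that lock is assumed to be held by the caller here.
      return !shm.isStale() && slot.isValid();
    }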

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java?rev=1573433&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java Mon Mar  3 03:58:37 2014
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Manages short-circuit memory segments for an HDFS client.
+ * 
+ * Clients are responsible for requesting and releasing the shared memory
+ * segments used for communicating with the DataNode.  The client will try to
+ * allocate new slots in the set of existing segments, falling back to
+ * requesting a new segment from the DataNode via
+ * {@link DataTransferProtocol#requestShortCircuitShm}.
+ * 
+ * The counterpart to this class on the DataNode is {@link ShortCircuitRegistry}.
+ * See {@link ShortCircuitRegistry} for more information on the communication protocol.
+ */
+@InterfaceAudience.Private
+public class DfsClientShmManager {
+  private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
+
+  /**
+   * Manages short-circuit memory segments that pertain to a given DataNode.
+   */
+  class EndpointShmManager {
+    /**
+     * The datanode we're managing.
+     */
+    private final DatanodeInfo datanode;
+
+    /**
+     * Shared memory segments which have no empty slots.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> full =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * Shared memory segments which have at least one empty slot.
+     *
+     * Protected by the manager lock.
+     */
+    private final TreeMap<ShmId, DfsClientShm> notFull =
+        new TreeMap<ShmId, DfsClientShm>();
+
+    /**
+     * True if this datanode doesn't support short-circuit shared memory
+     * segments.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean disabled = false;
+
+    /**
+     * True if we're in the process of loading a shared memory segment from
+     * this DataNode.
+     *
+     * Protected by the manager lock.
+     */
+    private boolean loading = false;
+
+    EndpointShmManager(DatanodeInfo datanode) {
+      this.datanode = datanode;
+    }
+
+    /**
+     * Pull a slot out of a preexisting shared memory segment.
+     *
+     * Must be called with the manager lock held.
+     *
+     * @param blockId     The blockId to put inside the Slot object.
+     *
+     * @return            null if none of our shared memory segments contain a
+     *                      free slot; the slot object otherwise.
+     */
+    private Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
+      if (notFull.isEmpty()) {
+        return null;
+      }
+      Entry<ShmId, DfsClientShm> entry = notFull.firstEntry();
+      DfsClientShm shm = entry.getValue();
+      ShmId shmId = shm.getShmId();
+      Slot slot = shm.allocAndRegisterSlot(blockId);
+      if (shm.isFull()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+        DfsClientShm removedShm = notFull.remove(shmId);
+        Preconditions.checkState(removedShm == shm);
+        full.put(shmId, shm);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
+              " out of " + shm);
+        }
+      }
+      return slot;
+    }
+
+    /**
+     * Ask the DataNode for a new shared memory segment.  This function must be
+     * called with the manager lock held.  We will release the lock while
+     * communicating with the DataNode.
+     *
+     * @param clientName    The current client name.
+     * @param peer          The peer to use to talk to the DataNode.
+     *
+     * @return              Null if the DataNode does not support shared memory
+     *                        segments, or experienced an error creating the
+     *                        shm.  The shared memory segment itself on success.
+     * @throws IOException  If there was an error communicating over the socket.
+     *                        We will not throw an IOException unless the socket
+     *                        itself (or the network) is the problem.
+     */
+    private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
+        throws IOException {
+      final DataOutputStream out = 
+          new DataOutputStream(
+              new BufferedOutputStream(peer.getOutputStream()));
+      new Sender(out).requestShortCircuitShm(clientName);
+      ShortCircuitShmResponseProto resp = 
+          ShortCircuitShmResponseProto.parseFrom(
+              PBHelper.vintPrefixed(peer.getInputStream()));
+      String error = resp.hasError() ? resp.getError() : "(unknown)";
+      switch (resp.getStatus()) {
+      case SUCCESS:
+        DomainSocket sock = peer.getDomainSocket();
+        byte buf[] = new byte[1];
+        FileInputStream fis[] = new FileInputStream[1];
+        if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
+          throw new EOFException("got EOF while trying to transfer the " +
+              "file descriptor for the shared memory segment.");
+        }
+        if (fis[0] == null) {
+          throw new IOException("the datanode " + datanode + " failed to " +
+              "pass a file descriptor for the shared memory segment.");
+        }
+        try {
+          DfsClientShm shm = 
+              new DfsClientShm(PBHelper.convert(resp.getId()),
+                  fis[0], this, peer);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": createNewShm: created " + shm);
+          }
+          return shm;
+        } finally {
+          IOUtils.cleanup(LOG, fis[0]);
+        }
+      case ERROR_UNSUPPORTED:
+        // The DataNode just does not support short-circuit shared memory
+        // access, and we should stop asking.
+        LOG.info(this + ": datanode does not support short-circuit " +
+            "shared memory access: " + error);
+        disabled = true;
+        return null;
+      default:
+        // The datanode experienced some kind of unexpected error when trying to
+        // create the short-circuit shared memory segment.
+        LOG.warn(this + ": error requesting short-circuit shared memory " +
+            "access: " + error);
+        return null;
+      }
+    }
+
+    /**
+     * Allocate a new shared memory slot connected to this datanode.
+     *
+     * Must be called with the manager lock held.
+     *
+     * @param peer          The peer to use to talk to the DataNode.
+     * @param clientName    The client name.
+     * @param usedPeer      (out param) Will be set to true if we used the peer.
+     *                        When a peer is used, it becomes the
+     *                        responsibility of the DomainSocketWatcher,
+     *                        and the caller must not close it.
+     *
+     * @return              null if the DataNode does not support shared memory
+     *                        segments, or experienced an error creating the
+     *                        shm.  The shared memory segment itself on success.
+     * @throws IOException  If there was an error communicating over the socket.
+     */
+    Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
+        String clientName, ExtendedBlockId blockId) throws IOException {
+      while (true) {
+        if (disabled) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": shared memory segment access is disabled.");
+          }
+          return null;
+        }
+        // Try to use an existing slot.
+        Slot slot = allocSlotFromExistingShm(blockId);
+        if (slot != null) {
+          return slot;
+        }
+        // There are no free slots.  If someone is loading more slots, wait
+        // for that to finish.
+        if (loading) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": waiting for loading to finish...");
+          }
+          finishedLoading.awaitUninterruptibly();
+        } else {
+          // Otherwise, load the slot ourselves.
+          loading = true;
+          lock.unlock();
+          DfsClientShm shm;
+          try {
+            shm = requestNewShm(clientName, peer);
+            if (shm == null) continue;
+            // See #{DfsClientShmManager#domainSocketWatcher} for details
+            // about why we do this before retaking the manager lock.
+            domainSocketWatcher.add(peer.getDomainSocket(), shm);
+            // The DomainPeer is now our responsibility, and should not be
+            // closed by the caller.
+            usedPeer.setValue(true);
+          } finally {
+            lock.lock();
+            loading = false;
+            finishedLoading.signalAll();
+          }
+          if (shm.isStale()) {
+            // If the peer closed immediately after the shared memory segment
+            // was created, the DomainSocketWatcher callback might already have
+            // fired and marked the shm as stale.  In this case, we obviously
+            // don't want to add the shared memory segment to our list of valid
+            // not-full segments.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(this + ": the UNIX domain socket associated with " +
+                  "this short-circuit memory closed before we could make " +
+                  "use of the shm.");
+            }
+          } else {
+            notFull.put(shm.getShmId(), shm);
+          }
+        }
+      }
+    }
+    
+    /**
+     * Stop tracking a slot.
+     *
+     * Must be called with the manager lock held.
+     *
+     * @param slot          The slot to release.
+     */
+    void freeSlot(Slot slot) {
+      DfsClientShm shm = (DfsClientShm)slot.getShm();
+      shm.unregisterSlot(slot.getSlotIdx());
+      if (shm.isStale()) {
+        // Stale shared memory segments should not be tracked here.
+        Preconditions.checkState(!full.containsKey(shm.getShmId()));
+        Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
+        if (shm.isEmpty()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": freeing empty stale " + shm);
+          }
+          shm.free();
+        }
+      } else {
+        ShmId shmId = shm.getShmId();
+        full.remove(shmId); // The shm can't be full if we just freed a slot.
+        if (shm.isEmpty()) {
+          notFull.remove(shmId);
+  
+          // If the shared memory segment is now empty, we call shutdown(2) on
+          // the UNIX domain socket associated with it.  The DomainSocketWatcher,
+          // which is watching this socket, will call DfsClientShm#handle,
+          // cleaning up this shared memory segment.
+          //
+          // See #{DfsClientShmManager#domainSocketWatcher} for details about why
+          // we don't want to call DomainSocketWatcher#remove directly here.
+          //
+          // Note that we could experience 'fragmentation' here, where the
+          // DFSClient allocates a bunch of slots in different shared memory
+          // segments, and then frees most of them, but never fully empties out
+          // any segment.  We make some attempt to avoid this fragmentation by
+          // always allocating new slots out of the shared memory segment with the
+          // lowest ID, but it could still occur.  In most workloads,
+          // fragmentation should not be a major concern, since it doesn't impact
+          // peak file descriptor usage or the speed of allocation.
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": shutting down UNIX domain socket for " +
+                "empty " + shm);
+          }
+          shutdown(shm);
+        } else {
+          notFull.put(shmId, shm);
+        }
+      }
+    }
+    
+    /**
+     * Unregister a shared memory segment.
+     *
+     * Once a segment is unregistered, we will not allocate any more slots
+     * inside that segment.
+     *
+     * The DomainSocketWatcher calls this while holding the DomainSocketWatcher
+     * lock.
+     *
+     * @param shmId         The ID of the shared memory segment to unregister.
+     */
+    void unregisterShm(ShmId shmId) {
+      lock.lock();
+      try {
+        full.remove(shmId);
+        notFull.remove(shmId);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return String.format("EndpointShmManager(%s, parent=%s)",
+          datanode, DfsClientShmManager.this);
+    }
+
+    PerDatanodeVisitorInfo getVisitorInfo() {
+      return new PerDatanodeVisitorInfo(full, notFull, disabled);
+    }
+
+    final void shutdown(DfsClientShm shm) {
+      try {
+        shm.getPeer().getDomainSocket().shutdown();
+      } catch (IOException e) {
+        LOG.warn(this + ": error shutting down shm: got IOException calling " +
+            "shutdown(SHUT_RDWR)", e);
+      }
+    }
+  }
+
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * A condition variable which is signalled when we finish loading a segment
+   * from the Datanode.
+   */
+  private final Condition finishedLoading = lock.newCondition();
+
+  /**
+   * Information about each Datanode.
+   */
+  private final HashMap<DatanodeInfo, EndpointShmManager> datanodes =
+      new HashMap<DatanodeInfo, EndpointShmManager>(1);
+  
+  /**
+   * The DomainSocketWatcher which keeps track of the UNIX domain socket
+   * associated with each shared memory segment.
+   *
+   * Note: because the DomainSocketWatcher makes callbacks into this
+   * DfsClientShmManager object, you MUST NOT attempt to take the
+   * DomainSocketWatcher lock while holding the DfsClientShmManager lock,
+   * or else deadlock might result.  This means that most DomainSocketWatcher
+   * methods are off-limits unless you release the manager lock first.
+   */
+  private final DomainSocketWatcher domainSocketWatcher;
+  
+  DfsClientShmManager(int interruptCheckPeriodMs) throws IOException {
+    this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs);
+  }
+  
+  public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
+      MutableBoolean usedPeer, ExtendedBlockId blockId,
+      String clientName) throws IOException {
+    lock.lock();
+    try {
+      EndpointShmManager shmManager = datanodes.get(datanode);
+      if (shmManager == null) {
+        shmManager = new EndpointShmManager(datanode);
+        datanodes.put(datanode, shmManager);
+      }
+      return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
+    } finally {
+      lock.unlock();
+    }
+  }
+  
+  public void freeSlot(Slot slot) {
+    lock.lock();
+    try {
+      DfsClientShm shm = (DfsClientShm)slot.getShm();
+      shm.getEndpointShmManager().freeSlot(slot);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public static class PerDatanodeVisitorInfo {
+    public final TreeMap<ShmId, DfsClientShm> full;
+    public final TreeMap<ShmId, DfsClientShm> notFull;
+    public final boolean disabled;
+
+    PerDatanodeVisitorInfo(TreeMap<ShmId, DfsClientShm> full,
+        TreeMap<ShmId, DfsClientShm> notFull, boolean disabled) {
+      this.full = full;
+      this.notFull = notFull;
+      this.disabled = disabled;
+    }
+  }
+
+  @VisibleForTesting
+  public interface Visitor {
+    void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+        throws IOException;
+  }
+
+  @VisibleForTesting
+  public void visit(Visitor visitor) throws IOException {
+    lock.lock();
+    try {
+      HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info = 
+          new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>();
+      for (Entry<DatanodeInfo, EndpointShmManager> entry :
+            datanodes.entrySet()) {
+        info.put(entry.getKey(), entry.getValue().getVisitorInfo());
+      }
+      visitor.visit(info);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("ShortCircuitShmManager(%08x)",
+        System.identityHashCode(this));
+  }
+}
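
The allocSlot contract, in particular the usedPeer out-parameter, is easiest to read from the caller's side. Below is a hypothetical standalone caller (in the patch itself the manager is driven through the short-circuit read path, elided here); peer construction and retry logic are omitted, and tryAllocSlot is an invented name.

    import java.io.IOException;

    import org.apache.commons.lang.mutable.MutableBoolean;
    import org.apache.hadoop.hdfs.ExtendedBlockId;
    import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
    import org.apache.hadoop.hdfs.net.DomainPeer;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    static Slot tryAllocSlot(DfsClientShmManager manager, DatanodeInfo datanode,
        DomainPeer peer, ExtendedBlockId blockId, String clientName)
        throws IOException {
      MutableBoolean usedPeer = new MutableBoolean(false);
      Slot slot = manager.allocSlot(datanode, peer, usedPeer, blockId,
          clientName);
      if (!usedPeer.booleanValue()) {
        // allocSlot did not hand the socket to the DomainSocketWatcher,
        // so it is still ours (real callers may return it to a peer cache).
        peer.close();
      }
      return slot;  // null if the DataNode doesn't support shared memory
    }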


