hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1573821 [1/2] - in /hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/datatr...
Date: Mon, 03 Mar 2014 23:52:00 GMT
Author: szetszwo
Date: Mon Mar  3 23:51:58 2014
New Revision: 1573821

URL: http://svn.apache.org/r1573821
Log:
Merge r1569890 through r1573813 from trunk.

Added:
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShm.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShm.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java
Removed:
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitSharedMemorySegment.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitSharedMemorySegment.java
Modified:
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java

Propchange: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1573120-1573813

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Mar  3 23:51:58 2014
@@ -370,6 +370,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-4200. Reduce the size of synchronized sections in PacketResponder.
     (suresh)
 
+    HDFS-5950. The DFSClient and DataNode should use shared memory segments to
+    communicate short-circuit information. (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
@@ -514,6 +517,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5956. A file size is multiplied by the replication factor in 'hdfs oiv
     -p FileDistribution' option. (Akira Ajisaka via wheat9)
 
+    HDFS-5866. '-maxSize' and '-step' option fail in OfflineImageViewer.
+    (Akira Ajisaka via wheat9)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
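
For context on the HDFS-5950 entry above: the client now allocates a slot in a
shared memory segment when it requests short-circuit file descriptors, and
releases that slot when it is done with the replica. A condensed sketch of the
lifecycle, using only names that appear in the diffs later in this message
(the surrounding control flow is illustrative, not the verbatim
implementation):

    // Illustrative fragment; 'cache', 'datanode', 'peer', 'out', 'block',
    // 'token', 'blockId', and 'clientName' stand in for the fields and
    // locals of BlockReaderFactory shown below.
    Slot slot = cache.allocShmSlot(datanode, peer, usedPeer, blockId, clientName);
    try {
      // The slot id rides along with the file-descriptor request, letting
      // the DataNode publish the replica's mlock state through the slot.
      new Sender(out).requestShortCircuitFds(block, token,
          slot == null ? null : slot.getSlotId(), 1);
      // ... receive the fds and build a ShortCircuitReplica bound to slot ...
    } catch (IOException e) {
      // The DataNode never learned about this slot, so free it directly.
      if (slot != null) {
        cache.freeSlot(slot);
      }
      throw e;
    }
    // When the replica is eventually purged, the slot is released
    // asynchronously via cache.scheduleSlotReleaser(slot).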

Propchange: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1573120-1573813

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Mon Mar  3 23:51:58 2014
@@ -23,7 +23,6 @@ import java.util.EnumSet;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
  * A BlockReader is responsible for reading a single block

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Mon Mar  3 23:51:58 2014
@@ -24,6 +24,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -32,6 +33,8 @@ import org.apache.hadoop.hdfs.client.Sho
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -410,7 +413,6 @@ public class BlockReaderFactory implemen
         setBlock(block).
         setStartOffset(startOffset).
         setShortCircuitReplica(info.getReplica()).
-        setDatanodeID(datanode).
         setVerifyChecksum(verifyChecksum).
         setCachingStrategy(cachingStrategy).
         build();
@@ -438,12 +440,31 @@ public class BlockReaderFactory implemen
     while (true) {
       curPeer = nextDomainPeer();
       if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
+      Slot slot = null;
+      ShortCircuitCache cache = clientContext.getShortCircuitCache();
       try {
-        ShortCircuitReplicaInfo info = requestFileDescriptors(peer);
+        MutableBoolean usedPeer = new MutableBoolean(false);
+        slot = cache.allocShmSlot(datanode, peer, usedPeer,
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
+            clientName);
+        if (usedPeer.booleanValue()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": allocShmSlot used up our previous socket " +
+              peer.getDomainSocket() + ".  Allocating a new one...");
+          }
+          curPeer = nextDomainPeer();
+          if (curPeer == null) break;
+          peer = (DomainPeer)curPeer.peer;
+        }
+        ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
         clientContext.getPeerCache().put(datanode, peer);
         return info;
       } catch (IOException e) {
+        if (slot != null) {
+          cache.freeSlot(slot);
+        }
         if (curPeer.fromCache) {
           // Handle an I/O error we got when using a cached socket.
           // These are considered less serious, because the socket may be stale.
@@ -470,16 +491,22 @@ public class BlockReaderFactory implemen
   /**
    * Request file descriptors from a DomainPeer.
    *
+   * @param peer   The peer to use for communication.
+   * @param slot   If non-null, the shared memory slot to associate with the 
+   *               new ShortCircuitReplica.
+   * 
    * @return  A ShortCircuitReplica object if we could communicate with the
    *          datanode; null, otherwise. 
    * @throws  IOException If we encountered an I/O exception while communicating
    *          with the datanode.
    */
-  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer)
-        throws IOException {
+  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
+          Slot slot) throws IOException {
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
-    new Sender(out).requestShortCircuitFds(block, token, 1);
+    SlotId slotId = slot == null ? null : slot.getSlotId();
+    new Sender(out).requestShortCircuitFds(block, token, slotId, 1);
     DataInputStream in = new DataInputStream(peer.getInputStream());
     BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
         PBHelper.vintPrefixed(in));
@@ -491,9 +518,10 @@ public class BlockReaderFactory implemen
       sock.recvFileInputStreams(fis, buf, 0, buf.length);
       ShortCircuitReplica replica = null;
       try {
-        ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-        replica = new ShortCircuitReplica(key, fis[0], fis[1],
-            clientContext.getShortCircuitCache(), Time.monotonicNow());
+        ExtendedBlockId key =
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+        replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
+            Time.monotonicNow(), slot);
       } catch (IOException e) {
         // This indicates an error reading from disk, or a format error.  Since
         // it's not a socket communication problem, we return null rather than
@@ -527,8 +555,9 @@ public class BlockReaderFactory implemen
       }
       return new ShortCircuitReplicaInfo(new InvalidToken(msg));
     default:
-      LOG.warn(this + "unknown response code " + resp.getStatus() + " while " +
-          "attempting to set up short-circuit access. " + resp.getMessage());
+      LOG.warn(this + ": unknown response code " + resp.getStatus() +
+          " while attempting to set up short-circuit access. " +
+          resp.getMessage());
       clientContext.getDomainSocketFactory()
           .disableShortCircuitForPath(pathInfo.getPath());
       return null;
@@ -565,6 +594,7 @@ public class BlockReaderFactory implemen
     while (true) {
       BlockReaderPeer curPeer = nextDomainPeer();
       if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
       BlockReader blockReader = null;
       try {
@@ -630,6 +660,7 @@ public class BlockReaderFactory implemen
       try {
         curPeer = nextTcpPeer();
         if (curPeer == null) break;
+        if (curPeer.fromCache) remainingCacheTries--;
         peer = curPeer.peer;
         blockReader = getRemoteBlockReader(peer);
         return blockReader;
@@ -662,7 +693,7 @@ public class BlockReaderFactory implemen
     return null;
   }
 
-  private static class BlockReaderPeer {
+  public static class BlockReaderPeer {
     final Peer peer;
     final boolean fromCache;
     
@@ -681,7 +712,6 @@ public class BlockReaderFactory implemen
     if (remainingCacheTries > 0) {
       Peer peer = clientContext.getPeerCache().get(datanode, true);
       if (peer != null) {
-        remainingCacheTries--;
         if (LOG.isTraceEnabled()) {
           LOG.trace("nextDomainPeer: reusing existing peer " + peer);
         }
@@ -706,7 +736,6 @@ public class BlockReaderFactory implemen
     if (remainingCacheTries > 0) {
       Peer peer = clientContext.getPeerCache().get(datanode, false);
       if (peer != null) {
-        remainingCacheTries--;
         if (LOG.isTraceEnabled()) {
           LOG.trace("nextTcpPeer: reusing existing peer " + peer);
         }
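
Two behavioral points in the hunks above are easy to miss. First,
allocShmSlot() may consume the supplied domain socket to set up a new shared
memory segment, which it signals through usedPeer; a fresh peer must then be
fetched before requesting file descriptors. Second, the remainingCacheTries
decrement moved out of nextDomainPeer()/nextTcpPeer() and into the call
sites, so a cached peer now counts against the retry budget at the point
where it is taken. A reduced sketch of the new shape (error handling and
null checks elided):

    MutableBoolean usedPeer = new MutableBoolean(false);
    Slot slot = cache.allocShmSlot(datanode, peer, usedPeer,
        new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
        clientName);
    if (usedPeer.booleanValue()) {
      // allocShmSlot spent the socket creating a shared memory segment;
      // take another peer (cached or freshly dialed) for the fd request.
      curPeer = nextDomainPeer();
      peer = (DomainPeer) curPeer.peer;
    }
    ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);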

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Mon Mar  3 23:51:58 2014
@@ -17,26 +17,21 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.EnumSet;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -70,8 +65,6 @@ class BlockReaderLocal implements BlockR
     private String filename;
     private ShortCircuitReplica replica;
     private long dataPos;
-    private DatanodeID datanodeID;
-    private boolean mlocked;
     private ExtendedBlock block;
 
     public Builder(Conf conf) {
@@ -108,16 +101,6 @@ class BlockReaderLocal implements BlockR
       return this;
     }
 
-    public Builder setDatanodeID(DatanodeID datanodeID) {
-      this.datanodeID = datanodeID;
-      return this;
-    }
-
-    public Builder setMlocked(boolean mlocked) {
-      this.mlocked = mlocked;
-      return this;
-    }
-
     public Builder setBlock(ExtendedBlock block) {
       this.block = block;
       return this;
@@ -165,19 +148,9 @@ class BlockReaderLocal implements BlockR
   private final boolean verifyChecksum;
 
   /**
-   * If true, this block is mlocked on the DataNode.
-   */
-  private final AtomicBoolean mlocked;
-
-  /**
    * Name of the block, for logging purposes.
    */
   private final String filename;
-
-  /**
-   * DataNode which contained this block.
-   */
-  private final DatanodeID datanodeID;
   
   /**
    * Block ID and Block Pool ID.
@@ -220,8 +193,6 @@ class BlockReaderLocal implements BlockR
    */
   private int maxReadaheadLength;
 
-  private ClientMmap clientMmap;
-
   /**
    * Buffers data starting at the current dataPos and extending on
    * for dataBuf.limit().
@@ -247,9 +218,7 @@ class BlockReaderLocal implements BlockR
     this.checksum = header.getChecksum();
     this.verifyChecksum = builder.verifyChecksum &&
         (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
-    this.mlocked = new AtomicBoolean(builder.mlocked);
     this.filename = builder.filename;
-    this.datanodeID = builder.datanodeID;
     this.block = builder.block;
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
     this.checksumSize = checksum.getChecksumSize();
@@ -380,42 +349,55 @@ class BlockReaderLocal implements BlockR
     return total;
   }
 
-  private boolean getCanSkipChecksum() {
-    return (!verifyChecksum) || mlocked.get();
+  private boolean createNoChecksumContext() {
+    if (verifyChecksum) {
+      return replica.addNoChecksumAnchor();
+    } else {
+      return true;
+    }
   }
-  
+
+  private void releaseNoChecksumContext() {
+    if (verifyChecksum) {
+      replica.removeNoChecksumAnchor();
+    }
+  }
+
   @Override
   public synchronized int read(ByteBuffer buf) throws IOException {
-    boolean canSkipChecksum = getCanSkipChecksum();
-    
-    String traceString = null;
-    if (LOG.isTraceEnabled()) {
-      traceString = new StringBuilder().
-          append("read(").
-          append("buf.remaining=").append(buf.remaining()).
-          append(", block=").append(block).
-          append(", filename=").append(filename).
-          append(", canSkipChecksum=").append(canSkipChecksum).
-          append(")").toString();
-      LOG.info(traceString + ": starting");
-    }
-    int nRead;
+    boolean canSkipChecksum = createNoChecksumContext();
     try {
-      if (canSkipChecksum && zeroReadaheadRequested) {
-        nRead = readWithoutBounceBuffer(buf);
-      } else {
-        nRead = readWithBounceBuffer(buf, canSkipChecksum);
+      String traceString = null;
+      if (LOG.isTraceEnabled()) {
+        traceString = new StringBuilder().
+            append("read(").
+            append("buf.remaining=").append(buf.remaining()).
+            append(", block=").append(block).
+            append(", filename=").append(filename).
+            append(", canSkipChecksum=").append(canSkipChecksum).
+            append(")").toString();
+        LOG.info(traceString + ": starting");
+      }
+      int nRead;
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(buf);
+        } else {
+          nRead = readWithBounceBuffer(buf, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.info(traceString + ": I/O error", e);
+        }
+        throw e;
       }
-    } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.info(traceString + ": I/O error", e);
+        LOG.info(traceString + ": returning " + nRead);
       }
-      throw e;
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.info(traceString + ": returning " + nRead);
+      return nRead;
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
     }
-    return nRead;
   }
 
   private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
@@ -531,34 +513,38 @@ class BlockReaderLocal implements BlockR
   @Override
   public synchronized int read(byte[] arr, int off, int len)
         throws IOException {
-    boolean canSkipChecksum = getCanSkipChecksum();
-    String traceString = null;
-    if (LOG.isTraceEnabled()) {
-      traceString = new StringBuilder().
-          append("read(arr.length=").append(arr.length).
-          append(", off=").append(off).
-          append(", len=").append(len).
-          append(", filename=").append(filename).
-          append(", block=").append(block).
-          append(", canSkipChecksum=").append(canSkipChecksum).
-          append(")").toString();
-      LOG.trace(traceString + ": starting");
-    }
+    boolean canSkipChecksum = createNoChecksumContext();
     int nRead;
     try {
-      if (canSkipChecksum && zeroReadaheadRequested) {
-        nRead = readWithoutBounceBuffer(arr, off, len);
-      } else {
-        nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+      String traceString = null;
+      if (LOG.isTraceEnabled()) {
+        traceString = new StringBuilder().
+            append("read(arr.length=").append(arr.length).
+            append(", off=").append(off).
+            append(", len=").append(len).
+            append(", filename=").append(filename).
+            append(", block=").append(block).
+            append(", canSkipChecksum=").append(canSkipChecksum).
+            append(")").toString();
+        LOG.trace(traceString + ": starting");
+      }
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(arr, off, len);
+        } else {
+          nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(traceString + ": I/O error", e);
+        }
+        throw e;
       }
-    } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace(traceString + ": I/O error", e);
+        LOG.trace(traceString + ": returning " + nRead);
       }
-      throw e;
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(traceString + ": returning " + nRead);
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
     }
     return nRead;
   }
@@ -648,28 +634,45 @@ class BlockReaderLocal implements BlockR
     return true;
   }
 
+  /**
+   * Get or create a memory map for this replica.
+   * 
+   * There are two kinds of ClientMmap objects we could fetch here: one that 
+   * will always read pre-checksummed data, and one that may read data that
+   * hasn't been checksummed.
+   *
+   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
+   * the anchor count on the shared memory slot.  This will tell the DataNode
+   * not to munlock the block until this ClientMmap is closed.
+   * If we fetch the latter, we don't bother with anchoring.
+   *
+   * @param opts     The options to use, such as SKIP_CHECKSUMS.
+   * 
+   * @return         null on failure; the ClientMmap otherwise.
+   */
   @Override
   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    if ((!opts.contains(ReadOption.SKIP_CHECKSUMS)) &&
-          verifyChecksum && (!mlocked.get())) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("can't get an mmap for " + block + " of " + filename + 
-            " since SKIP_CHECKSUMS was not given, " +
-            "we aren't skipping checksums, and the block is not mlocked.");
+    boolean anchor = verifyChecksum &&
+        (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
+    if (anchor) {
+      if (!createNoChecksumContext()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("can't get an mmap for " + block + " of " + filename + 
+              " since SKIP_CHECKSUMS was not given, " +
+              "we aren't skipping checksums, and the block is not mlocked.");
+        }
+        return null;
       }
-      return null;
     }
-    return replica.getOrCreateClientMmap();
-  }
-
-  /**
-   * Set the mlocked state of the BlockReader.
-   * This method does NOT need to be synchronized because mlocked is atomic.
-   *
-   * @param mlocked  the new mlocked state of the BlockReader.
-   */
-  public void setMlocked(boolean mlocked) {
-    this.mlocked.set(mlocked);
+    ClientMmap clientMmap = null;
+    try {
+      clientMmap = replica.getOrCreateClientMmap(anchor);
+    } finally {
+      if ((clientMmap == null) && anchor) {
+        releaseNoChecksumContext();
+      }
+    }
+    return clientMmap;
   }
   
   @VisibleForTesting
@@ -681,4 +684,22 @@ class BlockReaderLocal implements BlockR
   int getMaxReadaheadLength() {
     return this.maxReadaheadLength;
   }
+  
+  /**
+   * Make the replica anchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceAnchorable() {
+    replica.getSlot().makeAnchorable();
+  }
+
+  /**
+   * Make the replica unanchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceUnanchorable() {
+    replica.getSlot().makeUnanchorable();
+  }
 }
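
The rewritten read paths above replace the old mlocked flag with a
no-checksum "anchor" on the shared memory slot, acquired before the read and
released after it. Stripped of the trace logging, both read() overloads now
follow this pattern (a condensed sketch, not the verbatim method body):

    boolean canSkipChecksum = createNoChecksumContext();
    try {
      // true means either that checksums are off for this reader, or that
      // an anchor was added to the slot; an anchored slot tells the DataNode
      // not to munlock the block while the read is in flight.
      if (canSkipChecksum && zeroReadaheadRequested) {
        return readWithoutBounceBuffer(buf);
      } else {
        return readWithBounceBuffer(buf, canSkipChecksum);
      }
    } finally {
      if (canSkipChecksum) releaseNoChecksumContext();
    }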

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java Mon Mar  3 23:51:58 2014
@@ -99,7 +99,8 @@ public class ClientContext {
         conf.shortCircuitMmapCacheSize,
         conf.shortCircuitMmapCacheExpiryMs,
         conf.shortCircuitMmapCacheRetryTimeout,
-        conf.shortCircuitCacheStaleThresholdMs);
+        conf.shortCircuitCacheStaleThresholdMs,
+        conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
     this.peerCache =
           new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
     this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
@@ -129,7 +130,9 @@ public class ClientContext {
       append(", useLegacyBlockReaderLocal = ").
       append(conf.useLegacyBlockReaderLocal).
       append(", domainSocketDataTraffic = ").
-      append(conf.domainSocketDataTraffic);
+      append(conf.domainSocketDataTraffic).
+      append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
+      append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
 
     return builder.toString();
   }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Mon Mar  3 23:51:58 2014
@@ -282,6 +282,7 @@ public class DFSClient implements java.i
     final boolean domainSocketDataTraffic;
     final int shortCircuitStreamsCacheSize;
     final long shortCircuitStreamsCacheExpiryMs; 
+    final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
     
     final int shortCircuitMmapCacheSize;
     final long shortCircuitMmapCacheExpiryMs;
@@ -414,6 +415,9 @@ public class DFSClient implements java.i
       shortCircuitCacheStaleThresholdMs = conf.getLong(
           DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
           DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
+      shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
+          DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
+          DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
 
       datanodeRestartTimeout = conf.getLong(
           DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Mar  3 23:51:58 2014
@@ -477,6 +477,10 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup";
   public static final String  DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file";
   public static final String  DFS_DATANODE_USER_NAME_KEY = "dfs.datanode.kerberos.principal";
+  public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH = "dfs.datanode.shared.file.descriptor.path";
+  public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT = "/dev/shm";
+  public static final String  DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
+  public static final int     DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000;
   public static final String  DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
   public static final String  DFS_NAMENODE_USER_NAME_KEY = "dfs.namenode.kerberos.principal";
   public static final String  DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.namenode.kerberos.internal.spnego.principal";
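
The new keys above are set like any other HDFS configuration value; a small
illustrative example (the values shown are the defaults from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Directory in which the DataNode creates the file descriptors backing
    // its shared memory segments.
    conf.set("dfs.datanode.shared.file.descriptor.path", "/dev/shm");
    // Interrupt-check interval for the shared memory watcher, in ms.  Per
    // the ShortCircuitCache hunk below, a value <= 0 leaves the
    // DfsClientShmManager unconstructed, disabling shared memory segments
    // on the client.
    conf.setInt("dfs.short.circuit.shared.memory.watcher.interrupt.check.ms",
        60000);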

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Mar  3 23:51:58 2014
@@ -39,6 +39,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
@@ -1630,7 +1631,7 @@ implements ByteBufferReadable, CanSetDro
       success = true;
     } finally {
       if (!success) {
-        clientMmap.unref();
+        IOUtils.closeQuietly(clientMmap);
       }
     }
     return buffer;
@@ -1644,7 +1645,7 @@ implements ByteBufferReadable, CanSetDro
           "that was not created by this stream, " + buffer);
     }
     if (val instanceof ClientMmap) {
-      ((ClientMmap)val).unref();
+      IOUtils.closeQuietly((ClientMmap)val);
     } else if (val instanceof ByteBufferPool) {
       ((ByteBufferPool)val).putBuffer(buffer);
     }
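
ClientMmap is now a Closeable carrying a single reference (see the
ClientMmap.java hunks below), so the stream cleans up with commons-io's
IOUtils.closeQuietly() instead of the removed unref(). The caller-side
contract, roughly:

    ClientMmap clientMmap = replica.getOrCreateClientMmap(anchor);
    try {
      MappedByteBuffer buf = clientMmap.getMappedByteBuffer();
      // ... serve zero-copy reads out of buf ...
    } finally {
      // close() drops the replica reference and, if the mmap was anchored,
      // removes its no-checksum anchor; each ClientMmap is closed exactly
      // once.
      IOUtils.closeQuietly(clientMmap);
    }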

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java Mon Mar  3 23:51:58 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * An immutable key which identifies a block.
@@ -34,6 +35,10 @@ final public class ExtendedBlockId {
    */
   private final String bpId;
 
+  public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) {
+    return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+  }
+  
   public ExtendedBlockId(long blockId, String bpId) {
     this.blockId = blockId;
     this.bpId = bpId;
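
The new factory method is shorthand for the two-argument construction that
appears repeatedly in BlockReaderFactory above:

    // Equivalent to:
    //   new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId())
    ExtendedBlockId key = ExtendedBlockId.fromExtendedBlock(block);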

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java Mon Mar  3 23:51:58 2014
@@ -20,9 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.security.UserGroupInformation;
 
 public interface RemotePeerFactory {
   /**

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java Mon Mar  3 23:51:58 2014
@@ -19,26 +19,23 @@ package org.apache.hadoop.hdfs.client;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
+import java.io.Closeable;
 import java.nio.MappedByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * A memory-mapped region used by an HDFS client.
- * 
- * This class includes a reference count and some other information used by
- * ClientMmapManager to track and cache mmaps.
+ * A reference to a memory-mapped region used by an HDFS client.
  */
 @InterfaceAudience.Private
-public class ClientMmap {
+public class ClientMmap implements Closeable {
   static final Log LOG = LogFactory.getLog(ClientMmap.class);
   
   /**
    * A reference to the block replica which this mmap relates to.
    */
-  private final ShortCircuitReplica replica;
+  private ShortCircuitReplica replica;
   
   /**
    * The java ByteBuffer object.
@@ -46,33 +43,30 @@ public class ClientMmap {
   private final MappedByteBuffer map;
 
   /**
-   * Reference count of this ClientMmap object.
+   * Whether or not this ClientMmap anchors the replica into memory while
+   * it exists.  Closing an anchored ClientMmap unanchors the replica.
    */
-  private final AtomicInteger refCount = new AtomicInteger(1);
+  private final boolean anchored;
 
-  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map) {
+  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map,
+      boolean anchored) {
     this.replica = replica;
     this.map = map;
+    this.anchored = anchored;
   }
 
   /**
-   * Increment the reference count.
-   *
-   * @return   The new reference count.
+   * Close the ClientMmap object.
    */
-  void ref() {
-    refCount.addAndGet(1);
-  }
-
-  /**
-   * Decrement the reference count.
-   *
-   * The parent replica gets unreferenced each time the reference count 
-   * of this object goes to 0.
-   */
-  public void unref() {
-    refCount.addAndGet(-1);
-    replica.unref();
+  @Override
+  public void close() {
+    if (replica != null) {
+      if (anchored) {
+        replica.removeNoChecksumAnchor();
+      }
+      replica.unref();
+    }
+    replica = null;
   }
 
   public MappedByteBuffer getMappedByteBuffer() {
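
With the reference count gone, the cache hands out a fresh ClientMmap per
request, each wrapping the same cached MappedByteBuffer and differing only in
the anchored flag. A sketch of the resulting semantics:

    // Two independent handles onto the same cached mapping.
    ClientMmap anchored = replica.getOrCreateClientMmap(true);
    ClientMmap plain = replica.getOrCreateClientMmap(false);
    // Closing 'anchored' removes its no-checksum anchor and drops one
    // replica reference; closing 'plain' only drops a reference.  The
    // underlying MappedByteBuffer stays cached on the replica (mmapData)
    // until the replica is purged and munmapped.
    anchored.close();
    plain.close();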

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java Mon Mar  3 23:51:58 2014
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hdfs.client;
 
+import java.io.BufferedOutputStream;
 import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
@@ -33,14 +36,23 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -154,6 +166,69 @@ public class ShortCircuitCache implement
     }
   }
 
+  /**
+   * A task which asks the DataNode to release a short-circuit shared memory
+   * slot.  If successful, this will tell the DataNode to stop monitoring
+   * changes to the mlock status of the replica associated with the slot.
+   * It will also allow us (the client) to re-use this slot for another
+   * replica.  If we can't communicate with the DataNode for some reason,
+   * we tear down the shared memory segment to avoid being in an inconsistent
+   * state.
+   */
+  private class SlotReleaser implements Runnable {
+    /**
+     * The slot that we need to release.
+     */
+    private final Slot slot;
+
+    SlotReleaser(Slot slot) {
+      this.slot = slot;
+    }
+
+    @Override
+    public void run() {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(ShortCircuitCache.this + ": about to release " + slot);
+      }
+      final DfsClientShm shm = (DfsClientShm)slot.getShm();
+      final DomainSocket shmSock = shm.getPeer().getDomainSocket();
+      DomainSocket sock = null;
+      DataOutputStream out = null;
+      final String path = shmSock.getPath();
+      boolean success = false;
+      try {
+        sock = DomainSocket.connect(path);
+        out = new DataOutputStream(
+            new BufferedOutputStream(sock.getOutputStream()));
+        new Sender(out).releaseShortCircuitFds(slot.getSlotId());
+        DataInputStream in = new DataInputStream(sock.getInputStream());
+        ReleaseShortCircuitAccessResponseProto resp =
+            ReleaseShortCircuitAccessResponseProto.parseFrom(
+                PBHelper.vintPrefixed(in));
+        if (resp.getStatus() != Status.SUCCESS) {
+          String error = resp.hasError() ? resp.getError() : "(unknown)";
+          throw new IOException(resp.getStatus().toString() + ": " + error);
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(ShortCircuitCache.this + ": released " + slot);
+        }
+        success = true;
+      } catch (IOException e) {
+        LOG.error(ShortCircuitCache.this + ": failed to release " +
+            "short-circuit shared memory slot " + slot + " by sending " +
+            "ReleaseShortCircuitAccessRequestProto to " + path +
+            ".  Closing shared memory segment.", e);
+      } finally {
+        if (success) {
+          shmManager.freeSlot(slot);
+        } else {
+          shm.getEndpointShmManager().shutdown(shm);
+        }
+        IOUtils.cleanup(LOG, sock, out);
+      }
+    }
+  }
+
   public interface ShortCircuitReplicaCreator {
     /**
      * Attempt to create a ShortCircuitReplica object.
@@ -173,9 +248,17 @@ public class ShortCircuitCache implement
   /**
    * The executor service that runs the cacheCleaner.
    */
-  private final ScheduledThreadPoolExecutor executor
+  private final ScheduledThreadPoolExecutor cleanerExecutor
+  = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+          setDaemon(true).setNameFormat("ShortCircuitCache_Cleaner").
+          build());
+
+  /**
+   * The executor service that runs the SlotReleaser.
+   */
+  private final ScheduledThreadPoolExecutor releaserExecutor
       = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
-          setDaemon(true).setNameFormat("ShortCircuitCache Cleaner").
+          setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser").
           build());
 
   /**
@@ -253,6 +336,11 @@ public class ShortCircuitCache implement
   private int outstandingMmapCount = 0;
 
   /**
+   * Manages short-circuit shared memory segments for the client.
+   */
+  private final DfsClientShmManager shmManager;
+
+  /**
    * Create a {@link ShortCircuitCache} object from a {@link Configuration}
    */
   public static ShortCircuitCache fromConf(Configuration conf) {
@@ -268,12 +356,14 @@ public class ShortCircuitCache implement
         conf.getLong(DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
             DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT),
         conf.getLong(DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
-            DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT));
+            DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
+            DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT));
   }
 
   public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
       int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
-      long mmapRetryTimeoutMs, long staleThresholdMs) {
+      long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) {
     Preconditions.checkArgument(maxTotalSize >= 0);
     this.maxTotalSize = maxTotalSize;
     Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0);
@@ -284,6 +374,15 @@ public class ShortCircuitCache implement
     this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs;
     this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
     this.staleThresholdMs = staleThresholdMs;
+    DfsClientShmManager shmManager = null;
+    if (shmInterruptCheckMs > 0) {
+      try {
+        shmManager = new DfsClientShmManager(shmInterruptCheckMs);
+      } catch (IOException e) {
+        LOG.error("failed to create ShortCircuitShmManager", e);
+      }
+    }
+    this.shmManager = shmManager;
   }
 
   public long getMmapRetryTimeoutMs() {
@@ -339,7 +438,14 @@ public class ShortCircuitCache implement
   void unref(ShortCircuitReplica replica) {
     lock.lock();
     try {
+      // If the replica is stale, but we haven't purged it yet, let's do that.
+      // It would be a shame to evict a non-stale replica so that we could put
+      // a stale one into the cache.
+      if ((!replica.purged) && replica.isStale()) {
+        purge(replica);
+      }
       String addedString = "";
+      boolean shouldTrimEvictionMaps = false;
       int newRefCount = --replica.refCount;
       if (newRefCount == 0) {
         // Close replica, since there are no remaining references to it.
@@ -362,7 +468,7 @@ public class ShortCircuitCache implement
             insertEvictable(System.nanoTime(), replica, evictable);
             addedString = "added to evictable, ";
           }
-          trimEvictionMaps();
+          shouldTrimEvictionMaps = true;
         }
       } else {
         Preconditions.checkArgument(replica.refCount >= 0,
@@ -375,6 +481,9 @@ public class ShortCircuitCache implement
             (newRefCount + 1) + " -> " + newRefCount +
             StringUtils.getStackTrace(Thread.currentThread()));
       }
+      if (shouldTrimEvictionMaps) {
+        trimEvictionMaps();
+      }
     } finally {
       lock.unlock();
     }
@@ -442,7 +551,7 @@ public class ShortCircuitCache implement
        replica = evictable.firstEntry().getValue();
       }
       if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": trimEvictionMaps is purging " +
+        LOG.trace(this + ": trimEvictionMaps is purging " + replica +
           StringUtils.getStackTrace(Thread.currentThread()));
       }
       purge(replica);
@@ -542,7 +651,7 @@ public class ShortCircuitCache implement
     }
     if (LOG.isTraceEnabled()) {
       StringBuilder builder = new StringBuilder();
-      builder.append(this).append(": ").append(": removed ").
+      builder.append(this).append(": ").append(": purged ").
           append(replica).append(" from the cache.");
       if (removedFromInfoMap) {
         builder.append("  Removed from the replicaInfoMap.");
@@ -706,7 +815,7 @@ public class ShortCircuitCache implement
       cacheCleaner = new CacheCleaner();
       long rateMs = cacheCleaner.getRateInMs();
       ScheduledFuture<?> future =
-          executor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
+          cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
               TimeUnit.MILLISECONDS);
       cacheCleaner.setFuture(future);
       if (LOG.isDebugEnabled()) {
@@ -716,16 +825,16 @@ public class ShortCircuitCache implement
     }
   }
 
-  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica) {
+  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
+      boolean anchored) {
     Condition newCond;
     lock.lock();
     try {
       while (replica.mmapData != null) {
-        if (replica.mmapData instanceof ClientMmap) {
+        if (replica.mmapData instanceof MappedByteBuffer) {
           ref(replica);
-          ClientMmap clientMmap = (ClientMmap)replica.mmapData;
-          clientMmap.ref();
-          return clientMmap;
+          MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData;
+          return new ClientMmap(replica, mmap, anchored);
         } else if (replica.mmapData instanceof Long) {
           long lastAttemptTimeMs = (Long)replica.mmapData;
           long delta = Time.monotonicNow() - lastAttemptTimeMs;
@@ -762,12 +871,11 @@ public class ShortCircuitCache implement
         newCond.signalAll();
         return null;
       } else {
-        ClientMmap clientMmap = new ClientMmap(replica, map);
         outstandingMmapCount++;
-        replica.mmapData = clientMmap;
+        replica.mmapData = map;
         ref(replica);
         newCond.signalAll();
-        return clientMmap;
+        return new ClientMmap(replica, map, anchored);
       }
     } finally {
       lock.unlock();
@@ -878,4 +986,58 @@ public class ShortCircuitCache implement
     return "ShortCircuitCache(0x" +
         Integer.toHexString(System.identityHashCode(this)) + ")";
   }
+
+  /**
+   * Allocate a new shared memory slot.
+   *
+   * @param datanode       The datanode to allocate a shm slot with.
+   * @param peer           A peer connected to the datanode.
+   * @param usedPeer       Will be set to true if we use up the provided peer.
+   * @param blockId        The block id and block pool id of the block we're 
+   *                         allocating this slot for.
+   * @param clientName     The name of the DFSClient allocating the shared
+   *                         memory.
+   * @return               Null if short-circuit shared memory is disabled;
+   *                         a short-circuit memory slot otherwise.
+   * @throws IOException   An exception if there was an error talking to 
+   *                         the datanode.
+   */
+  public Slot allocShmSlot(DatanodeInfo datanode,
+        DomainPeer peer, MutableBoolean usedPeer,
+        ExtendedBlockId blockId, String clientName) throws IOException {
+    if (shmManager != null) {
+      return shmManager.allocSlot(datanode, peer, usedPeer,
+          blockId, clientName);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Free a slot immediately.
+   *
+   * ONLY use this if the DataNode is not yet aware of the slot.
+   * 
+   * @param slot           The slot to free.
+   */
+  public void freeSlot(Slot slot) {
+    Preconditions.checkState(shmManager != null);
+    slot.makeInvalid();
+    shmManager.freeSlot(slot);
+  }
+  
+  /**
+   * Schedule a shared memory slot to be released.
+   *
+   * @param slot           The slot to release.
+   */
+  public void scheduleSlotReleaser(Slot slot) {
+    Preconditions.checkState(shmManager != null);
+    releaserExecutor.execute(new SlotReleaser(slot));
+  }
+
+  @VisibleForTesting
+  public DfsClientShmManager getDfsClientShmManager() {
+    return shmManager;
+  }
 }
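
The two release paths added above have different preconditions: freeSlot()
reclaims a slot locally and is only safe while the DataNode has never
registered it, whereas scheduleSlotReleaser() hands the slot to the
asynchronous SlotReleaser, which notifies the DataNode over a fresh domain
socket and tears the segment down if that fails. Roughly (dataNodeKnowsSlot
is a hypothetical flag for illustration):

    if (!dataNodeKnowsSlot) {
      // The fd request failed before the DataNode ever saw the slot id, so
      // the slot can be invalidated and reused immediately.
      cache.freeSlot(slot);
    } else {
      // The DataNode is monitoring the slot; ask it to stop from a
      // background thread via releaseShortCircuitFds.
      cache.scheduleSlotReleaser(slot);
    }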

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java Mon Mar  3 23:51:58 2014
@@ -28,6 +28,7 @@ import java.nio.channels.FileChannel.Map
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -81,6 +82,11 @@ public class ShortCircuitReplica {
   private final long creationTimeMs;
 
   /**
+   * If non-null, the shared memory slot associated with this replica.
+   */
+  private final Slot slot;
+  
+  /**
    * Current mmap state.
    *
    * Protected by the cache lock.
@@ -114,7 +120,7 @@ public class ShortCircuitReplica {
 
   public ShortCircuitReplica(ExtendedBlockId key,
       FileInputStream dataStream, FileInputStream metaStream,
-      ShortCircuitCache cache, long creationTimeMs) throws IOException {
+      ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
     this.key = key;
     this.dataStream = dataStream;
     this.metaStream = metaStream;
@@ -126,6 +132,7 @@ public class ShortCircuitReplica {
     }
     this.cache = cache;
     this.creationTimeMs = creationTimeMs;
+    this.slot = slot;
   }
 
   /**
@@ -141,21 +148,61 @@ public class ShortCircuitReplica {
    * Must be called with the cache lock held.
    */
   boolean isStale() {
-    long deltaMs = Time.monotonicNow() - creationTimeMs;
-    long staleThresholdMs = cache.getStaleThresholdMs();
-    if (deltaMs > staleThresholdMs) {
+    if (slot != null) {
+      // Check staleness by looking at the shared memory area we use to
+      // communicate with the DataNode.
+      boolean stale = !slot.isValid();
       if (LOG.isTraceEnabled()) {
-        LOG.trace(this + " is stale because it's " + deltaMs +
-            " ms old, and staleThresholdMs = " + staleThresholdMs);
+        LOG.trace(this + ": checked shared memory segment.  isStale=" + stale);
       }
-      return true;
+      return stale;
     } else {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + " is not stale because it's only " + deltaMs +
-            " ms old, and staleThresholdMs = " + staleThresholdMs);
+      // Fall back to old, time-based staleness method.
+      long deltaMs = Time.monotonicNow() - creationTimeMs;
+      long staleThresholdMs = cache.getStaleThresholdMs();
+      if (deltaMs > staleThresholdMs) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " is stale because it's " + deltaMs +
+              " ms old, and staleThresholdMs = " + staleThresholdMs);
+        }
+        return true;
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " is not stale because it's only " + deltaMs +
+              " ms old, and staleThresholdMs = " + staleThresholdMs);
+        }
+        return false;
       }
+    }
+  }
+  
+  /**
+   * Try to add a no-checksum anchor to our shared memory slot.
+   *
+   * It is only possible to add this anchor when the block is mlocked on
+   * the DataNode.  The DataNode will not munlock the block until the
+   * number of no-checksum anchors for the block reaches zero.
+   * 
+   * This method does not require any synchronization.
+   *
+   * @return     True if we successfully added a no-checksum anchor.
+   */
+  public boolean addNoChecksumAnchor() {
+    if (slot == null) {
       return false;
     }
+    return slot.addAnchor();
+  }
+
+  /**
+   * Remove a no-checksum anchor for our shared memory slot.
+   *
+   * This method does not require any synchronization.
+   */
+  public void removeNoChecksumAnchor() {
+    if (slot != null) {
+      slot.removeAnchor();
+    }
   }
 
   /**
@@ -165,7 +212,7 @@ public class ShortCircuitReplica {
    */
   @VisibleForTesting
   public boolean hasMmap() {
-    return ((mmapData != null) && (mmapData instanceof ClientMmap));
+    return ((mmapData != null) && (mmapData instanceof MappedByteBuffer));
   }
 
   /**
@@ -174,8 +221,8 @@ public class ShortCircuitReplica {
    * Must be called with the cache lock held.
    */
   void munmap() {
-    ClientMmap clientMmap = (ClientMmap)mmapData;
-    NativeIO.POSIX.munmap(clientMmap.getMappedByteBuffer());
+    MappedByteBuffer mmap = (MappedByteBuffer)mmapData;
+    NativeIO.POSIX.munmap(mmap);
     mmapData = null;
   }
 
@@ -186,12 +233,25 @@ public class ShortCircuitReplica {
    * cache or elsewhere.
    */
   void close() {
+    String suffix = "";
+    
     Preconditions.checkState(refCount == 0,
         "tried to close replica with refCount " + refCount + ": " + this);
+    refCount = -1;
     Preconditions.checkState(purged,
         "tried to close unpurged replica " + this);
-    if (hasMmap()) munmap();
+    if (hasMmap()) {
+      munmap();
+      suffix += "  munmapped.";
+    }
     IOUtils.cleanup(LOG, dataStream, metaStream);
+    if (slot != null) {
+      cache.scheduleSlotReleaser(slot);
+      suffix += "  scheduling " + slot + " for later release.";
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("closed " + this + suffix);
+    }
   }
 
   public FileInputStream getDataStream() {
@@ -210,8 +270,8 @@ public class ShortCircuitReplica {
     return key;
   }
 
-  public ClientMmap getOrCreateClientMmap() {
-    return cache.getOrCreateClientMmap(this);
+  public ClientMmap getOrCreateClientMmap(boolean anchor) {
+    return cache.getOrCreateClientMmap(this, anchor);
   }
 
   MappedByteBuffer loadMmapInternal() {
@@ -250,6 +310,11 @@ public class ShortCircuitReplica {
     this.evictableTimeNs = evictableTimeNs;
   }
 
+  @VisibleForTesting
+  public Slot getSlot() {
+    return slot;
+  }
+
   /**
    * Convert the replica to a string for debugging purposes.
    * Note that we can't take the lock here.

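The rewritten isStale() above now has two regimes: with a slot, staleness is whatever the DataNode signals through the shared memory segment; without one, the old creation-time threshold applies. The decision logic, restated without the Hadoop dependencies and logging (Time.monotonicNow() stubbed with System.nanoTime(); purely illustrative):

    // Decision logic of ShortCircuitReplica#isStale, minus the tracing.
    static boolean isStale(Boolean slotValid, long creationTimeMs,
        long staleThresholdMs) {
      if (slotValid != null) {
        // A slot exists: the DataNode invalidates it to signal staleness.
        return !slotValid;
      }
      // No slot: fall back to the time-based check.
      long deltaMs = monotonicNowMs() - creationTimeMs;
      return deltaMs > staleThresholdMs;
    }

    // Stand-in for org.apache.hadoop.util.Time#monotonicNow.
    static long monotonicNowMs() {
      return System.nanoTime() / 1000000L;
    }
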
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Mon Mar  3 23:51:58 2014
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -116,14 +117,30 @@ public interface DataTransferProtocol {
    *
    * @param blk             The block to get file descriptors for.
    * @param blockToken      Security token for accessing the block.
+   * @param slotId          The shared memory slot id to use, or null
+   *                          if no slot should be used.
    * @param maxVersion      Maximum version of the block data the client 
-   *                        can understand.
+   *                          can understand.
    */
   public void requestShortCircuitFds(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
-      int maxVersion) throws IOException;
+      SlotId slotId, int maxVersion) throws IOException;
 
   /**
+   * Release a pair of short-circuit FDs requested earlier.
+   *
+   * @param slotId          The SlotId used by the earlier file descriptors.
+   */
+  public void releaseShortCircuitFds(final SlotId slotId) throws IOException;
+
+  /**
+   * Request a short-circuit shared memory area from a DataNode.
+   * 
+   * @param clientName      The name of the client.
+   */
+  public void requestShortCircuitShm(String clientName) throws IOException;
+  
+  /**
   * Receive a block from a source datanode,
   * and then notify the namenode
   * to remove the copy from the original datanode.

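The two new operations pair with the extended requestShortCircuitFds to form a fixed call order. A sketch against the interface as declared above; the helper itself is illustrative:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    static void shortCircuitHandshake(DataTransferProtocol proto,
        String clientName, ExtendedBlock blk,
        Token<BlockTokenIdentifier> token, SlotId slotId, int maxVersion)
        throws IOException {
      // 1. Once per client/DataNode pair: obtain a shared memory segment.
      proto.requestShortCircuitShm(clientName);
      // 2. Per block: request FDs, registering the slot with the DataNode.
      proto.requestShortCircuitFds(blk, token, slotId, maxVersion);
      // 3. When the replica is closed: release the slot on the DataNode.
      proto.releaseShortCircuitFds(slotId);
    }
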
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java Mon Mar  3 23:51:58 2014
@@ -35,7 +35,9 @@ public enum Op {
   COPY_BLOCK((byte)84),
   BLOCK_CHECKSUM((byte)85),
   TRANSFER_BLOCK((byte)86),
-  REQUEST_SHORT_CIRCUIT_FDS((byte)87);
+  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
+  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
+  REQUEST_SHORT_CIRCUIT_SHM((byte)89);
 
   /** The code for this operation. */
   public final byte code;

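The new codes extend the one-byte opcode space contiguously after REQUEST_SHORT_CIRCUIT_FDS. A quick plain-Java reference for the three short-circuit codes (not a method of the enum):

    // One-byte wire codes for the short-circuit operations.
    static String shortCircuitOpName(byte code) {
      switch (code) {
        case 87: return "REQUEST_SHORT_CIRCUIT_FDS";
        case 88: return "RELEASE_SHORT_CIRCUIT_FDS";
        case 89: return "REQUEST_SHORT_CIRCUIT_SHM";
        default: return "other";
      }
    }
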
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Mon Mar  3 23:51:58 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
@@ -33,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 
@@ -82,6 +85,12 @@ public abstract class Receiver implement
     case REQUEST_SHORT_CIRCUIT_FDS:
       opRequestShortCircuitFds(in);
       break;
+    case RELEASE_SHORT_CIRCUIT_FDS:
+      opReleaseShortCircuitFds(in);
+      break;
+    case REQUEST_SHORT_CIRCUIT_SHM:
+      opRequestShortCircuitShm(in);
+      break;
     default:
       throw new IOException("Unknown op " + op + " in data stream");
     }
@@ -141,9 +150,26 @@ public abstract class Receiver implement
   private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
     final OpRequestShortCircuitAccessProto proto =
       OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
+    SlotId slotId = (proto.hasSlotId()) ? 
+        PBHelper.convert(proto.getSlotId()) : null;
     requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getToken()),
-        proto.getMaxVersion());
+        slotId, proto.getMaxVersion());
+  }
+
+  /** Receive {@link Op#RELEASE_SHORT_CIRCUIT_FDS} */
+  private void opReleaseShortCircuitFds(DataInputStream in)
+      throws IOException {
+    final ReleaseShortCircuitAccessRequestProto proto =
+      ReleaseShortCircuitAccessRequestProto.parseFrom(vintPrefixed(in));
+    releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
+  }
+
+  /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_SHM} */
+  private void opRequestShortCircuitShm(DataInputStream in) throws IOException {
+    final ShortCircuitShmRequestProto proto =
+        ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in));
+    requestShortCircuitShm(proto.getClientName());
   }
 
   /** Receive OP_REPLACE_BLOCK */

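Each new receiver method follows the same pattern: parse one varint-length-delimited protobuf request, then dispatch. The parseFrom(vintPrefixed(in)) idiom is equivalent to protobuf's own delimited parse, which the following illustrative sketch uses instead:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;

    static String readShmRequestClientName(InputStream in)
        throws IOException {
      // Same bytes as ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in)).
      ShortCircuitShmRequestProto proto =
          ShortCircuitShmRequestProto.parseDelimitedFrom(in);
      return proto.getClientName();
    }
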
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Mon Mar  3 23:51:58 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
@@ -37,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -161,15 +164,37 @@ public class Sender implements DataTrans
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
-      int maxVersion) throws IOException {
-    OpRequestShortCircuitAccessProto proto =
+      SlotId slotId, int maxVersion) throws IOException {
+    OpRequestShortCircuitAccessProto.Builder builder =
         OpRequestShortCircuitAccessProto.newBuilder()
           .setHeader(DataTransferProtoUtil.buildBaseHeader(
-            blk, blockToken)).setMaxVersion(maxVersion).build();
+            blk, blockToken)).setMaxVersion(maxVersion);
+    if (slotId != null) {
+      builder.setSlotId(PBHelper.convert(slotId));
+    }
+    OpRequestShortCircuitAccessProto proto = builder.build();
     send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
   }
   
   @Override
+  public void releaseShortCircuitFds(SlotId slotId) throws IOException {
+    ReleaseShortCircuitAccessRequestProto proto = 
+        ReleaseShortCircuitAccessRequestProto.newBuilder().
+        setSlotId(PBHelper.convert(slotId)).
+        build();
+    send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
+  }
+
+  @Override
+  public void requestShortCircuitShm(String clientName) throws IOException {
+    ShortCircuitShmRequestProto proto =
+        ShortCircuitShmRequestProto.newBuilder().
+        setClientName(clientName).
+        build();
+    send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
+  }
+  
+  @Override
   public void replaceBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,

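On the wire, each of these requests is the opcode framing followed by a varint-length-delimited protobuf message. A standalone sketch of the framing that Sender#send applies; the leading protocol-version short is an assumption from the surrounding class, which this hunk does not show:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import com.google.protobuf.Message;

    static void sendOp(DataOutputStream out, short version, byte opcode,
        Message proto) throws IOException {
      out.writeShort(version);      // data transfer protocol version
      out.writeByte(opcode);        // e.g. 89 for REQUEST_SHORT_CIRCUIT_SHM
      proto.writeDelimitedTo(out);  // varint length + message bytes
      out.flush();
    }
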
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Mon Mar  3 23:51:58 2014
@@ -41,6 +41,8 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -91,6 +93,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@@ -2055,5 +2059,28 @@ public class PBHelper {
         .addAllEntries(convertAclEntryProto(e.getEntries())).build();
     return GetAclStatusResponseProto.newBuilder().setResult(r).build();
   }
+
+  public static ShortCircuitShmSlotProto convert(SlotId slotId) {
+    return ShortCircuitShmSlotProto.newBuilder().
+        setShmId(convert(slotId.getShmId())).
+        setSlotIdx(slotId.getSlotIdx()).
+        build();
+  }
+
+  public static ShortCircuitShmIdProto convert(ShmId shmId) {
+    return ShortCircuitShmIdProto.newBuilder().
+        setHi(shmId.getHi()).
+        setLo(shmId.getLo()).
+        build();
+  }
+
+  public static SlotId convert(ShortCircuitShmSlotProto slotId) {
+    return new SlotId(PBHelper.convert(slotId.getShmId()),
+        slotId.getSlotIdx());
+  }
+
+  public static ShmId convert(ShortCircuitShmIdProto shmId) {
+    return new ShmId(shmId.getHi(), shmId.getLo());
+  }
 }
 

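The four converters above form two inverse pairs, so an encode/decode round trip should be lossless. A sketch using only the constructors and accessors visible in this hunk (the values are arbitrary):

    import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
    import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;

    static SlotId roundTrip() {
      SlotId original = new SlotId(new ShmId(0x1234L, 0x5678L), 7);
      ShortCircuitShmSlotProto proto = PBHelper.convert(original);
      return PBHelper.convert(proto);  // same (hi, lo, slotIdx) as original
    }
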
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Mar  3 23:51:58 2014
@@ -185,6 +185,7 @@ public class DataNode extends Configured
   AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
   Daemon localDataXceiverServer = null;
+  ShortCircuitRegistry shortCircuitRegistry = null;
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
@@ -540,6 +541,7 @@ public class DataNode extends Configured
             domainPeerServer.getBindPath());
       }
     }
+    this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
   }
 
   static DomainPeerServer getDomainPeerServer(Configuration conf,
@@ -1304,6 +1306,7 @@ public class DataNode extends Configured
       MBeans.unregister(dataNodeInfoBeanName);
       dataNodeInfoBeanName = null;
     }
+    if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
     LOG.info("Shutdown complete.");
     synchronized(this) {
       // it is already false, but setting it again to avoid a findbug warning.
@@ -1957,7 +1960,8 @@ public class DataNode extends Configured
    * 
    * @return the fsdataset that stores the blocks
    */
-  FsDatasetSpi<?> getFSDataset() {
+  @VisibleForTesting
+  public FsDatasetSpi<?> getFSDataset() {
     return data;
   }
 
@@ -2568,4 +2572,8 @@ public class DataNode extends Configured
   DataStorage getStorage() {
     return storage;
   }
+
+  public ShortCircuitRegistry getShortCircuitRegistry() {
+    return shortCircuitRegistry;
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Mar  3 23:51:58 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
@@ -42,6 +43,9 @@ import java.nio.channels.ClosedChannelEx
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -58,6 +62,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -65,11 +71,13 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -84,7 +92,7 @@ class DataXceiver extends Receiver imple
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private final Peer peer;
+  private Peer peer;
   private final String remoteAddress; // address of remote side
   private final String localAddress;  // local address of this daemon
   private final DataNode datanode;
@@ -220,7 +228,8 @@ class DataXceiver extends Receiver imple
         opStartTime = now();
         processOp(op);
         ++opsProcessed;
-      } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0);
+      } while ((peer != null) &&
+          (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
     } catch (Throwable t) {
       LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " +
                 ((op == null) ? "unknown" : op.name()) + " operation " +
@@ -232,15 +241,17 @@ class DataXceiver extends Receiver imple
             + datanode.getXceiverCount());
       }
       updateCurrentThreadName("Cleaning up");
-      dataXceiverServer.closePeer(peer);
-      IOUtils.closeStream(in);
+      if (peer != null) {
+        dataXceiverServer.closePeer(peer);
+        IOUtils.closeStream(in);
+      }
     }
   }
 
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> token,
-      int maxVersion) throws IOException {
+      SlotId slotId, int maxVersion) throws IOException {
     updateCurrentThreadName("Passing file descriptors for block " + blk);
     BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
     FileInputStream fis[] = null;
@@ -249,7 +260,17 @@ class DataXceiver extends Receiver imple
         throw new IOException("You cannot pass file descriptors over " +
             "anything but a UNIX domain socket.");
       }
-      fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
+      if (slotId != null) {
+        datanode.shortCircuitRegistry.registerSlot(
+            ExtendedBlockId.fromExtendedBlock(blk), slotId);
+      }
+      try {
+        fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
+      } finally {
+        if ((fis == null) && (slotId != null)) {
+          datanode.shortCircuitRegistry.unregisterSlot(slotId);
+        }
+      }
       bld.setStatus(SUCCESS);
       bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
     } catch (ShortCircuitFdsVersionException e) {
@@ -294,6 +315,122 @@ class DataXceiver extends Receiver imple
   }
 
   @Override
+  public void releaseShortCircuitFds(SlotId slotId) throws IOException {
+    boolean success = false;
+    try {
+      String error;
+      Status status;
+      try {
+        datanode.shortCircuitRegistry.unregisterSlot(slotId);
+        error = null;
+        status = Status.SUCCESS;
+      } catch (UnsupportedOperationException e) {
+        error = "unsupported operation";
+        status = Status.ERROR_UNSUPPORTED;
+      } catch (Throwable e) {
+        error = e.getMessage();
+        status = Status.ERROR_INVALID;
+      }
+      ReleaseShortCircuitAccessResponseProto.Builder bld =
+          ReleaseShortCircuitAccessResponseProto.newBuilder();
+      bld.setStatus(status);
+      if (error != null) {
+        bld.setError(error);
+      }
+      bld.build().writeDelimitedTo(socketOut);
+      success = true;
+    } finally {
+      if (ClientTraceLog.isInfoEnabled()) {
+        BlockSender.ClientTraceLog.info(String.format(
+            "src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS," +
+            " shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b",
+            slotId.getShmId().getHi(), slotId.getShmId().getLo(),
+            slotId.getSlotIdx(), datanode.getDatanodeUuid(), success));
+      }
+    }
+  }
+
+  private void sendShmErrorResponse(Status status, String error)
+      throws IOException {
+    ShortCircuitShmResponseProto.newBuilder().setStatus(status).
+        setError(error).build().writeDelimitedTo(socketOut);
+  }
+
+  private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo)
+      throws IOException {
+    ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
+        setId(PBHelper.convert(shmInfo.shmId)).build().
+        writeDelimitedTo(socketOut);
+    // Send the file descriptor for the shared memory segment.
+    byte buf[] = new byte[] { (byte)0 };
+    FileDescriptor shmFdArray[] =
+        new FileDescriptor[] { shmInfo.stream.getFD() };
+    sock.sendFileDescriptors(shmFdArray, buf, 0, buf.length);
+  }
+
+  @Override
+  public void requestShortCircuitShm(String clientName) throws IOException {
+    NewShmInfo shmInfo = null;
+    boolean success = false;
+    DomainSocket sock = peer.getDomainSocket();
+    try {
+      if (sock == null) {
+        sendShmErrorResponse(ERROR_INVALID, "Bad request from " +
+            peer + ": must request a shared " +
+            "memory segment over a UNIX domain socket.");
+        return;
+      }
+      try {
+        shmInfo = datanode.shortCircuitRegistry.
+            createNewMemorySegment(clientName, sock);
+        // After the call to ShortCircuitRegistry#createNewMemorySegment, the
+        // socket is managed by the DomainSocketWatcher, not the DataXceiver.
+        releaseSocket();
+      } catch (UnsupportedOperationException e) {
+        sendShmErrorResponse(ERROR_UNSUPPORTED, 
+            "This datanode has not been configured to support " +
+            "short-circuit shared memory segments.");
+        return;
+      } catch (IOException e) {
+        sendShmErrorResponse(ERROR,
+            "Failed to create shared file descriptor: " + e.getMessage());
+        return;
+      }
+      sendShmSuccessResponse(sock, shmInfo);
+      success = true;
+    } finally {
+      if (ClientTraceLog.isInfoEnabled()) {
+        if (success) {
+          BlockSender.ClientTraceLog.info(String.format(
+              "cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
+              "op: REQUEST_SHORT_CIRCUIT_SHM," +
+              " shmId: %016x%016x, srvID: %s, success: true",
+              clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(),
+              datanode.getDatanodeUuid()));
+        } else {
+          BlockSender.ClientTraceLog.info(String.format(
+              "cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
+              "op: REQUEST_SHORT_CIRCUIT_SHM, " +
+              "shmId: n/a, srvID: %s, success: false",
+              clientName, datanode.getDatanodeUuid()));
+        }
+      }
+      if ((!success) && (peer == null)) {
+        // If we failed to pass the shared memory segment to the client,
+        // close the UNIX domain socket now.  This will trigger the 
+        // DomainSocketWatcher callback, cleaning up the segment.
+        IOUtils.cleanup(null, sock);
+      }
+      IOUtils.cleanup(null, shmInfo);
+    }
+  }
+
+  void releaseSocket() {
+    dataXceiverServer.releasePeer(peer);
+    peer = null;
+  }
+
+  @Override
   public void readBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,

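sendShmSuccessResponse above passes exactly one file descriptor, plus a one-byte payload, across the UNIX domain socket. A sketch of the client-side counterpart using DomainSocket's descriptor-passing API; the client half is not part of this hunk, so treat the method choice as an assumption:

    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.net.unix.DomainSocket;

    static FileInputStream recvShmFd(DomainSocket sock) throws IOException {
      FileInputStream[] fis = new FileInputStream[1];
      byte[] buf = new byte[1];
      // Mirrors sendFileDescriptors(shmFdArray, buf, 0, buf.length) above.
      sock.recvFileInputStreams(fis, buf, 0, buf.length);
      return fis[0];
    }
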
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Mon Mar  3 23:51:58 2014
@@ -248,4 +248,8 @@ class DataXceiverServer implements Runna
   synchronized int getNumPeers() {
     return peers.size();
   }
+
+  synchronized void releasePeer(Peer peer) {
+    peers.remove(peer);
+  }
 }


