hadoop-hdfs-commits mailing list archives

From: cmcc...@apache.org
Subject: svn commit: r1573433 [2/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/ap...
Date: Mon, 03 Mar 2014 03:58:38 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java Mon Mar  3 03:58:37 2014
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hdfs.client;
 
+import java.io.BufferedOutputStream;
 import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
@@ -33,14 +36,23 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -154,6 +166,69 @@ public class ShortCircuitCache implement
     }
   }
 
+  /**
+   * A task which asks the DataNode to release a short-circuit shared memory
+   * slot.  If successful, this will tell the DataNode to stop monitoring
+   * changes to the mlock status of the replica associated with the slot.
+   * It will also allow us (the client) to re-use this slot for another
+   * replica.  If we can't communicate with the DataNode for some reason,
+   * we tear down the shared memory segment to avoid being in an inconsistent
+   * state.
+   */
+  private class SlotReleaser implements Runnable {
+    /**
+     * The slot that we need to release.
+     */
+    private final Slot slot;
+
+    SlotReleaser(Slot slot) {
+      this.slot = slot;
+    }
+
+    @Override
+    public void run() {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(ShortCircuitCache.this + ": about to release " + slot);
+      }
+      final DfsClientShm shm = (DfsClientShm)slot.getShm();
+      final DomainSocket shmSock = shm.getPeer().getDomainSocket();
+      DomainSocket sock = null;
+      DataOutputStream out = null;
+      final String path = shmSock.getPath();
+      boolean success = false;
+      try {
+        sock = DomainSocket.connect(path);
+        out = new DataOutputStream(
+            new BufferedOutputStream(sock.getOutputStream()));
+        new Sender(out).releaseShortCircuitFds(slot.getSlotId());
+        DataInputStream in = new DataInputStream(sock.getInputStream());
+        ReleaseShortCircuitAccessResponseProto resp =
+            ReleaseShortCircuitAccessResponseProto.parseFrom(
+                PBHelper.vintPrefixed(in));
+        if (resp.getStatus() != Status.SUCCESS) {
+          String error = resp.hasError() ? resp.getError() : "(unknown)";
+          throw new IOException(resp.getStatus().toString() + ": " + error);
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(ShortCircuitCache.this + ": released " + slot);
+        }
+        success = true;
+      } catch (IOException e) {
+        LOG.error(ShortCircuitCache.this + ": failed to release " +
+            "short-circuit shared memory slot " + slot + " by sending " +
+            "ReleaseShortCircuitAccessRequestProto to " + path +
+            ".  Closing shared memory segment.", e);
+      } finally {
+        if (success) {
+          shmManager.freeSlot(slot);
+        } else {
+          shm.getEndpointShmManager().shutdown(shm);
+        }
+        IOUtils.cleanup(LOG, sock, out);
+      }
+    }
+  }
+
   public interface ShortCircuitReplicaCreator {
     /**
      * Attempt to create a ShortCircuitReplica object.
@@ -173,9 +248,17 @@ public class ShortCircuitCache implement
   /**
    * The executor service that runs the cacheCleaner.
    */
-  private final ScheduledThreadPoolExecutor executor
+  private final ScheduledThreadPoolExecutor cleanerExecutor
+      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+          setDaemon(true).setNameFormat("ShortCircuitCache_Cleaner").
+          build());
+
+  /**
+   * The executor service that runs the SlotReleaser.
+   */
+  private final ScheduledThreadPoolExecutor releaserExecutor
       = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
-          setDaemon(true).setNameFormat("ShortCircuitCache Cleaner").
+          setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser").
           build());
 
   /**
@@ -253,6 +336,11 @@ public class ShortCircuitCache implement
   private int outstandingMmapCount = 0;
 
   /**
+   * Manages short-circuit shared memory segments for the client.
+   */
+  private final DfsClientShmManager shmManager;
+
+  /**
    * Create a {@link ShortCircuitCache} object from a {@link Configuration}
    */
   public static ShortCircuitCache fromConf(Configuration conf) {
@@ -268,12 +356,14 @@ public class ShortCircuitCache implement
         conf.getLong(DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
             DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT),
         conf.getLong(DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
-            DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT));
+            DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
+            DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT));
   }
 
   public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
       int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
-      long mmapRetryTimeoutMs, long staleThresholdMs) {
+      long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) {
     Preconditions.checkArgument(maxTotalSize >= 0);
     this.maxTotalSize = maxTotalSize;
     Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0);
@@ -284,6 +374,15 @@ public class ShortCircuitCache implement
     this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs;
     this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
     this.staleThresholdMs = staleThresholdMs;
+    DfsClientShmManager shmManager = null;
+    if (shmInterruptCheckMs > 0) {
+      try {
+        shmManager = new DfsClientShmManager(shmInterruptCheckMs);
+      } catch (IOException e) {
+        LOG.error("failed to create ShortCircuitShmManager", e);
+      }
+    }
+    this.shmManager = shmManager;
   }
 
   public long getMmapRetryTimeoutMs() {
@@ -339,7 +438,14 @@ public class ShortCircuitCache implement
   void unref(ShortCircuitReplica replica) {
     lock.lock();
     try {
+      // If the replica is stale, but we haven't purged it yet, let's do that.
+      // It would be a shame to evict a non-stale replica so that we could put
+      // a stale one into the cache.
+      if ((!replica.purged) && replica.isStale()) {
+        purge(replica);
+      }
       String addedString = "";
+      boolean shouldTrimEvictionMaps = false;
       int newRefCount = --replica.refCount;
       if (newRefCount == 0) {
         // Close replica, since there are no remaining references to it.
@@ -362,7 +468,7 @@ public class ShortCircuitCache implement
             insertEvictable(System.nanoTime(), replica, evictable);
             addedString = "added to evictable, ";
           }
-          trimEvictionMaps();
+          shouldTrimEvictionMaps = true;
         }
       } else {
         Preconditions.checkArgument(replica.refCount >= 0,
@@ -375,6 +481,9 @@ public class ShortCircuitCache implement
             (newRefCount + 1) + " -> " + newRefCount +
             StringUtils.getStackTrace(Thread.currentThread()));
       }
+      if (shouldTrimEvictionMaps) {
+        trimEvictionMaps();
+      }
     } finally {
       lock.unlock();
     }
@@ -442,7 +551,7 @@ public class ShortCircuitCache implement
        replica = evictable.firstEntry().getValue();
       }
       if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": trimEvictionMaps is purging " +
+        LOG.trace(this + ": trimEvictionMaps is purging " + replica +
           StringUtils.getStackTrace(Thread.currentThread()));
       }
       purge(replica);
@@ -542,7 +651,7 @@ public class ShortCircuitCache implement
     }
     if (LOG.isTraceEnabled()) {
       StringBuilder builder = new StringBuilder();
-      builder.append(this).append(": ").append(": removed ").
+      builder.append(this).append(": ").append(": purged ").
           append(replica).append(" from the cache.");
       if (removedFromInfoMap) {
         builder.append("  Removed from the replicaInfoMap.");
@@ -706,7 +815,7 @@ public class ShortCircuitCache implement
       cacheCleaner = new CacheCleaner();
       long rateMs = cacheCleaner.getRateInMs();
       ScheduledFuture<?> future =
-          executor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
+          cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
               TimeUnit.MILLISECONDS);
       cacheCleaner.setFuture(future);
       if (LOG.isDebugEnabled()) {
@@ -716,16 +825,16 @@ public class ShortCircuitCache implement
     }
   }
 
-  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica) {
+  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
+      boolean anchored) {
     Condition newCond;
     lock.lock();
     try {
       while (replica.mmapData != null) {
-        if (replica.mmapData instanceof ClientMmap) {
+        if (replica.mmapData instanceof MappedByteBuffer) {
           ref(replica);
-          ClientMmap clientMmap = (ClientMmap)replica.mmapData;
-          clientMmap.ref();
-          return clientMmap;
+          MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData;
+          return new ClientMmap(replica, mmap, anchored);
         } else if (replica.mmapData instanceof Long) {
           long lastAttemptTimeMs = (Long)replica.mmapData;
           long delta = Time.monotonicNow() - lastAttemptTimeMs;
@@ -762,12 +871,11 @@ public class ShortCircuitCache implement
         newCond.signalAll();
         return null;
       } else {
-        ClientMmap clientMmap = new ClientMmap(replica, map);
         outstandingMmapCount++;
-        replica.mmapData = clientMmap;
+        replica.mmapData = map;
         ref(replica);
         newCond.signalAll();
-        return clientMmap;
+        return new ClientMmap(replica, map, anchored);
       }
     } finally {
       lock.unlock();
@@ -878,4 +986,58 @@ public class ShortCircuitCache implement
     return "ShortCircuitCache(0x" +
         Integer.toHexString(System.identityHashCode(this)) + ")";
   }
+
+  /**
+   * Allocate a new shared memory slot.
+   *
+   * @param datanode       The datanode to allocate a shm slot with.
+   * @param peer           A peer connected to the datanode.
+   * @param usedPeer       Will be set to true if we use up the provided peer.
+   * @param blockId        The block id and block pool id of the block we're 
+   *                         allocating this slot for.
+   * @param clientName     The name of the DFSClient allocating the shared
+   *                         memory.
+   * @return               Null if short-circuit shared memory is disabled;
+   *                         a short-circuit memory slot otherwise.
+   * @throws IOException   An exception if there was an error talking to 
+   *                         the datanode.
+   */
+  public Slot allocShmSlot(DatanodeInfo datanode,
+        DomainPeer peer, MutableBoolean usedPeer,
+        ExtendedBlockId blockId, String clientName) throws IOException {
+    if (shmManager != null) {
+      return shmManager.allocSlot(datanode, peer, usedPeer,
+          blockId, clientName);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Free a slot immediately.
+   *
+   * ONLY use this if the DataNode is not yet aware of the slot.
+   * 
+   * @param slot           The slot to free.
+   */
+  public void freeSlot(Slot slot) {
+    Preconditions.checkState(shmManager != null);
+    slot.makeInvalid();
+    shmManager.freeSlot(slot);
+  }
+  
+  /**
+   * Schedule a shared memory slot to be released.
+   *
+   * @param slot           The slot to release.
+   */
+  public void scheduleSlotReleaser(Slot slot) {
+    Preconditions.checkState(shmManager != null);
+    releaserExecutor.execute(new SlotReleaser(slot));
+  }
+
+  @VisibleForTesting
+  public DfsClientShmManager getDfsClientShmManager() {
+    return shmManager;
+  }
 }

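For readers following the new API: the patch leaves the slot lifecycle split across several call sites, so a minimal client-side sketch may help. The setup objects (cache, datanode, peer, blockId, clientName) are assumed to exist; everything else below uses only the methods added in this file.

    // Sketch: lifecycle of a short-circuit shared memory slot on the client.
    MutableBoolean usedPeer = new MutableBoolean(false);
    Slot slot = cache.allocShmSlot(datanode, peer, usedPeer, blockId, clientName);
    if (slot == null) {
      // Shared memory is disabled (shmManager == null); the client falls
      // back to the older, time-based staleness check in ShortCircuitReplica.
    } else {
      try {
        // ... pass the slot to requestShortCircuitFds() and construct the
        // ShortCircuitReplica with it ...
      } catch (IOException e) {
        // The DataNode never learned about this slot, so it is safe to free
        // it directly instead of going through the SlotReleaser.
        cache.freeSlot(slot);
        throw e;
      }
    }
    // When the replica is eventually closed, ShortCircuitReplica#close()
    // calls cache.scheduleSlotReleaser(slot), which runs the SlotReleaser
    // above on the releaserExecutor thread.
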
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java Mon Mar  3 03:58:37 2014
@@ -28,6 +28,7 @@ import java.nio.channels.FileChannel.Map
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -81,6 +82,11 @@ public class ShortCircuitReplica {
   private final long creationTimeMs;
 
   /**
+   * If non-null, the shared memory slot associated with this replica.
+   */
+  private final Slot slot;
+  
+  /**
    * Current mmap state.
    *
    * Protected by the cache lock.
@@ -114,7 +120,7 @@ public class ShortCircuitReplica {
 
   public ShortCircuitReplica(ExtendedBlockId key,
       FileInputStream dataStream, FileInputStream metaStream,
-      ShortCircuitCache cache, long creationTimeMs) throws IOException {
+      ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
     this.key = key;
     this.dataStream = dataStream;
     this.metaStream = metaStream;
@@ -126,6 +132,7 @@ public class ShortCircuitReplica {
     }
     this.cache = cache;
     this.creationTimeMs = creationTimeMs;
+    this.slot = slot;
   }
 
   /**
@@ -141,21 +148,61 @@ public class ShortCircuitReplica {
    * Must be called with the cache lock held.
    */
   boolean isStale() {
-    long deltaMs = Time.monotonicNow() - creationTimeMs;
-    long staleThresholdMs = cache.getStaleThresholdMs();
-    if (deltaMs > staleThresholdMs) {
+    if (slot != null) {
+      // Check staleness by looking at the shared memory area we use to
+      // communicate with the DataNode.
+      boolean stale = !slot.isValid();
       if (LOG.isTraceEnabled()) {
-        LOG.trace(this + " is stale because it's " + deltaMs +
-            " ms old, and staleThresholdMs = " + staleThresholdMs);
+        LOG.trace(this + ": checked shared memory segment.  isStale=" + stale);
       }
-      return true;
+      return stale;
     } else {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + " is not stale because it's only " + deltaMs +
-            " ms old, and staleThresholdMs = " + staleThresholdMs);
+      // Fall back to old, time-based staleness method.
+      long deltaMs = Time.monotonicNow() - creationTimeMs;
+      long staleThresholdMs = cache.getStaleThresholdMs();
+      if (deltaMs > staleThresholdMs) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " is stale because it's " + deltaMs +
+              " ms old, and staleThresholdMs = " + staleThresholdMs);
+        }
+        return true;
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " is not stale because it's only " + deltaMs +
+              " ms old, and staleThresholdMs = " + staleThresholdMs);
+        }
+        return false;
       }
+    }
+  }
+  
+  /**
+   * Try to add a no-checksum anchor to our shared memory slot.
+   *
+   * It is only possible to add this anchor when the block is mlocked on the
+   * DataNode.  The DataNode will not munlock the block until the number of
+   * no-checksum anchors for the block reaches zero.
+   * 
+   * This method does not require any synchronization.
+   *
+   * @return     True if we successfully added a no-checksum anchor.
+   */
+  public boolean addNoChecksumAnchor() {
+    if (slot == null) {
       return false;
     }
+    return slot.addAnchor();
+  }
+
+  /**
+   * Remove a no-checksum anchor for our shared memory slot.
+   *
+   * This method does not require any synchronization.
+   */
+  public void removeNoChecksumAnchor() {
+    if (slot != null) {
+      slot.removeAnchor();
+    }
   }
 
   /**
@@ -165,7 +212,7 @@ public class ShortCircuitReplica {
    */
   @VisibleForTesting
   public boolean hasMmap() {
-    return ((mmapData != null) && (mmapData instanceof ClientMmap));
+    return ((mmapData != null) && (mmapData instanceof MappedByteBuffer));
   }
 
   /**
@@ -174,8 +221,8 @@ public class ShortCircuitReplica {
    * Must be called with the cache lock held.
    */
   void munmap() {
-    ClientMmap clientMmap = (ClientMmap)mmapData;
-    NativeIO.POSIX.munmap(clientMmap.getMappedByteBuffer());
+    MappedByteBuffer mmap = (MappedByteBuffer)mmapData;
+    NativeIO.POSIX.munmap(mmap);
     mmapData = null;
   }
 
@@ -186,12 +233,25 @@ public class ShortCircuitReplica {
    * cache or elsewhere.
    */
   void close() {
+    String suffix = "";
+    
     Preconditions.checkState(refCount == 0,
         "tried to close replica with refCount " + refCount + ": " + this);
+    refCount = -1;
     Preconditions.checkState(purged,
         "tried to close unpurged replica " + this);
-    if (hasMmap()) munmap();
+    if (hasMmap()) {
+      munmap();
+      suffix += "  munmapped.";
+    }
     IOUtils.cleanup(LOG, dataStream, metaStream);
+    if (slot != null) {
+      cache.scheduleSlotReleaser(slot);
+      suffix += "  scheduling " + slot + " for later release.";
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("closed " + this + suffix);
+    }
   }
 
   public FileInputStream getDataStream() {
@@ -210,8 +270,8 @@ public class ShortCircuitReplica {
     return key;
   }
 
-  public ClientMmap getOrCreateClientMmap() {
-    return cache.getOrCreateClientMmap(this);
+  public ClientMmap getOrCreateClientMmap(boolean anchor) {
+    return cache.getOrCreateClientMmap(this, anchor);
   }
 
   MappedByteBuffer loadMmapInternal() {
@@ -250,6 +310,11 @@ public class ShortCircuitReplica {
     this.evictableTimeNs = evictableTimeNs;
   }
 
+  @VisibleForTesting
+  public Slot getSlot() {
+    return slot;
+  }
+
   /**
    * Convert the replica to a string for debugging purposes.
    * Note that we can't take the lock here.

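The anchoring methods above are easiest to see in context. A minimal sketch of the intended usage pattern, assuming a reader that wants to skip checksum verification while the block stays mlocked (the surrounding control flow is illustrative, not part of this patch):

    // Sketch: skip checksums only while the DataNode has the block mlocked.
    // addNoChecksumAnchor() succeeds only if this replica has a shared
    // memory slot and the block is currently mlocked; the DataNode will not
    // munlock the block while any no-checksum anchors remain.
    if (replica.addNoChecksumAnchor()) {
      try {
        // ... read block data without verifying checksums ...
      } finally {
        replica.removeNoChecksumAnchor();
      }
    } else {
      // No slot, or the block is not mlocked: verify checksums as usual.
    }
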
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Mon Mar  3 03:58:37 2014
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -116,14 +117,30 @@ public interface DataTransferProtocol {
    *
    * @param blk             The block to get file descriptors for.
    * @param blockToken      Security token for accessing the block.
+   * @param slotId          The shared memory slot id to use, or null 
+   *                          to use no slot id.
    * @param maxVersion      Maximum version of the block data the client 
-   *                        can understand.
+   *                          can understand.
    */
   public void requestShortCircuitFds(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
-      int maxVersion) throws IOException;
+      SlotId slotId, int maxVersion) throws IOException;
 
   /**
+   * Release a pair of short-circuit FDs requested earlier.
+   *
+   * @param slotId          SlotID used by the earlier file descriptors.
+   */
+  public void releaseShortCircuitFds(final SlotId slotId) throws IOException;
+
+  /**
+   * Request a short circuit shared memory area from a DataNode.
+   * 
+   * @param clientName      The name of the client.
+   */
+  public void requestShortCircuitShm(String clientName) throws IOException;
+  
+  /**
    * Receive a block from a source datanode
    * and then notifies the namenode
    * to remove the copy from the original datanode.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java Mon Mar  3 03:58:37 2014
@@ -35,7 +35,9 @@ public enum Op {
   COPY_BLOCK((byte)84),
   BLOCK_CHECKSUM((byte)85),
   TRANSFER_BLOCK((byte)86),
-  REQUEST_SHORT_CIRCUIT_FDS((byte)87);
+  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
+  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
+  REQUEST_SHORT_CIRCUIT_SHM((byte)89);
 
   /** The code for this operation. */
   public final byte code;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Mon Mar  3 03:58:37 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
@@ -33,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 
@@ -82,6 +85,12 @@ public abstract class Receiver implement
     case REQUEST_SHORT_CIRCUIT_FDS:
       opRequestShortCircuitFds(in);
       break;
+    case RELEASE_SHORT_CIRCUIT_FDS:
+      opReleaseShortCircuitFds(in);
+      break;
+    case REQUEST_SHORT_CIRCUIT_SHM:
+      opRequestShortCircuitShm(in);
+      break;
     default:
       throw new IOException("Unknown op " + op + " in data stream");
     }
@@ -141,9 +150,26 @@ public abstract class Receiver implement
   private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
     final OpRequestShortCircuitAccessProto proto =
       OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
+    SlotId slotId = (proto.hasSlotId()) ? 
+        PBHelper.convert(proto.getSlotId()) : null;
     requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getToken()),
-        proto.getMaxVersion());
+        slotId, proto.getMaxVersion());
+  }
+
+  /** Receive {@link Op#RELEASE_SHORT_CIRCUIT_FDS} */
+  private void opReleaseShortCircuitFds(DataInputStream in)
+      throws IOException {
+    final ReleaseShortCircuitAccessRequestProto proto =
+      ReleaseShortCircuitAccessRequestProto.parseFrom(vintPrefixed(in));
+    releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
+  }
+
+  /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_SHM} */
+  private void opRequestShortCircuitShm(DataInputStream in) throws IOException {
+    final ShortCircuitShmRequestProto proto =
+        ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in));
+    requestShortCircuitShm(proto.getClientName());
   }
 
   /** Receive OP_REPLACE_BLOCK */

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Mon Mar  3 03:58:37 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
@@ -37,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -161,15 +164,37 @@ public class Sender implements DataTrans
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
-      int maxVersion) throws IOException {
-    OpRequestShortCircuitAccessProto proto =
+      SlotId slotId, int maxVersion) throws IOException {
+    OpRequestShortCircuitAccessProto.Builder builder =
         OpRequestShortCircuitAccessProto.newBuilder()
           .setHeader(DataTransferProtoUtil.buildBaseHeader(
-            blk, blockToken)).setMaxVersion(maxVersion).build();
+            blk, blockToken)).setMaxVersion(maxVersion);
+    if (slotId != null) {
+      builder.setSlotId(PBHelper.convert(slotId));
+    }
+    OpRequestShortCircuitAccessProto proto = builder.build();
     send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
   }
   
   @Override
+  public void releaseShortCircuitFds(SlotId slotId) throws IOException {
+    ReleaseShortCircuitAccessRequestProto proto = 
+        ReleaseShortCircuitAccessRequestProto.newBuilder().
+        setSlotId(PBHelper.convert(slotId)).
+        build();
+    send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
+  }
+
+  @Override
+  public void requestShortCircuitShm(String clientName) throws IOException {
+    ShortCircuitShmRequestProto proto =
+        ShortCircuitShmRequestProto.newBuilder().
+        setClientName(clientName).
+        build();
+    send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
+  }
+  
+  @Override
   public void replaceBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,

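The client-side counterpart of the new REQUEST_SHORT_CIRCUIT_SHM op is not part of this diff. A rough sketch, mirroring the SlotReleaser code in ShortCircuitCache above; the recvFileDescriptors() call mentioned at the end is an assumption based on the server's sendFileDescriptors() in DataXceiver below:

    // Sketch: requesting a shared memory segment over a UNIX domain socket.
    DomainSocket sock = DomainSocket.connect(path);
    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(sock.getOutputStream()));
    new Sender(out).requestShortCircuitShm(clientName);
    DataInputStream in = new DataInputStream(sock.getInputStream());
    ShortCircuitShmResponseProto resp =
        ShortCircuitShmResponseProto.parseFrom(PBHelper.vintPrefixed(in));
    if (resp.getStatus() != Status.SUCCESS) {
      throw new IOException("error requesting shared memory segment: " +
          (resp.hasError() ? resp.getError() : "(unknown)"));
    }
    // The DataNode now passes the segment's file descriptor over the same
    // socket (see sendShmSuccessResponse in DataXceiver); the client would
    // receive it with something like sock.recvFileDescriptors(...) and mmap it.
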
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Mon Mar  3 03:58:37 2014
@@ -41,6 +41,8 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -85,6 +87,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@@ -1979,5 +1983,29 @@ public class PBHelper {
         .addAllEntries(convertAclEntryProto(e.getEntries())).build();
     return GetAclStatusResponseProto.newBuilder().setResult(r).build();
   }
+
+  public static ShortCircuitShmSlotProto convert(SlotId slotId) {
+    return ShortCircuitShmSlotProto.newBuilder().
+        setShmId(convert(slotId.getShmId())).
+        setSlotIdx(slotId.getSlotIdx()).
+        build();
+  }
+
+  public static ShortCircuitShmIdProto convert(ShmId shmId) {
+    return ShortCircuitShmIdProto.newBuilder().
+        setHi(shmId.getHi()).
+        setLo(shmId.getLo()).
+        build();
+
+  }
+
+  public static SlotId convert(ShortCircuitShmSlotProto slotId) {
+    return new SlotId(PBHelper.convert(slotId.getShmId()),
+        slotId.getSlotIdx());
+  }
+
+  public static ShmId convert(ShortCircuitShmIdProto shmId) {
+    return new ShmId(shmId.getHi(), shmId.getLo());
+  }
 }
 

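The four new converters are symmetric, so slot identifiers survive a round trip through their wire representation. A minimal sketch (the identifier values are arbitrary):

    // Sketch: SlotId <-> ShortCircuitShmSlotProto round trip via PBHelper.
    ShmId shmId = new ShmId(0x1234567890abcdefL, 0x0fedcba987654321L);
    SlotId slotId = new SlotId(shmId, 7);
    ShortCircuitShmSlotProto proto = PBHelper.convert(slotId);
    SlotId roundTripped = PBHelper.convert(proto);
    // roundTripped has the same (hi, lo) ShmId and the same slot index, 7.
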
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Mar  3 03:58:37 2014
@@ -230,6 +230,7 @@ public class DataNode extends Configured
   AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
   Daemon localDataXceiverServer = null;
+  ShortCircuitRegistry shortCircuitRegistry = null;
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
@@ -579,6 +580,7 @@ public class DataNode extends Configured
             domainPeerServer.getBindPath());
       }
     }
+    this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
   }
 
   static DomainPeerServer getDomainPeerServer(Configuration conf,
@@ -1317,6 +1319,7 @@ public class DataNode extends Configured
       MBeans.unregister(dataNodeInfoBeanName);
       dataNodeInfoBeanName = null;
     }
+    if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
   }
   
   
@@ -1953,7 +1956,8 @@ public class DataNode extends Configured
    * 
    * @return the fsdataset that stores the blocks
    */
-  FsDatasetSpi<?> getFSDataset() {
+  @VisibleForTesting
+  public FsDatasetSpi<?> getFSDataset() {
     return data;
   }
 
@@ -2514,4 +2518,8 @@ public class DataNode extends Configured
   boolean shouldRun() {
     return shouldRun;
   }
+
+  public ShortCircuitRegistry getShortCircuitRegistry() {
+    return shortCircuitRegistry;
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Mar  3 03:58:37 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
@@ -42,6 +43,9 @@ import java.nio.channels.ClosedChannelEx
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -58,6 +62,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -65,11 +71,13 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -84,7 +92,7 @@ class DataXceiver extends Receiver imple
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  private final Peer peer;
+  private Peer peer;
   private final String remoteAddress; // address of remote side
   private final String localAddress;  // local address of this daemon
   private final DataNode datanode;
@@ -220,7 +228,8 @@ class DataXceiver extends Receiver imple
         opStartTime = now();
         processOp(op);
         ++opsProcessed;
-      } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0);
+      } while ((peer != null) &&
+          (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
     } catch (Throwable t) {
       LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " +
                 ((op == null) ? "unknown" : op.name()) + " operation " +
@@ -232,15 +241,17 @@ class DataXceiver extends Receiver imple
             + datanode.getXceiverCount());
       }
       updateCurrentThreadName("Cleaning up");
-      dataXceiverServer.closePeer(peer);
-      IOUtils.closeStream(in);
+      if (peer != null) {
+        dataXceiverServer.closePeer(peer);
+        IOUtils.closeStream(in);
+      }
     }
   }
 
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> token,
-      int maxVersion) throws IOException {
+      SlotId slotId, int maxVersion) throws IOException {
     updateCurrentThreadName("Passing file descriptors for block " + blk);
     BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
     FileInputStream fis[] = null;
@@ -249,7 +260,17 @@ class DataXceiver extends Receiver imple
         throw new IOException("You cannot pass file descriptors over " +
             "anything but a UNIX domain socket.");
       }
-      fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
+      if (slotId != null) {
+        datanode.shortCircuitRegistry.registerSlot(
+            ExtendedBlockId.fromExtendedBlock(blk), slotId);
+      }
+      try {
+        fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
+      } finally {
+        if ((fis == null) && (slotId != null)) {
+          datanode.shortCircuitRegistry.unregisterSlot(slotId);
+        }
+      }
       bld.setStatus(SUCCESS);
       bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
     } catch (ShortCircuitFdsVersionException e) {
@@ -294,6 +315,122 @@ class DataXceiver extends Receiver imple
   }
 
   @Override
+  public void releaseShortCircuitFds(SlotId slotId) throws IOException {
+    boolean success = false;
+    try {
+      String error;
+      Status status;
+      try {
+        datanode.shortCircuitRegistry.unregisterSlot(slotId);
+        error = null;
+        status = Status.SUCCESS;
+      } catch (UnsupportedOperationException e) {
+        error = "unsupported operation";
+        status = Status.ERROR_UNSUPPORTED;
+      } catch (Throwable e) {
+        error = e.getMessage();
+        status = Status.ERROR_INVALID;
+      }
+      ReleaseShortCircuitAccessResponseProto.Builder bld =
+          ReleaseShortCircuitAccessResponseProto.newBuilder();
+      bld.setStatus(status);
+      if (error != null) {
+        bld.setError(error);
+      }
+      bld.build().writeDelimitedTo(socketOut);
+      success = true;
+    } finally {
+      if (ClientTraceLog.isInfoEnabled()) {
+        BlockSender.ClientTraceLog.info(String.format(
+            "src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS," +
+            " shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b",
+            slotId.getShmId().getHi(), slotId.getShmId().getLo(),
+            slotId.getSlotIdx(), datanode.getDatanodeUuid(), success));
+      }
+    }
+  }
+
+  private void sendShmErrorResponse(Status status, String error)
+      throws IOException {
+    ShortCircuitShmResponseProto.newBuilder().setStatus(status).
+        setError(error).build().writeDelimitedTo(socketOut);
+  }
+
+  private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo)
+      throws IOException {
+    ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
+        setId(PBHelper.convert(shmInfo.shmId)).build().
+        writeDelimitedTo(socketOut);
+    // Send the file descriptor for the shared memory segment.
+    byte buf[] = new byte[] { (byte)0 };
+    FileDescriptor shmFdArray[] =
+        new FileDescriptor[] { shmInfo.stream.getFD() };
+    sock.sendFileDescriptors(shmFdArray, buf, 0, buf.length);
+  }
+
+  @Override
+  public void requestShortCircuitShm(String clientName) throws IOException {
+    NewShmInfo shmInfo = null;
+    boolean success = false;
+    DomainSocket sock = peer.getDomainSocket();
+    try {
+      if (sock == null) {
+        sendShmErrorResponse(ERROR_INVALID, "Bad request from " +
+            peer + ": must request a shared " +
+            "memory segment over a UNIX domain socket.");
+        return;
+      }
+      try {
+        shmInfo = datanode.shortCircuitRegistry.
+            createNewMemorySegment(clientName, sock);
+        // After calling ShortCircuitRegistry#createNewMemorySegment, the
+        // socket is managed by the DomainSocketWatcher, not the DataXceiver.
+        releaseSocket();
+      } catch (UnsupportedOperationException e) {
+        sendShmErrorResponse(ERROR_UNSUPPORTED, 
+            "This datanode has not been configured to support " +
+            "short-circuit shared memory segments.");
+        return;
+      } catch (IOException e) {
+        sendShmErrorResponse(ERROR,
+            "Failed to create shared file descriptor: " + e.getMessage());
+        return;
+      }
+      sendShmSuccessResponse(sock, shmInfo);
+      success = true;
+    } finally {
+      if (ClientTraceLog.isInfoEnabled()) {
+        if (success) {
+          BlockSender.ClientTraceLog.info(String.format(
+              "cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
+              "op: REQUEST_SHORT_CIRCUIT_SHM," +
+              " shmId: %016x%016x, srvID: %s, success: true",
+              clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(),
+              datanode.getDatanodeUuid()));
+        } else {
+          BlockSender.ClientTraceLog.info(String.format(
+              "cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
+              "op: REQUEST_SHORT_CIRCUIT_SHM, " +
+              "shmId: n/a, srvID: %s, success: false",
+              clientName, datanode.getDatanodeUuid()));
+        }
+      }
+      if ((!success) && (peer == null)) {
+        // If we failed to pass the shared memory segment to the client,
+        // close the UNIX domain socket now.  This will trigger the 
+        // DomainSocketWatcher callback, cleaning up the segment.
+        IOUtils.cleanup(null, sock);
+      }
+      IOUtils.cleanup(null, shmInfo);
+    }
+  }
+
+  void releaseSocket() {
+    dataXceiverServer.releasePeer(peer);
+    peer = null;
+  }
+
+  @Override
   public void readBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Mon Mar  3 03:58:37 2014
@@ -201,4 +201,8 @@ class DataXceiverServer implements Runna
     peers.remove(peer);
     IOUtils.cleanup(null, peer);
   }
+
+  synchronized void releasePeer(Peer peer) {
+    peers.remove(peer);
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Mon Mar  3 03:58:37 2014
@@ -266,6 +266,15 @@ public class FsDatasetCache {
     ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
 
+    if (!dataset.datanode.getShortCircuitRegistry().
+            processBlockMunlockRequest(key)) {
+      // TODO: we probably want to forcibly uncache the block (and close the 
+      // shm) after a certain timeout has elapsed.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(key + " is anchored, and can't be uncached now.");
+      }
+      return;
+    }
     if (prevValue == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
@@ -380,6 +389,7 @@ public class FsDatasetCache {
           LOG.debug("Successfully cached " + key + ".  We are now caching " +
               newUsedBytes + " bytes in total.");
         }
+        dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
         numBlocksCached.addAndGet(1);
         success = true;
       } finally {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Mon Mar  3 03:58:37 2014
@@ -47,8 +47,6 @@ import com.google.common.base.Preconditi
  */
 @InterfaceAudience.Private
 public final class CachePool {
-  public static final Log LOG = LogFactory.getLog(CachePool.class);
-
   @Nonnull
   private final String poolName;
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Mon Mar  3 03:58:37 2014
@@ -128,6 +128,22 @@ message OpBlockChecksumProto { 
   required BaseHeaderProto header = 1;
 }
 
+/**
+ * An ID uniquely identifying a shared memory segment.
+ */
+message ShortCircuitShmIdProto { 
+  required int64 hi = 1;
+  required int64 lo = 2;
+}
+
+/**
+ * An ID uniquely identifying a slot within a shared memory segment.
+ */
+message ShortCircuitShmSlotProto {
+  required ShortCircuitShmIdProto shmId = 1;
+  required int32 slotIdx = 2; 
+}
+
 message OpRequestShortCircuitAccessProto { 
   required BaseHeaderProto header = 1;
 
@@ -137,6 +153,32 @@ message OpRequestShortCircuitAccessProto
    * if the on-disk format changes.
    */
   required uint32 maxVersion = 2;
+
+  /**
+   * The shared memory slot to use, if we are using one.
+   */
+  optional ShortCircuitShmSlotProto slotId = 3;
+}
+
+message ReleaseShortCircuitAccessRequestProto {
+  required ShortCircuitShmSlotProto slotId = 1;
+}
+
+message ReleaseShortCircuitAccessResponseProto {
+  required Status status = 1;
+  optional string error = 2;
+}
+
+message ShortCircuitShmRequestProto { 
+  // The name of the client requesting the shared memory segment.  This is
+  // purely for logging / debugging purposes.
+  required string clientName = 1;
+}
+
+message ShortCircuitShmResponseProto { 
+  required Status status = 1;
+  optional string error = 2;
+  optional ShortCircuitShmIdProto id = 3;
 }
 
 message PacketHeaderProto {

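All of these messages are varint-length-delimited on the wire: the writer calls the generated writeDelimitedTo(), and the reader strips the length prefix with PBHelper.vintPrefixed() before parseFrom(), exactly as the Sender and DataXceiver changes above do. Condensed from those call sites:

    // Server side (DataXceiver#releaseShortCircuitFds):
    ReleaseShortCircuitAccessResponseProto.newBuilder().
        setStatus(Status.SUCCESS).
        build().writeDelimitedTo(socketOut);
    // Client side (ShortCircuitCache.SlotReleaser#run):
    ReleaseShortCircuitAccessResponseProto resp =
        ReleaseShortCircuitAccessResponseProto.parseFrom(
            PBHelper.vintPrefixed(in));
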
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Mon Mar  3 03:58:37 2014
@@ -1138,6 +1138,27 @@
 </property>
 
 <property>
+  <name>dfs.datanode.shared.file.descriptor.path</name>
+  <value>/dev/shm</value>
+  <description>
+    The path to use when creating file descriptors that will be shared
+    between the DataNode and the DFSClient.  Typically we use /dev/shm, so
+    that the file descriptors will not be written to disk.  Systems that
+    don't have /dev/shm should use /tmp.
+  </description>
+</property>
+
+<property>
+  <name>dfs.short.circuit.shared.memory.watcher.interrupt.check.ms</name>
+  <value>60000</value>
+  <description>
+    The interval, in milliseconds, at which the short-circuit shared memory
+    watcher checks for Java interruptions sent from other threads.  This is
+    provided mainly for unit tests.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.kerberos.internal.spnego.principal</name>
   <value>${dfs.web.authentication.kerberos.principal}</value>
 </property>

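To exercise these settings programmatically, for example in a unit test, the shm watcher interval maps to the DFSConfigKeys constant used in ShortCircuitCache.fromConf() above. A brief sketch (the /tmp path and the 1000 ms interval are arbitrary example values):

    // Sketch: overriding the new short-circuit shared memory settings.
    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.datanode.shared.file.descriptor.path", "/tmp");
    conf.setInt(
        DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
        1000);  // default is 60000 (one minute)
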
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Mon Mar  3 03:58:37 2014
@@ -17,9 +17,15 @@
  */
 package org.apache.hadoop.fs;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeoutException;
@@ -34,6 +40,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -42,18 +49,24 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.BeforeClass;
@@ -71,12 +84,28 @@ public class TestEnhancedByteBufferAcces
   private static final Log LOG =
       LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());
 
-  static TemporarySocketDirectory sockDir;
+  private static TemporarySocketDirectory sockDir;
+
+  private static CacheManipulator prevCacheManipulator;
 
   @BeforeClass
   public static void init() {
     sockDir = new TemporarySocketDirectory();
     DomainSocket.disableBindPathValidation();
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new CacheManipulator() {
+      @Override
+      public void mlock(String identifier,
+          ByteBuffer mmap, long length) throws IOException {
+        LOG.info("mlocking " + identifier);
+      }
+    });
+  }
+
+  @AfterClass
+  public static void teardown() {
+    // Restore the original CacheManipulator
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
   }
 
   private static byte[] byteBufferToArray(ByteBuffer buf) {
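
The init() hook above swaps in a no-op CacheManipulator so these tests
never need real locked memory or a raised memlock ulimit. A minimal
sketch of the call path the stub intercepts, assuming the caching code
reaches mlock through the pluggable manipulator as FsDatasetCache does:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.nativeio.NativeIO;

    // With the stub installed this merely logs "mlocking example-block";
    // with the default manipulator it would pin the mapped pages in RAM.
    ByteBuffer mmap = ByteBuffer.allocateDirect(4096);  // stands in for a mapped block file
    NativeIO.POSIX.getCacheManipulator().mlock("example-block", mmap, mmap.capacity());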
@@ -86,12 +115,14 @@ public class TestEnhancedByteBufferAcces
     return resultArray;
   }
   
+  private static final int BLOCK_SIZE = 4096;
+  
   public static HdfsConfiguration initZeroCopyTest() {
     Assume.assumeTrue(NativeIO.isAvailable());
     Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
     conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
@@ -99,6 +130,9 @@ public class TestEnhancedByteBufferAcces
           "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
     conf.setBoolean(DFSConfigKeys.
         DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
     return conf;
   }
 
@@ -549,4 +583,119 @@ public class TestEnhancedByteBufferAcces
       new File(TEST_PATH).delete();
     }
   }
+
+  /**
+   * Test that we can zero-copy read cached data even without disabling
+   * checksums.
+   */
+  @Test(timeout=120000)
+  public void testZeroCopyReadOfCachedData() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
+    BlockReaderTestUtil.enableHdfsCachingTracing();
+
+    final int TEST_FILE_LENGTH = 16385;
+    final Path TEST_PATH = new Path("/a");
+    final int RANDOM_SEED = 23453;
+    HdfsConfiguration conf = initZeroCopyTest();
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    final String CONTEXT = "testZeroCopyReadOfCachedData";
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, 4096));
+    MiniDFSCluster cluster = null;
+    ByteBuffer result = null;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, TEST_PATH,
+        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+    DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+    byte original[] = DFSTestUtil.
+        calculateFileContentsFromSeed(RANDOM_SEED, TEST_FILE_LENGTH);
+
+    // Prior to caching, the file can't be read via zero-copy
+    FSDataInputStream fsIn = fs.open(TEST_PATH);
+    try {
+      result = fsIn.read(null, TEST_FILE_LENGTH / 2,
+          EnumSet.noneOf(ReadOption.class));
+      Assert.fail("expected UnsupportedOperationException");
+    } catch (UnsupportedOperationException e) {
+      // expected
+    }
+    // Cache the file
+    fs.addCachePool(new CachePoolInfo("pool1"));
+    long directiveId = fs.addCacheDirective(new CacheDirectiveInfo.Builder().
+        setPath(TEST_PATH).
+        setReplication((short)1).
+        setPool("pool1").
+        build());
+    int numBlocks = (int)Math.ceil((double)TEST_FILE_LENGTH / BLOCK_SIZE);
+    DFSTestUtil.verifyExpectedCacheUsage(
+        DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, BLOCK_SIZE),
+        numBlocks, fsd);
+    try {
+      result = fsIn.read(null, TEST_FILE_LENGTH,
+          EnumSet.noneOf(ReadOption.class));
+    } catch (UnsupportedOperationException e) {
+      Assert.fail("expected to be able to read cached file via zero-copy");
+    }
+    // Verify result
+    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
+        BLOCK_SIZE), byteBufferToArray(result));
+    // Check that the replica is anchored
+    final ExtendedBlock firstBlock =
+        DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+    final ShortCircuitCache cache = ClientContext.get(
+        CONTEXT, new DFSClient.Conf(conf)).getShortCircuitCache();
+    waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
+    // Uncache the replica
+    fs.removeCacheDirective(directiveId);
+    waitForReplicaAnchorStatus(cache, firstBlock, false, true, 1);
+    fsIn.releaseBuffer(result);
+    waitForReplicaAnchorStatus(cache, firstBlock, false, false, 1);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
+
+    fsIn.close();
+    fs.close();
+    cluster.shutdown();
+  }
+  
+  private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
+      final ExtendedBlock block, final boolean expectedIsAnchorable,
+        final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
+          throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        final MutableBoolean result = new MutableBoolean(false);
+        cache.accept(new CacheVisitor() {
+          @Override
+          public void visit(int numOutstandingMmaps,
+              Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+              Map<ExtendedBlockId, InvalidToken> failedLoads,
+              Map<Long, ShortCircuitReplica> evictable,
+              Map<Long, ShortCircuitReplica> evictableMmapped) {
+            Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
+            ShortCircuitReplica replica =
+                replicas.get(ExtendedBlockId.fromExtendedBlock(block));
+            Assert.assertNotNull(replica);
+            Slot slot = replica.getSlot();
+            if ((expectedIsAnchorable != slot.isAnchorable()) ||
+                (expectedIsAnchored != slot.isAnchored())) {
+              LOG.info("replica " + replica + " has isAnchorable = " +
+                slot.isAnchorable() + ", isAnchored = " + slot.isAnchored() + 
+                ".  Waiting for isAnchorable = " + expectedIsAnchorable + 
+                ", isAnchored = " + expectedIsAnchored);
+              return;
+            }
+            result.setValue(true);
+          }
+        });
+        return result.toBoolean();
+      }
+    }, 10, 60000);
+  }
 }
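
A condensed sketch of the client-side pattern the test above exercises,
reusing its names (process() is a hypothetical consumer, not part of
the patch):

    // A null ByteBufferPool requests a true zero-copy (mmap) read and
    // nothing else; the call throws UnsupportedOperationException when
    // the block cannot be safely mmap'ed -- e.g. checksums are enabled
    // and the replica is not yet cached (anchored) on the DataNode.
    FSDataInputStream fsIn = fs.open(TEST_PATH);
    ByteBuffer result = fsIn.read(null, 4096, EnumSet.noneOf(ReadOption.class));
    try {
      process(result);
    } finally {
      fsIn.releaseBuffer(result);  // lets the DataNode un-anchor the replica
    }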

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Mon Mar  3 03:58:37 2014
@@ -31,6 +31,7 @@ import java.util.Random;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -38,9 +39,13 @@ import org.apache.hadoop.hdfs.net.TcpPee
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
+import org.apache.hadoop.hdfs.server.namenode.CacheManager;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -206,6 +211,15 @@ public class BlockReaderTestUtil {
     return cluster.getDataNode(ipcport);
   }
   
+  public static void enableHdfsCachingTracing() {
+    LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(CacheManager.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(FsDatasetCache.class.getName()).setLevel(
+        Level.TRACE);
+  }
+
   public static void enableBlockReaderFactoryTracing() {
     LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel(
         Level.TRACE);
@@ -213,5 +227,18 @@ public class BlockReaderTestUtil {
         Level.TRACE);
     LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel(
         Level.TRACE);
+    LogManager.getLogger(BlockReaderLocal.class.getName()).setLevel(
+        Level.TRACE);
+  }
+
+  public static void enableShortCircuitShmTracing() {
+    LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(DataNode.class.getName()).setLevel(
+        Level.TRACE);
   }
 }
\ No newline at end of file
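
These helpers centralize the log4j TRACE toggles the new tests depend
on; typical use is a one-time call at the top of a test, as in
testZeroCopyReadOfCachedData above:

    BlockReaderTestUtil.enableShortCircuitShmTracing();
    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
    BlockReaderTestUtil.enableHdfsCachingTracing();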

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Mon Mar  3 03:58:37 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
 import org.apache.commons.io.FileUtils;
@@ -48,15 +49,18 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.VersionInfo;
 
 import java.io.*;
@@ -1090,4 +1094,47 @@ public class DFSTestUtil {
     buf.duplicate().get(arr);
     return arr;
   }
+
+  /**
+   * Blocks until cache usage hits the expected new value.
+   */
+  public static long verifyExpectedCacheUsage(final long expectedCacheUsed,
+      final long expectedBlocks, final FsDatasetSpi<?> fsd) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      private int tries = 0;
+      
+      @Override
+      public Boolean get() {
+        long curCacheUsed = fsd.getCacheUsed();
+        long curBlocks = fsd.getNumBlocksCached();
+        if ((curCacheUsed != expectedCacheUsed) ||
+            (curBlocks != expectedBlocks)) {
+          if (tries++ > 10) {
+            LOG.info("verifyExpectedCacheUsage: have " +
+                curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
+                curBlocks + "/" + expectedBlocks + " blocks cached. " +
+                "memlock limit = " +
+                NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
+                ".  Waiting...");
+          }
+          return false;
+        }
+        return true;
+      }
+    }, 100, 60000);
+    return expectedCacheUsed;
+  }
+
+  /**
+   * Round a long value up to a multiple of a factor.
+   *
+   * @param val    The value.
+   * @param factor The factor to round up to.  Must be > 1.
+   * @return       The rounded value.
+   */
+  public static long roundUpToMultiple(long val, int factor) {
+    assert (factor > 1);
+    long c = (val + factor - 1) / factor;
+    return c * factor;
+  }
 }
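
A worked example of the two new helpers, plugging in the numbers from
testZeroCopyReadOfCachedData above (TEST_FILE_LENGTH = 16385,
BLOCK_SIZE = 4096; fsd is that test's FsDatasetSpi):

    // roundUpToMultiple(16385, 4096) = ((16385 + 4095) / 4096) * 4096
    //                                = 5 * 4096 = 20480
    // i.e. the 16385-byte file occupies five blocks' worth of cache.
    long expectedBytes = DFSTestUtil.roundUpToMultiple(16385, 4096);

    // Block until the DataNode reports exactly 20480 bytes across 5
    // cached blocks, or fail after the 60-second waitFor timeout.
    DFSTestUtil.verifyExpectedCacheUsage(expectedBytes, 5, fsd);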

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java Mon Mar  3 03:58:37 2014
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,8 +31,11 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager.PerDatanodeVisitorInfo;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager.Visitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -47,6 +52,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
 import static org.hamcrest.CoreMatchers.equalTo;
 
@@ -56,10 +62,6 @@ public class TestBlockReaderFactory {
   @Before
   public void init() {
     DomainSocket.disableBindPathValidation();
-  }
-
-  @Before
-  public void before() {
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
   }
 
@@ -69,7 +71,7 @@ public class TestBlockReaderFactory {
     BlockReaderFactory.createShortCircuitReplicaInfoCallback = null;
   }
 
-  private static Configuration createShortCircuitConf(String testName,
+  public static Configuration createShortCircuitConf(String testName,
       TemporarySocketDirectory sockDir) {
     Configuration conf = new Configuration();
     conf.set(DFS_CLIENT_CONTEXT, testName);
@@ -99,6 +101,8 @@ public class TestBlockReaderFactory {
     // the client is.  Both support UNIX domain reads.
     Configuration clientConf = createShortCircuitConf(
         "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir);
+    clientConf.set(DFS_CLIENT_CONTEXT,
+        "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
     clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
     Configuration serverConf = new Configuration(clientConf);
     serverConf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
@@ -289,4 +293,87 @@ public class TestBlockReaderFactory {
     sockDir.close();
     Assert.assertFalse(testFailed.get());
   }
+
+  /**
+   * Test that a client which supports short-circuit reads using
+   * shared memory can fall back to not using shared memory when
+   * the server doesn't support it.
+   */
+  @Test
+  public void testShortCircuitReadFromServerWithoutShm() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration clientConf = createShortCircuitConf(
+        "testShortCircuitReadFromServerWithoutShm", sockDir);
+    Configuration serverConf = new Configuration(clientConf);
+    serverConf.setInt(
+        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+    cluster.waitActive();
+    clientConf.set(DFS_CLIENT_CONTEXT,
+        "testShortCircuitReadFromServerWithoutShm_clientContext");
+    final DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
+    final String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 4000;
+    final int SEED = 0xFADEC;
+    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
+    byte expected[] = DFSTestUtil.
+        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+    Assert.assertTrue(Arrays.equals(contents, expected));
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    final DatanodeInfo datanode =
+        new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
+    cache.getDfsClientShmManager().visit(new Visitor() {
+      @Override
+      public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+          throws IOException {
+        Assert.assertEquals(1, info.size());
+        PerDatanodeVisitorInfo vinfo = info.get(datanode);
+        Assert.assertTrue(vinfo.disabled);
+        Assert.assertEquals(0, vinfo.full.size());
+        Assert.assertEquals(0, vinfo.notFull.size());
+      }
+    });
+    cluster.shutdown();
+  }
+ 
+  /**
+   * Test that a client which does not support short-circuit reads using
+   * shared memory can talk with a server which supports it.
+   */
+  @Test
+  public void testShortCircuitReadFromClientWithoutShm() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration clientConf = createShortCircuitConf(
+        "testShortCircuitReadWithoutShm", sockDir);
+    Configuration serverConf = new Configuration(clientConf);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+    cluster.waitActive();
+    clientConf.setInt(
+        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
+    clientConf.set(DFS_CLIENT_CONTEXT,
+        "testShortCircuitReadFromClientWithoutShm_clientContext");
+    final DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
+    final String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 4000;
+    final int SEED = 0xFADEC;
+    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
+    byte expected[] = DFSTestUtil.
+        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+    Assert.assertTrue(Arrays.equals(contents, expected));
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    Assert.assertEquals(null, cache.getDfsClientShmManager());
+    cluster.shutdown();
+  }
 }
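
The Visitor above asserts the negative case -- the shm path is disabled
for that datanode. A sketch of the inverse assertions one might write
against a server that does support shared memory (assumed; not part of
this hunk):

    cache.getDfsClientShmManager().visit(new Visitor() {
      @Override
      public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
          throws IOException {
        PerDatanodeVisitorInfo vinfo = info.get(datanode);
        Assert.assertFalse(vinfo.disabled);           // shm path enabled
        Assert.assertEquals(1, vinfo.notFull.size()); // one segment with free slots
      }
    });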

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Mon Mar  3 03:58:37 2014
@@ -23,19 +23,21 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.util.Time;
@@ -132,6 +134,8 @@ public class TestBlockReaderLocal {
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
     
     FileSystem fs = null;
+    ShortCircuitShm shm = null;
+    RandomAccessFile raf = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
@@ -156,7 +160,6 @@ public class TestBlockReaderLocal {
       File dataFile = MiniDFSCluster.getBlockFile(0, block);
       File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
 
-      DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
       ShortCircuitCache shortCircuitCache =
           ClientContext.getFromConf(conf).getShortCircuitCache();
       cluster.shutdown();
@@ -168,15 +171,23 @@ public class TestBlockReaderLocal {
       };
       dataIn = streams[0];
       metaIn = streams[1];
-      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-      ShortCircuitReplica replica = new ShortCircuitReplica(
-          key, dataIn, metaIn, shortCircuitCache, Time.now());
+      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
+          block.getBlockPoolId());
+      raf = new RandomAccessFile(
+          new File(sockDir.getDir().getAbsolutePath(),
+            UUID.randomUUID().toString()), "rw");
+      raf.setLength(8192);
+      FileInputStream shmStream = new FileInputStream(raf.getFD());
+      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
+      ShortCircuitReplica replica = 
+          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
+              Time.now(), shm.allocAndRegisterSlot(
+                  ExtendedBlockId.fromExtendedBlock(block)));
       blockReaderLocal = new BlockReaderLocal.Builder(
               new DFSClient.Conf(conf)).
           setFilename(TEST_PATH.getName()).
           setBlock(block).
           setShortCircuitReplica(replica).
-          setDatanodeID(datanodeID).
           setCachingStrategy(new CachingStrategy(false, readahead)).
           setVerifyChecksum(checksum).
           build();
@@ -193,6 +204,8 @@ public class TestBlockReaderLocal {
       if (dataIn != null) dataIn.close();
       if (metaIn != null) metaIn.close();
       if (blockReaderLocal != null) blockReaderLocal.close();
+      if (shm != null) shm.free();
+      if (raf != null) raf.close();
     }
   }
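
The hunks above back the test's ShortCircuitShm with an ordinary file
instead of a DataNode-supplied descriptor. The moving parts, isolated
(assuming any seekable 8 KB file works as a stand-in segment; a real
client receives the descriptor from the DataNode over a domain socket):

    // A file-backed stand-in for a shared-memory segment.
    RandomAccessFile raf = new RandomAccessFile(
        File.createTempFile("shm", ".tmp"), "rw");
    raf.setLength(8192);  // room for one segment's worth of slots
    FileInputStream shmStream = new FileInputStream(raf.getFD());
    ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);

    // Every ShortCircuitReplica now carries a slot from such a segment.
    Slot slot = shm.allocAndRegisterSlot(
        ExtendedBlockId.fromExtendedBlock(block));  // block: the test's ExtendedBlock

    // Tear down in the same order as the finally block above.
    shm.free();
    raf.close();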
   
@@ -369,13 +382,13 @@ public class TestBlockReaderLocal {
       assertArrayRegionsEqual(original, 6657,
           DFSTestUtil.asArray(buf), 0,
           1);
-      reader.setMlocked(true);
+      reader.forceAnchorable();
       readFully(reader, buf, 0, 5120);
       buf.flip();
       assertArrayRegionsEqual(original, 6658,
           DFSTestUtil.asArray(buf), 0,
           5120);
-      reader.setMlocked(false);
+      reader.forceUnanchorable();
       readFully(reader, buf, 0, 513);
       buf.flip();
       assertArrayRegionsEqual(original, 11778,
@@ -544,10 +557,10 @@ public class TestBlockReaderLocal {
       assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
       readFully(reader, buf, 10, 100);
       assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
-      reader.setMlocked(true);
+      reader.forceAnchorable();
       readFully(reader, buf, 110, 700);
       assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
-      reader.setMlocked(false);
+      reader.forceUnanchorable();
       reader.skip(1); // skip from offset 810 to offset 811
       readFully(reader, buf, 811, 5);
       assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
@@ -599,10 +612,10 @@ public class TestBlockReaderLocal {
       assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
       readFully(reader, buf, 10, 100);
       assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
-      reader.setMlocked(true);
+      reader.forceAnchorable();
       readFully(reader, buf, 110, 700);
       assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
-      reader.setMlocked(false);
+      reader.forceUnanchorable();
       reader.skip(1); // skip from offset 810 to offset 811
       readFully(reader, buf, 811, 5);
       assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);


