hadoop-hdfs-commits mailing list archives

From: cmcc...@apache.org
Subject: svn commit: r1567728 [2/2] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project: ./ hadoop-hdfs/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ hadoop-hdfs/src/...
Date: Wed, 12 Feb 2014 19:24:47 GMT
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java Wed Feb 12 19:24:46 2014
@@ -27,29 +27,71 @@ import org.apache.hadoop.HadoopIllegalAr
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.net.unix.DomainSocket;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
 class DomainSocketFactory {
   private static final Log LOG = BlockReaderLocal.LOG;
-  private final Conf conf;
 
-  enum PathStatus {
-    UNUSABLE,
-    SHORT_CIRCUIT_DISABLED,
+  public enum PathState {
+    UNUSABLE(false, false),
+    SHORT_CIRCUIT_DISABLED(true, false),
+    VALID(true, true);
+
+    PathState(boolean usableForDataTransfer, boolean usableForShortCircuit) {
+      this.usableForDataTransfer = usableForDataTransfer;
+      this.usableForShortCircuit = usableForShortCircuit;
+    }
+
+    public boolean getUsableForDataTransfer() {
+      return usableForDataTransfer;
+    }
+
+    public boolean getUsableForShortCircuit() {
+      return usableForShortCircuit;
+    }
+
+    private final boolean usableForDataTransfer;
+    private final boolean usableForShortCircuit;
+  }
+
+  public static class PathInfo {
+    private final static PathInfo NOT_CONFIGURED =
+          new PathInfo("", PathState.UNUSABLE);
+
+    final private String path;
+    final private PathState state;
+
+    PathInfo(String path, PathState state) {
+      this.path = path;
+      this.state = state;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public PathState getPathState() {
+      return state;
+    }
+    
+    @Override
+    public String toString() {
+      return new StringBuilder().append("PathInfo{path=").append(path).
+          append(", state=").append(state).append("}").toString();
+    }
   }
 
   /**
    * Information about domain socket paths.
    */
-  Cache<String, PathStatus> pathInfo =
+  Cache<String, PathState> pathMap =
       CacheBuilder.newBuilder()
       .expireAfterWrite(10, TimeUnit.MINUTES)
       .build();
 
   public DomainSocketFactory(Conf conf) {
-    this.conf = conf;
-
     final String feature;
     if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) {
       feature = "The short-circuit local reads feature";
@@ -75,51 +117,46 @@ class DomainSocketFactory {
   }
 
   /**
-   * Create a DomainSocket.
-   * 
-   * @param addr        The address of the DataNode
-   * @param stream      The DFSInputStream the socket will be created for.
+   * Get information about a domain socket path.
+   *
+   * @param addr         The inet address to use.
+   * @param conf         The client configuration.
    *
-   * @return            null if the socket could not be created; the
-   *                    socket otherwise.  If there was an error while
-   *                    creating the socket, we will add the socket path
-   *                    to our list of failed domain socket paths.
+   * @return             Information about the socket path.
    */
-  DomainSocket create(InetSocketAddress addr, DFSInputStream stream) {
+  public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) {
     // If there is no domain socket path configured, we can't use domain
     // sockets.
-    if (conf.domainSocketPath.isEmpty()) return null;
+    if (conf.domainSocketPath.isEmpty()) return PathInfo.NOT_CONFIGURED;
     // If we can't do anything with the domain socket, don't create it.
     if (!conf.domainSocketDataTraffic &&
         (!conf.shortCircuitLocalReads || conf.useLegacyBlockReaderLocal)) {
-      return null;
+      return PathInfo.NOT_CONFIGURED;
     }
-    // UNIX domain sockets can only be used to talk to local peers
-    if (!DFSClient.isLocalAddress(addr)) return null;
     // If the DomainSocket code is not loaded, we can't create
     // DomainSocket objects.
-    if (DomainSocket.getLoadingFailureReason() != null) return null;
+    if (DomainSocket.getLoadingFailureReason() != null) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+    // UNIX domain sockets can only be used to talk to local peers
+    if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
     String escapedPath = DomainSocket.
         getEffectivePath(conf.domainSocketPath, addr.getPort());
-    PathStatus info = pathInfo.getIfPresent(escapedPath);
-    if (info == PathStatus.UNUSABLE) {
-      // We tried to connect to this domain socket before, and it was totally
-      // unusable.
-      return null;
-    }
-    if ((!conf.domainSocketDataTraffic) &&
-        ((info == PathStatus.SHORT_CIRCUIT_DISABLED) || 
-            stream.shortCircuitForbidden())) {
-      // If we don't want to pass data over domain sockets, and we don't want
-      // to pass file descriptors over them either, we have no use for domain
-      // sockets.
-      return null;
+    PathState status = pathMap.getIfPresent(escapedPath);
+    if (status == null) {
+      return new PathInfo(escapedPath, PathState.VALID);
+    } else {
+      return new PathInfo(escapedPath, status);
     }
+  }
+
+  public DomainSocket createSocket(PathInfo info, int socketTimeout) {
+    Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE);
     boolean success = false;
     DomainSocket sock = null;
     try {
-      sock = DomainSocket.connect(escapedPath);
-      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, conf.socketTimeout);
+      sock = DomainSocket.connect(info.getPath());
+      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout);
       success = true;
     } catch (IOException e) {
       LOG.warn("error creating DomainSocket", e);
@@ -129,7 +166,7 @@ class DomainSocketFactory {
         if (sock != null) {
           IOUtils.closeQuietly(sock);
         }
-        pathInfo.put(escapedPath, PathStatus.UNUSABLE);
+        pathMap.put(info.getPath(), PathState.UNUSABLE);
         sock = null;
       }
     }
@@ -137,10 +174,10 @@ class DomainSocketFactory {
   }
 
   public void disableShortCircuitForPath(String path) {
-    pathInfo.put(path, PathStatus.SHORT_CIRCUIT_DISABLED);
+    pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED);
   }
 
   public void disableDomainSocketPath(String path) {
-    pathInfo.put(path, PathStatus.UNUSABLE);
+    pathMap.put(path, PathState.UNUSABLE);
   }
 }
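
For orientation, a minimal caller-side sketch of the split getPathInfo()/createSocket() API introduced above. It is not part of this commit: the package statement, method shape, timeout argument, and TCP fallback are assumptions (the class is package-private, so same-package access is required), and the null return from createSocket() on connect failure is inferred from the catch block above.

package org.apache.hadoop.hdfs;

import java.net.InetSocketAddress;

import org.apache.hadoop.net.unix.DomainSocket;

class DomainSocketFactorySketch {
  // Ask the factory whether a domain socket path is usable before trying to
  // connect; on connect failure the factory marks the path UNUSABLE in its
  // cache, and the caller is assumed to fall back to a TCP peer.
  static DomainSocket trySocket(DomainSocketFactory factory,
      InetSocketAddress addr, DFSClient.Conf conf, int socketTimeout) {
    DomainSocketFactory.PathInfo info = factory.getPathInfo(addr, conf);
    if (!info.getPathState().getUsableForDataTransfer()) {
      return null;                      // no usable domain socket path
    }
    return factory.createSocket(info, socketTimeout);
  }
}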

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java Wed Feb 12 19:24:46 2014
@@ -89,42 +89,19 @@ class PeerCache {
     LinkedListMultimap.create();
   private final int capacity;
   private final long expiryPeriod;
-  private static PeerCache instance = null;
   
-  @VisibleForTesting
-  PeerCache(int c, long e) {
+  public PeerCache(int c, long e) {
     this.capacity = c;
     this.expiryPeriod = e;
 
     if (capacity == 0 ) {
       LOG.info("SocketCache disabled.");
-    }
-    else if (expiryPeriod == 0) {
+    } else if (expiryPeriod == 0) {
       throw new IllegalStateException("Cannot initialize expiryPeriod to " +
-         expiryPeriod + "when cache is enabled.");
+         expiryPeriod + " when cache is enabled.");
     }
   }
  
-  public static synchronized PeerCache getInstance(int c, long e) {
-    // capacity is only initialized once
-    if (instance == null) {
-      instance = new PeerCache(c, e);
-    } else { //already initialized once
-      if (instance.capacity != c || instance.expiryPeriod != e) {
-        LOG.info("capacity and expiry periods already set to " +
-          instance.capacity + " and " + instance.expiryPeriod +
-          " respectively. Cannot set it to " + c + " and " + e);
-      }
-    }
-
-    return instance;
-  }
-
-  @VisibleForTesting
-  public static synchronized void setInstance(int c, long e) {
-    instance = new PeerCache(c, e);
-  }
-
   private boolean isDaemonStarted() {
     return (daemon == null)? false: true;
   }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Wed Feb 12 19:24:46 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FSInputCheck
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -492,8 +491,7 @@ public class RemoteBlockReader extends F
   }
 
   @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Wed Feb 12 19:24:46 2014
@@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -457,8 +456,7 @@ public class RemoteBlockReader2  impleme
   }
 
   @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
 }
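
Both remote readers now implement the single-argument getClientMmap() and return null, since memory-mapping only applies to local short-circuit reads. A hedged caller-side sketch; the helper name is made up, and the unref() pairing is inferred from the ClientMmap changes later in this commit.

import java.util.EnumSet;

import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.client.ClientMmap;

class GetClientMmapSketch {
  // Probe whether a reader can serve a zero-copy mmap read.
  static boolean canZeroCopy(BlockReader reader) {
    ClientMmap mmap =
        reader.getClientMmap(EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    if (mmap == null) {
      return false;  // RemoteBlockReader/RemoteBlockReader2 always land here
    }
    mmap.unref();     // assumed: release the reference before discarding
    return true;
  }
}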

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java Wed Feb 12 19:24:46 2014
@@ -17,24 +17,14 @@
  */
 package org.apache.hadoop.hdfs.client;
 
-import java.io.FileInputStream;
-
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.io.nativeio.NativeIO;
 
-import java.io.IOException;
-import java.lang.ref.WeakReference;
 import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel.MapMode;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * A memory-mapped region used by an HDFS client.
  * 
@@ -46,111 +36,46 @@ public class ClientMmap {
   static final Log LOG = LogFactory.getLog(ClientMmap.class);
   
   /**
-   * A reference to the manager of this mmap.
-   * 
-   * This is only a weak reference to help minimize the damange done by
-   * code which leaks references accidentally.
+   * A reference to the block replica which this mmap relates to.
    */
-  private final WeakReference<ClientMmapManager> manager;
+  private final ShortCircuitReplica replica;
   
   /**
-   * The actual mapped memory region.
+   * The java ByteBuffer object.
    */
   private final MappedByteBuffer map;
-  
-  /**
-   * A reference count tracking how many threads are using this object.
-   */
-  private final AtomicInteger refCount = new AtomicInteger(1);
 
   /**
-   * Block pertaining to this mmap
+   * Reference count of this ClientMmap object.
    */
-  private final ExtendedBlock block;
-  
-  /**
-   * The DataNode where this mmap came from.
-   */
-  private final DatanodeID datanodeID;
-
-  /**
-   * The monotonic time when this mmap was last evictable.
-   */
-  private long lastEvictableTimeNs;
-
-  public static ClientMmap load(ClientMmapManager manager, FileInputStream in, 
-      ExtendedBlock block, DatanodeID datanodeID) 
-          throws IOException {
-    MappedByteBuffer map =
-        in.getChannel().map(MapMode.READ_ONLY, 0,
-            in.getChannel().size());
-    return new ClientMmap(manager, map, block, datanodeID);
-  }
+  private final AtomicInteger refCount = new AtomicInteger(1);
 
-  private ClientMmap(ClientMmapManager manager, MappedByteBuffer map, 
-        ExtendedBlock block, DatanodeID datanodeID) 
-            throws IOException {
-    this.manager = new WeakReference<ClientMmapManager>(manager);
+  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map) {
+    this.replica = replica;
     this.map = map;
-    this.block = block;
-    this.datanodeID = datanodeID;
-    this.lastEvictableTimeNs = 0;
   }
 
   /**
-   * Decrement the reference count on this object.
-   * Should be called with the ClientMmapManager lock held.
+   * Increment the reference count.
+   *
+   * @return   The new reference count.
    */
-  public void unref() {
-    int count = refCount.decrementAndGet();
-    if (count < 0) {
-      throw new IllegalArgumentException("can't decrement the " +
-          "reference count on this ClientMmap lower than 0.");
-    } else if (count == 0) {
-      ClientMmapManager man = manager.get();
-      if (man == null) {
-        unmap();
-      } else {
-        man.makeEvictable(this);
-      }
-    }
+  void ref() {
+    refCount.addAndGet(1);
   }
 
   /**
-   * Increment the reference count on this object.
+   * Decrement the reference count.
    *
-   * @return     The new reference count.
+   * The parent replica gets unreferenced each time the reference count 
+   * of this object goes to 0.
    */
-  public int ref() {
-    return refCount.getAndIncrement();
-  }
-
-  @VisibleForTesting
-  public ExtendedBlock getBlock() {
-    return block;
-  }
-
-  DatanodeID getDatanodeID() {
-    return datanodeID;
+  public void unref() {
+    refCount.addAndGet(-1);
+    replica.unref();
   }
 
   public MappedByteBuffer getMappedByteBuffer() {
     return map;
   }
-
-  public void setLastEvictableTimeNs(long lastEvictableTimeNs) {
-    this.lastEvictableTimeNs = lastEvictableTimeNs;
-  }
-
-  public long getLastEvictableTimeNs() {
-    return this.lastEvictableTimeNs;
-  }
-
-  /**
-   * Unmap the memory region.
-   */
-  void unmap() {
-    assert(refCount.get() == 0);
-    NativeIO.POSIX.munmap(map);
-  }
-}
+}
\ No newline at end of file
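
ClientMmap now delegates its lifetime to the owning ShortCircuitReplica: the reference count starts at 1, and unref() both drops that count and unreferences the parent replica, which is now responsible for the actual munmap. A small sketch of the intended pairing; where the ClientMmap instance comes from (the short-circuit read path) is assumed.

import java.nio.MappedByteBuffer;

import org.apache.hadoop.hdfs.client.ClientMmap;

class ClientMmapSketch {
  // Read one byte from the mapped region, then release our reference.
  static byte readFirstByte(ClientMmap mmap) {
    try {
      MappedByteBuffer buf = mmap.getMappedByteBuffer();
      return buf.get(0);
    } finally {
      mmap.unref();   // also unreferences the parent ShortCircuitReplica
    }
  }
}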

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Feb 12 19:24:46 2014
@@ -27,8 +27,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -225,44 +228,67 @@ public class JspHelper {
   public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
-      JspWriter out, Configuration conf, DFSClient.Conf dfsConf,
-      DataEncryptionKey encryptionKey)
+      JspWriter out, final Configuration conf, DFSClient.Conf dfsConf,
+      final DataEncryptionKey encryptionKey)
           throws IOException {
     if (chunkSizeToView == 0) return;
-    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-    s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
-    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-      
     int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
       
-      // Use the block name for file name. 
-    String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
-    BlockReader blockReader = BlockReaderFactory.newBlockReader(dfsConf, file,
-        new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead,  true,
-        "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
-        new DatanodeID(addr.getAddress().getHostAddress(),
-            addr.getHostName(), poolId, addr.getPort(), 0, 0, 0), null,
-            null, null, false, CachingStrategy.newDefaultStrategy());
-        
+    BlockReader blockReader = new BlockReaderFactory(dfsConf).
+      setInetSocketAddress(addr).
+      setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
+      setFileName(BlockReaderFactory.getFileName(addr, poolId, blockId)).
+      setBlockToken(blockToken).
+      setStartOffset(offsetIntoBlock).
+      setLength(amtToRead).
+      setVerifyChecksum(true).
+      setClientName("JspHelper").
+      setClientCacheContext(ClientContext.getFromConf(conf)).
+      setDatanodeInfo(new DatanodeInfo(
+          new DatanodeID(addr.getAddress().getHostAddress(),
+              addr.getHostName(), poolId, addr.getPort(), 0, 0, 0))).
+      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+      setConfiguration(conf).
+      setRemotePeerFactory(new RemotePeerFactory() {
+        @Override
+        public Peer newConnectedPeer(InetSocketAddress addr)
+            throws IOException {
+          Peer peer = null;
+          Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+          try {
+            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+            peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey);
+          } finally {
+            if (peer == null) {
+              IOUtils.closeSocket(sock);
+            }
+          }
+          return peer;
+        }
+      }).
+      build();
+
     final byte[] buf = new byte[amtToRead];
-    int readOffset = 0;
-    int retries = 2;
-    while ( amtToRead > 0 ) {
-      int numRead = amtToRead;
-      try {
-        blockReader.readFully(buf, readOffset, amtToRead);
-      }
-      catch (IOException e) {
-        retries--;
-        if (retries == 0)
-          throw new IOException("Could not read data from datanode");
-        continue;
+    try {
+      int readOffset = 0;
+      int retries = 2;
+      while (amtToRead > 0) {
+        int numRead = amtToRead;
+        try {
+          blockReader.readFully(buf, readOffset, amtToRead);
+        } catch (IOException e) {
+          retries--;
+          if (retries == 0)
+            throw new IOException("Could not read data from datanode");
+          continue;
+        }
+        amtToRead -= numRead;
+        readOffset += numRead;
       }
-      amtToRead -= numRead;
-      readOffset += numRead;
+    } finally {
+      blockReader.close();
     }
-    blockReader.close();
     out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8)));
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Wed Feb 12 19:24:46 2014
@@ -34,6 +34,8 @@ import org.apache.hadoop.util.DataChecks
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 
 /**
@@ -55,7 +57,8 @@ public class BlockMetadataHeader {
   private short version;
   private DataChecksum checksum = null;
     
-  BlockMetadataHeader(short version, DataChecksum checksum) {
+  @VisibleForTesting
+  public BlockMetadataHeader(short version, DataChecksum checksum) {
     this.checksum = checksum;
     this.version = version;
   }
@@ -148,7 +151,8 @@ public class BlockMetadataHeader {
    * @return 
    * @throws IOException
    */
-  private static void writeHeader(DataOutputStream out, 
+  @VisibleForTesting
+  public static void writeHeader(DataOutputStream out, 
                                   BlockMetadataHeader header) 
                                   throws IOException {
     out.writeShort(header.getVersion());
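
Making the constructor and writeHeader() public (as @VisibleForTesting) lets tests serialize a block metadata header directly. A sketch of that usage; the CRC32C/512-byte checksum and the version value (short) 1 are illustrative assumptions.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.util.DataChecksum;

class BlockMetadataHeaderSketch {
  // Serialize a header to a byte array the way a test might.
  static byte[] serializeHeader() throws IOException {
    DataChecksum checksum =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    BlockMetadataHeader header = new BlockMetadataHeader((short) 1, checksum);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BlockMetadataHeader.writeHeader(new DataOutputStream(baos), header);
    return baos.toByteArray();
  }
}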

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Feb 12 19:24:46 2014
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,9 +41,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -569,11 +573,10 @@ public class NamenodeFsck {
     int failures = 0;
     InetSocketAddress targetAddr = null;
     TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
-    Socket s = null;
     BlockReader blockReader = null; 
     ExtendedBlock block = lblock.getBlock(); 
 
-    while (s == null) {
+    while (blockReader == null) {
       DatanodeInfo chosenNode;
       
       try {
@@ -593,34 +596,47 @@ public class NamenodeFsck {
         continue;
       }
       try {
-        s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-        s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-        s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-        
-        String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
-            block.getBlockId());
-        blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
-            file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
-            TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-                getDataEncryptionKey()), chosenNode, null, null, null, 
-                false, CachingStrategy.newDropBehind());
-        
+        String file = BlockReaderFactory.getFileName(targetAddr,
+            block.getBlockPoolId(), block.getBlockId());
+        blockReader = new BlockReaderFactory(dfs.getConf()).
+            setFileName(file).
+            setBlock(block).
+            setBlockToken(lblock.getBlockToken()).
+            setStartOffset(0).
+            setLength(-1).
+            setVerifyChecksum(true).
+            setClientName("fsck").
+            setDatanodeInfo(chosenNode).
+            setInetSocketAddress(targetAddr).
+            setCachingStrategy(CachingStrategy.newDropBehind()).
+            setClientCacheContext(dfs.getClientContext()).
+            setConfiguration(namenode.conf).
+            setRemotePeerFactory(new RemotePeerFactory() {
+              @Override
+              public Peer newConnectedPeer(InetSocketAddress addr)
+                  throws IOException {
+                Peer peer = null;
+                Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
+                try {
+                  s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+                  s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+                  peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
+                        getDataEncryptionKey());
+                } finally {
+                  if (peer == null) {
+                    IOUtils.closeQuietly(s);
+                  }
+                }
+                return peer;
+              }
+            }).
+            build();
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue
         LOG.info("Failed to connect to " + targetAddr + ":" + ex);
         deadNodes.add(chosenNode);
-        if (s != null) {
-          try {
-            s.close();
-          } catch (IOException iex) {
-          }
-        }
-        s = null;
       }
     }
-    if (blockReader == null) {
-      throw new Exception("Could not open data stream for " + lblock.getBlock());
-    }
     byte[] buf = new byte[1024];
     int cnt = 0;
     boolean success = true;
@@ -638,10 +654,11 @@ public class NamenodeFsck {
       LOG.error("Error reading block", e);
       success = false;
     } finally {
-      try {s.close(); } catch (Exception e1) {}
+      blockReader.close();
     }
-    if (!success)
+    if (!success) {
       throw new Exception("Could not copy block data for " + lblock.getBlock());
+    }
   }
       
   /*

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Wed Feb 12 19:24:46 2014
@@ -162,6 +162,16 @@
 </property>
 
 <property>
+  <name>dfs.client.cached.conn.retry</name>
+  <value>3</value>
+  <description>The number of times the HDFS client will pull a socket from the
+   cache.  Once this number is exceeded, the client will try to create a new
+   socket.
+  </description>
+</property>
+
+
+<property>
   <name>dfs.https.server.keystore.resource</name>
   <value>ssl-server.xml</value>
   <description>Resource file from which ssl server keystore
@@ -1490,6 +1500,26 @@
 </property>
 
 <property>
+  <name>dfs.client.mmap.retry.timeout.ms</name>
+  <value>300000</value>
+  <description>
+    The minimum amount of time that we will wait before retrying a failed mmap
+    operation.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.short.circuit.replica.stale.threshold.ms</name>
+  <value>3000000</value>
+  <description>
+    The maximum amount of time that we will consider a short-circuit replica to
+    be valid, if there is no communication from the DataNode.  After this time
+    has elapsed, we will re-fetch the short-circuit replica even if it is in
+    the cache.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.path.based.cache.block.map.allocation.percent</name>
   <value>0.25</value>
   <description>
@@ -1618,4 +1648,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.context</name>
+  <value>default</value>
+  <description>
+    The name of the DFSClient context that we should use.  Clients that share
+    a context share a socket cache and short-circuit cache, among other things.
+    You should only change this if you don't want to share with another set of
+    threads.
+  </description>
+</property>
+
 </configuration>
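
The new client-side keys above can also be overridden programmatically. A sketch using the standard Configuration API; the key names come from the entries above, the numeric values are just the shipped defaults, and the context name is made up.

import org.apache.hadoop.conf.Configuration;

class NewClientKeysSketch {
  // Override the keys added in this commit from code rather than
  // hdfs-site.xml.
  static Configuration withNewClientKeys() {
    Configuration conf = new Configuration();
    conf.setInt("dfs.client.cached.conn.retry", 3);
    conf.setLong("dfs.client.mmap.retry.timeout.ms", 300000L);
    conf.setLong("dfs.client.short.circuit.replica.stale.threshold.ms",
        3000000L);
    conf.set("dfs.client.context", "my-isolated-context");
    return conf;
  }
}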

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Wed Feb 12 19:24:46 2014
@@ -28,32 +28,40 @@ import java.util.EnumSet;
 import java.util.Random;
 
 import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Map;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 
@@ -250,17 +258,39 @@ public class TestEnhancedByteBufferAcces
     }
   }
 
-  private static class CountingVisitor
-      implements ClientMmapManager.ClientMmapVisitor {
-    int count = 0;
-
-    @Override
-    public void accept(ClientMmap mmap) {
-      count++;
+  private static class CountingVisitor implements CacheVisitor {
+    private final int expectedNumOutstandingMmaps;
+    private final int expectedNumReplicas;
+    private final int expectedNumEvictable;
+    private final int expectedNumMmapedEvictable;
+
+    CountingVisitor(int expectedNumOutstandingMmaps,
+        int expectedNumReplicas, int expectedNumEvictable,
+        int expectedNumMmapedEvictable) {
+      this.expectedNumOutstandingMmaps = expectedNumOutstandingMmaps;
+      this.expectedNumReplicas = expectedNumReplicas;
+      this.expectedNumEvictable = expectedNumEvictable;
+      this.expectedNumMmapedEvictable = expectedNumMmapedEvictable;
     }
 
-    public void reset() {
-      count = 0;
+    @Override
+    public void visit(int numOutstandingMmaps,
+        Map<Key, ShortCircuitReplica> replicas,
+        Map<Key, InvalidToken> failedLoads,
+        Map<Long, ShortCircuitReplica> evictable,
+        Map<Long, ShortCircuitReplica> evictableMmapped) {
+      if (expectedNumOutstandingMmaps >= 0) {
+        Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
+      }
+      if (expectedNumReplicas >= 0) {
+        Assert.assertEquals(expectedNumReplicas, replicas.size());
+      }
+      if (expectedNumEvictable >= 0) {
+        Assert.assertEquals(expectedNumEvictable, evictable.size());
+      }
+      if (expectedNumMmapedEvictable >= 0) {
+        Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
+      }
     }
   }
 
@@ -271,105 +301,98 @@ public class TestEnhancedByteBufferAcces
     final Path TEST_PATH = new Path("/a");
     final int TEST_FILE_LENGTH = 16385;
     final int RANDOM_SEED = 23453;
+    final String CONTEXT = "testZeroCopyMmapCacheContext";
     FSDataInputStream fsIn = null;
-    ByteBuffer results[] = { null, null, null, null, null };
-    
+    ByteBuffer results[] = { null, null, null, null };
+
     DistributedFileSystem fs = null;
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, TEST_PATH,
+        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
     try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-      cluster.waitActive();
-      fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, TEST_PATH,
-          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
-      try {
-        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
-      } catch (InterruptedException e) {
-        Assert.fail("unexpected InterruptedException during " +
-            "waitReplication: " + e);
-      } catch (TimeoutException e) {
-        Assert.fail("unexpected TimeoutException during " +
-            "waitReplication: " + e);
+      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+    } catch (InterruptedException e) {
+      Assert.fail("unexpected InterruptedException during " +
+          "waitReplication: " + e);
+    } catch (TimeoutException e) {
+      Assert.fail("unexpected TimeoutException during " +
+          "waitReplication: " + e);
+    }
+    fsIn = fs.open(TEST_PATH);
+    byte original[] = new byte[TEST_FILE_LENGTH];
+    IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+    fsIn.close();
+    fsIn = fs.open(TEST_PATH);
+    final ShortCircuitCache cache = ClientContext.get(
+        CONTEXT, new DFSClient.Conf(conf)). getShortCircuitCache();
+    cache.accept(new CountingVisitor(0, 5, 5, 0));
+    results[0] = fsIn.read(null, 4096,
+        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+    fsIn.seek(0);
+    results[1] = fsIn.read(null, 4096,
+        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+
+    // The mmap should be of the first block of the file.
+    final ExtendedBlock firstBlock =
+        DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+    cache.accept(new CacheVisitor() {
+      @Override
+      public void visit(int numOutstandingMmaps,
+          Map<Key, ShortCircuitReplica> replicas,
+          Map<Key, InvalidToken> failedLoads, 
+          Map<Long, ShortCircuitReplica> evictable,
+          Map<Long, ShortCircuitReplica> evictableMmapped) {
+        ShortCircuitReplica replica = replicas.get(
+            new Key(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
+        Assert.assertNotNull(replica);
+        Assert.assertTrue(replica.hasMmap());
+        // The replica should not yet be evictable, since we have it open.
+        Assert.assertNull(replica.getEvictableTimeNs());
       }
-      fsIn = fs.open(TEST_PATH);
-      byte original[] = new byte[TEST_FILE_LENGTH];
-      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
-      fsIn.close();
-      fsIn = fs.open(TEST_PATH);
-      final ClientMmapManager mmapManager = fs.getClient().getMmapManager();
-      final CountingVisitor countingVisitor = new CountingVisitor();
-      mmapManager.visitMmaps(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-      mmapManager.visitEvictable(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-      results[0] = fsIn.read(null, 4096,
-          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-      fsIn.seek(0);
-      results[1] = fsIn.read(null, 4096,
-          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-      mmapManager.visitMmaps(countingVisitor);
-      Assert.assertEquals(1, countingVisitor.count);
-      countingVisitor.reset();
-      mmapManager.visitEvictable(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-      countingVisitor.reset();
-
-      // The mmaps should be of the first block of the file.
-      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
-      mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
-        @Override
-        public void accept(ClientMmap mmap) {
-          Assert.assertEquals(firstBlock, mmap.getBlock());
-        }
-      });
+    });
 
-      // Read more blocks.
-      results[2] = fsIn.read(null, 4096,
-          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-      results[3] = fsIn.read(null, 4096,
-          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-      try {
-        results[4] = fsIn.read(null, 4096,
-            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-        Assert.fail("expected UnsupportedOperationException");
-      } catch (UnsupportedOperationException e) {
-        // expected
+    // Read more blocks.
+    results[2] = fsIn.read(null, 4096,
+        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+    results[3] = fsIn.read(null, 4096,
+        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+
+    // we should have 3 mmaps, 1 evictable
+    cache.accept(new CountingVisitor(3, 5, 2, 0));
+
+    // After we close the cursors, the mmaps should be evictable for 
+    // a brief period of time.  Then, they should be closed (we're 
+    // using a very quick timeout)
+    for (ByteBuffer buffer : results) {
+      if (buffer != null) {
+        fsIn.releaseBuffer(buffer);
       }
-
-      // we should have 3 mmaps, 0 evictable
-      mmapManager.visitMmaps(countingVisitor);
-      Assert.assertEquals(3, countingVisitor.count);
-      countingVisitor.reset();
-      mmapManager.visitEvictable(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-
-      // After we close the cursors, the mmaps should be evictable for 
-      // a brief period of time.  Then, they should be closed (we're 
-      // using a very quick timeout)
-      for (ByteBuffer buffer : results) {
-        if (buffer != null) {
-          fsIn.releaseBuffer(buffer);
-        }
-      }
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        public Boolean get() {
-          countingVisitor.reset();
-          try {
-            mmapManager.visitEvictable(countingVisitor);
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-            return false;
-          }
-          return (0 == countingVisitor.count);
-        }
-      }, 10, 10000);
-      countingVisitor.reset();
-      mmapManager.visitMmaps(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-    } finally {
-      if (fsIn != null) fsIn.close();
-      if (fs != null) fs.close();
-      if (cluster != null) cluster.shutdown();
     }
+    fsIn.close();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        final MutableBoolean finished = new MutableBoolean(false);
+        cache.accept(new CacheVisitor() {
+          @Override
+          public void visit(int numOutstandingMmaps,
+              Map<Key, ShortCircuitReplica> replicas,
+              Map<Key, InvalidToken> failedLoads,
+              Map<Long, ShortCircuitReplica> evictable,
+              Map<Long, ShortCircuitReplica> evictableMmapped) {
+            finished.setValue(evictableMmapped.isEmpty());
+          }
+        });
+        return finished.booleanValue();
+      }
+    }, 10, 60000);
+
+    cache.accept(new CountingVisitor(0, -1, -1, -1));
+    
+    fs.close();
+    cluster.shutdown();
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Wed Feb 12 19:24:46 2014
@@ -28,8 +28,12 @@ import java.net.Socket;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -38,6 +42,8 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
 
 /**
  * A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
@@ -141,22 +147,54 @@ public class BlockReaderTestUtil {
    */
   public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
       throws IOException {
+    return getBlockReader(cluster, testBlock, offset, lenToRead);
+  }
+
+  /**
+   * Get a BlockReader for the given block.
+   */
+  public static BlockReader getBlockReader(MiniDFSCluster cluster,
+      LocatedBlock testBlock, int offset, int lenToRead) throws IOException {
     InetSocketAddress targetAddr = null;
-    Socket sock = null;
     ExtendedBlock block = testBlock.getBlock();
     DatanodeInfo[] nodes = testBlock.getLocations();
     targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
-    sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
-    sock.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-    sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-
-    return BlockReaderFactory.newBlockReader(
-      new DFSClient.Conf(conf),
-      targetAddr.toString()+ ":" + block.getBlockId(), block,
-      testBlock.getBlockToken(), 
-      offset, lenToRead,
-      true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
-      nodes[0], null, null, null, false, CachingStrategy.newDefaultStrategy());
+
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    return new BlockReaderFactory(fs.getClient().getConf()).
+      setInetSocketAddress(targetAddr).
+      setBlock(block).
+      setFileName(targetAddr.toString()+ ":" + block.getBlockId()).
+      setBlockToken(testBlock.getBlockToken()).
+      setStartOffset(offset).
+      setLength(lenToRead).
+      setVerifyChecksum(true).
+      setClientName("BlockReaderTestUtil").
+      setDatanodeInfo(nodes[0]).
+      setClientCacheContext(ClientContext.getFromConf(fs.getConf())).
+      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+      setConfiguration(fs.getConf()).
+      setAllowShortCircuitLocalReads(true).
+      setRemotePeerFactory(new RemotePeerFactory() {
+        @Override
+        public Peer newConnectedPeer(InetSocketAddress addr)
+            throws IOException {
+          Peer peer = null;
+          Socket sock = NetUtils.
+              getDefaultSocketFactory(fs.getConf()).createSocket();
+          try {
+            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+            peer = TcpPeerServer.peerFromSocket(sock);
+          } finally {
+            if (peer == null) {
+              IOUtils.closeQuietly(sock);
+            }
+          }
+          return peer;
+        }
+      }).
+      build();
   }
 
   /**
@@ -167,4 +205,13 @@ public class BlockReaderTestUtil {
     int ipcport = nodes[0].getIpcPort();
     return cluster.getDataNode(ipcport);
   }
-}
+  
+  public static void enableBlockReaderFactoryTracing() {
+    LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitCache.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel(
+        Level.TRACE);
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Feb 12 19:24:46 2014
@@ -185,10 +185,26 @@ public class DFSTestUtil {
     }
   }
 
-  public static String readFile(FileSystem fs, Path fileName) throws IOException {
+  public static String readFile(FileSystem fs, Path fileName)
+      throws IOException {
+    byte buf[] = readFileBuffer(fs, fileName);
+	return new String(buf, 0, buf.length);
+  }
+
+  public static byte[] readFileBuffer(FileSystem fs, Path fileName) 
+      throws IOException {
     ByteArrayOutputStream os = new ByteArrayOutputStream();
-    IOUtils.copyBytes(fs.open(fileName), os, 1024, true);
-    return os.toString();
+    try {
+      FSDataInputStream in = fs.open(fileName);
+      try {
+        IOUtils.copyBytes(fs.open(fileName), os, 1024, true);
+        return os.toByteArray();
+      } finally {
+        in.close();
+      }
+    } finally {
+      os.close();
+    }
   }
   
   public static void createFile(FileSystem fs, Path fileName, long fileLen, 
@@ -250,6 +266,13 @@ public class DFSTestUtil {
     }
   }
   
+  public static byte[] calculateFileContentsFromSeed(long seed, int length) {
+    Random rb = new Random(seed);
+    byte val[] = new byte[length];
+    rb.nextBytes(val);
+    return val;
+  }
+  
   /** check if the files have been copied correctly. */
   public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
     Path root = new Path(topdir);
@@ -569,8 +592,12 @@ public class DFSTestUtil {
   
   public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
     HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
-    in.readByte();
-    return in.getCurrentBlock();
+    try {
+      in.readByte();
+      return in.getCurrentBlock();
+    } finally {
+      in.close();
+    }
   }  
 
   public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)
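
The two new helpers pair naturally: calculateFileContentsFromSeed() recomputes the bytes that createFile() wrote for a given seed, and readFileBuffer() reads the file back. A sketch of that pairing in a test; the seed and length are assumed to match an earlier DFSTestUtil.createFile() call.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.junit.Assert;

class DFSTestUtilSketch {
  // Verify that a file created with DFSTestUtil.createFile(..., seed) still
  // holds the expected pseudo-random contents.
  static void verifyFileContents(FileSystem fs, Path path) throws Exception {
    final long seed = 23453;    // assumed: same seed passed to createFile()
    final int length = 16385;   // assumed: same length passed to createFile()
    byte[] expected = DFSTestUtil.calculateFileContentsFromSeed(seed, length);
    byte[] actual = DFSTestUtil.readFileBuffer(fs, path);
    Assert.assertArrayEquals(expected, actual);
  }
}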

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Wed Feb 12 19:24:46 2014
@@ -30,13 +30,17 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -155,6 +159,8 @@ public class TestBlockReaderLocal {
       File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
 
       DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
+      ShortCircuitCache shortCircuitCache =
+          ClientContext.getFromConf(conf).getShortCircuitCache();
       cluster.shutdown();
       cluster = null;
       test.setup(dataFile, checksum);
@@ -164,16 +170,17 @@ public class TestBlockReaderLocal {
       };
       dataIn = streams[0];
       metaIn = streams[1];
+      Key key = new Key(block.getBlockId(), block.getBlockPoolId());
+      ShortCircuitReplica replica = new ShortCircuitReplica(
+          key, dataIn, metaIn, shortCircuitCache, Time.now());
       blockReaderLocal = new BlockReaderLocal.Builder(
               new DFSClient.Conf(conf)).
           setFilename(TEST_PATH.getName()).
           setBlock(block).
-          setStreams(streams).
+          setShortCircuitReplica(replica).
           setDatanodeID(datanodeID).
           setCachingStrategy(new CachingStrategy(false, readahead)).
           setVerifyChecksum(checksum).
-          setBlockMetadataHeader(BlockMetadataHeader.preadHeader(
-              metaIn.getChannel())).
           build();
       dataIn = null;
       metaIn = null;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Feb 12 19:24:46 2014
@@ -25,18 +25,8 @@ import java.net.InetSocketAddress;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 /**
  * This class tests the client connection caching in a single node
@@ -49,30 +39,6 @@ public class TestConnCache {
   static final int FILE_SIZE = 3 * BLOCK_SIZE;
 
   /**
-   * A mock Answer to remember the BlockReader used.
-   *
-   * It verifies that all invocation to DFSInputStream.getBlockReader()
-   * use the same peer.
-   */
-  private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
-    public RemoteBlockReader2 reader = null;
-    private Peer peer = null;
-
-    @Override
-    public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
-      RemoteBlockReader2 prevReader = reader;
-      reader = (RemoteBlockReader2) invocation.callRealMethod();
-      if (peer == null) {
-        peer = reader.getPeer();
-      } else if (prevReader != null) {
-        Assert.assertSame("DFSInputStream should use the same peer",
-                   peer, reader.getPeer());
-      }
-      return reader;
-    }
-  }
-
-  /**
    * (Optionally) seek to position, read and verify data.
    *
    * Seek to specified position if pos is non-negative.
@@ -115,33 +81,29 @@ public class TestConnCache {
    * @throws Exception 
    */
   @Test
-  @SuppressWarnings("unchecked")
   public void testReadFromOneDN() throws Exception {
-    BlockReaderTestUtil util = new BlockReaderTestUtil(1,
-        new HdfsConfiguration());
+    HdfsConfiguration configuration = new HdfsConfiguration();
+    // One of the goals of this test is to verify that we don't open more
+    // than one socket.  So use a different client context, so that we
+    // get our own socket cache, rather than sharing with the other test 
+    // instances.  Also use a really long socket timeout so that nothing
+    // gets closed before we get around to checking the cache size at the end.
+    final String contextName = "testReadFromOneDNContext";
+    configuration.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, contextName);
+    configuration.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+        100000000L);
+    BlockReaderTestUtil util = new BlockReaderTestUtil(1, configuration);
     final Path testFile = new Path("/testConnCache.dat");
     byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024);
     DFSClient client = new DFSClient(
         new InetSocketAddress("localhost",
             util.getCluster().getNameNodePort()), util.getConf());
-    DFSInputStream in = Mockito.spy(client.open(testFile.toString()));
+    ClientContext cacheContext =
+        ClientContext.get(contextName, client.getConf());
+    DFSInputStream in = client.open(testFile.toString());
     LOG.info("opened " + testFile.toString());
     byte[] dataBuf = new byte[BLOCK_SIZE];
 
-    MockGetBlockReader answer = new MockGetBlockReader();
-    Mockito.doAnswer(answer).when(in).getBlockReader(
-                           (InetSocketAddress) Matchers.anyObject(),
-                           (DatanodeInfo) Matchers.anyObject(),
-                           Matchers.anyString(),
-                           (ExtendedBlock) Matchers.anyObject(),
-                           (Token<BlockTokenIdentifier>) Matchers.anyObject(),
-                           Matchers.anyLong(),
-                           Matchers.anyLong(),
-                           Matchers.anyInt(),
-                           Matchers.anyBoolean(),
-                           Matchers.anyString(),
-                           (CachingStrategy)Matchers.anyObject());
-
     // Initial read
     pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
     // Read again and verify that the socket is the same
@@ -153,5 +115,8 @@ public class TestConnCache {
     pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData);
 
     in.close();
+    client.close();
+    Assert.assertEquals(1,
+        ClientContext.getFromConf(configuration).getPeerCache().size());
   }
 }
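
The comment in the hunk above states the intent: a dedicated client context keeps this test's socket cache separate from every other test in the JVM. Below is a minimal sketch of that isolation pattern, assuming only the calls visible in the diff (DFSConfigKeys.DFS_CLIENT_CONTEXT, DFS_CLIENT_SOCKET_TIMEOUT_KEY, ClientContext.getFromConf(), PeerCache.size()); the class name and its placement in the org.apache.hadoop.hdfs package are illustrative, not part of the commit.

    package org.apache.hadoop.hdfs;  // same package as the tests, so the
                                     // package-level accessors stay visible

    import org.apache.hadoop.conf.Configuration;

    public class IsolatedPeerCacheSketch {
      public static void sketch() {
        Configuration conf = new HdfsConfiguration();
        // Every client built from a conf carrying this context name shares
        // one ClientContext (and therefore one PeerCache); any other name
        // gets its own cache, so tests cannot pollute each other's counts.
        conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, "isolatedPeerCacheSketch");
        // Keep cached sockets alive long enough to inspect the cache.
        conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 100000000L);

        PeerCache peerCache = ClientContext.getFromConf(conf).getPeerCache();
        System.out.println("cached peers: " + peerCache.size());
      }
    }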

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Wed Feb 12 19:24:46 2014
@@ -22,7 +22,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -86,21 +86,22 @@ public class TestDataTransferKeepalive {
     // the datanode-side expiration time.
     final long CLIENT_EXPIRY_MS = 60000L;
     clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
+    clientConf.set(DFS_CLIENT_CONTEXT, "testDatanodeRespectsKeepAliveTimeout");
     DistributedFileSystem fs =
         (DistributedFileSystem)FileSystem.get(cluster.getURI(),
             clientConf);
+    PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache();
 
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Clients that write aren't currently re-used.
-    assertEquals(0, fs.dfs.peerCache.size());
+    assertEquals(0, peerCache.size());
     assertXceiverCount(0);
 
     // Reads the file, so we should get a
     // cached socket, and should have an xceiver on the other side.
     DFSTestUtil.readFile(fs, TEST_FILE);
-    assertEquals(1, fs.dfs.peerCache.size());
+    assertEquals(1, peerCache.size());
     assertXceiverCount(1);
 
     // Sleep for a bit longer than the keepalive timeout
@@ -111,15 +112,13 @@ public class TestDataTransferKeepalive {
     // The socket is still in the cache, because we don't
     // notice that it's closed until we try to read
     // from it again.
-    assertEquals(1, fs.dfs.peerCache.size());
+    assertEquals(1, peerCache.size());
     
     // Take it out of the cache - reading should
     // give an EOF.
-    Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false);
+    Peer peer = peerCache.get(dn.getDatanodeId(), false);
     assertNotNull(peer);
     assertEquals(-1, peer.getInputStream().read());
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT,
-        DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
   }
 
   /**
@@ -132,34 +131,33 @@ public class TestDataTransferKeepalive {
     // the datanode-side expiration time.
     final long CLIENT_EXPIRY_MS = 10L;
     clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
+    clientConf.set(DFS_CLIENT_CONTEXT, "testClientResponsesKeepAliveTimeout");
     DistributedFileSystem fs =
         (DistributedFileSystem)FileSystem.get(cluster.getURI(),
             clientConf);
+    PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache();
 
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Clients that write aren't currently re-used.
-    assertEquals(0, fs.dfs.peerCache.size());
+    assertEquals(0, peerCache.size());
     assertXceiverCount(0);
 
     // Reads the file, so we should get a
     // cached socket, and should have an xceiver on the other side.
     DFSTestUtil.readFile(fs, TEST_FILE);
-    assertEquals(1, fs.dfs.peerCache.size());
+    assertEquals(1, peerCache.size());
     assertXceiverCount(1);
 
     // Sleep for a bit longer than the client keepalive timeout.
     Thread.sleep(CLIENT_EXPIRY_MS + 1);
     
     // Taking out a peer which is expired should give a null.
-    Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false);
+    Peer peer = peerCache.get(dn.getDatanodeId(), false);
     assertTrue(peer == null);
 
     // The socket cache is now empty.
-    assertEquals(0, fs.dfs.peerCache.size());
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT,
-        DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
+    assertEquals(0, peerCache.size());
   }
 
   /**
@@ -174,7 +172,7 @@ public class TestDataTransferKeepalive {
     final long CLIENT_EXPIRY_MS = 600000L;
     Configuration clientConf = new Configuration(conf);
     clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
+    clientConf.set(DFS_CLIENT_CONTEXT, "testSlowReader");
     DistributedFileSystem fs =
         (DistributedFileSystem)FileSystem.get(cluster.getURI(),
             clientConf);
@@ -209,7 +207,12 @@ public class TestDataTransferKeepalive {
   @Test(timeout=30000)
   public void testManyClosedSocketsInCache() throws Exception {
     // Make a small file
-    DistributedFileSystem fs = cluster.getFileSystem();
+    Configuration clientConf = new Configuration(conf);
+    clientConf.set(DFS_CLIENT_CONTEXT, "testManyClosedSocketsInCache");
+    DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(),
+            clientConf);
+    PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache();
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Insert a bunch of dead sockets in the cache, by opening
@@ -227,15 +230,14 @@ public class TestDataTransferKeepalive {
       IOUtils.cleanup(null, stms);
     }
     
-    DFSClient client = ((DistributedFileSystem)fs).dfs;
-    assertEquals(5, client.peerCache.size());
+    assertEquals(5, peerCache.size());
     
     // Let all the xceivers timeout
     Thread.sleep(1500);
     assertXceiverCount(0);
 
     // Client side still has the sockets cached
-    assertEquals(5, client.peerCache.size());
+    assertEquals(5, peerCache.size());
 
     // Reading should not throw an exception.
     DFSTestUtil.readFile(fs, TEST_FILE);
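
Note that the static PeerCache.setInstance(...) calls disappear in these hunks: cache expiry is now read from the Configuration that names the ClientContext, so each test gets its own PeerCache and no longer has to restore a JVM-wide singleton afterwards. A short fragment restating that pattern, assuming the same static imports as TestDataTransferKeepalive above (the context name and expiry value are illustrative):

      Configuration clientConf = new Configuration(conf);
      clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 60000L);
      clientConf.set(DFS_CLIENT_CONTEXT, "keepaliveSketchContext");
      // The cache lives in the per-name ClientContext rather than a global
      // instance, so nothing needs to be reset when the test finishes.
      PeerCache peerCache =
          ClientContext.getFromConf(clientConf).getPeerCache();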

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java Wed Feb 12 19:24:46 2014
@@ -53,7 +53,8 @@ public class TestDisableConnCache {
     FileSystem fsWithoutCache = FileSystem.newInstance(util.getConf());
     try {
       DFSTestUtil.readFile(fsWithoutCache, testFile);
-      assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.peerCache.size());
+      assertEquals(0, ((DistributedFileSystem)fsWithoutCache).
+          dfs.getClientContext().getPeerCache().size());
     } finally {
       fsWithoutCache.close();
       util.shutdown();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Wed Feb 12 19:24:46 2014
@@ -27,6 +27,7 @@ import java.io.RandomAccessFile;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
+import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -35,8 +36,9 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -126,8 +128,9 @@ public class TestShortCircuitLocalRead {
       throws IOException, InterruptedException {
     // Ensure short circuit is enabled
     DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
+    ClientContext getClientContext = ClientContext.getFromConf(conf);
     if (legacyShortCircuitFails) {
-      assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+      assertFalse(getClientContext.getDisableLegacyBlockReaderLocal());
     }
     
     FSDataInputStream stm = fs.open(name);
@@ -156,7 +159,7 @@ public class TestShortCircuitLocalRead {
     checkData(actual, readOffset, expected, "Read 3");
     
     if (legacyShortCircuitFails) {
-      assertFalse(fs.getClient().useLegacyBlockReaderLocal());
+      assertTrue(getClientContext.getDisableLegacyBlockReaderLocal());
     }
     stm.close();
   }
@@ -176,8 +179,9 @@ public class TestShortCircuitLocalRead {
       throws IOException, InterruptedException {
     // Ensure short circuit is enabled
     DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
+    ClientContext clientContext = ClientContext.getFromConf(conf);
     if (legacyShortCircuitFails) {
-      assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+      assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
     }
     
     HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
@@ -210,7 +214,7 @@ public class TestShortCircuitLocalRead {
     }
     checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
     if (legacyShortCircuitFails) {
-      assertFalse(fs.getClient().useLegacyBlockReaderLocal());
+      assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
     }
     stm.close();
   }
@@ -224,7 +228,6 @@ public class TestShortCircuitLocalRead {
 
   public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
       int readOffset) throws IOException, InterruptedException {
-    String shortCircuitUser = getCurrentUser();
     doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
         null, getCurrentUser(), false);
   }
@@ -240,6 +243,10 @@ public class TestShortCircuitLocalRead {
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         ignoreChecksum);
+    // Set a random client context name so that we don't share a cache with
+    // other invocations of this function.
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+        UUID.randomUUID().toString());
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
         new File(sockDir.getDir(),
           "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
@@ -323,18 +330,6 @@ public class TestShortCircuitLocalRead {
     doTestShortCircuitRead(true, 10*blockSize+100, 777);
   }
 
-  private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
-      final DatanodeID dnInfo, final Configuration conf) throws IOException,
-      InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-      @Override
-      public ClientDatanodeProtocol run() throws Exception {
-        return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000,
-            false);
-      }
-    });
-  }
-  
   private static DistributedFileSystem getFileSystem(String user, final URI uri,
       final Configuration conf) throws InterruptedException, IOException {
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
@@ -556,8 +551,7 @@ public class TestShortCircuitLocalRead {
           for (int i = 0; i < iteration; i++) {
             try {
               String user = getCurrentUser();
-              checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf,
-                  true);
+              checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf, true);
             } catch (IOException e) {
               e.printStackTrace();
             } catch (InterruptedException e) {
@@ -609,7 +603,8 @@ public class TestShortCircuitLocalRead {
     stm.write(fileData);
     stm.close();
     try {
-      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, conf, shortCircuitFails);
+      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, 
+          conf, shortCircuitFails);
       //RemoteBlockReader have unsupported method read(ByteBuffer bf)
       assertTrue("RemoteBlockReader unsupported method read(ByteBuffer bf) error",
                     checkUnsupportedMethod(fs, file1, fileData, readOffset));
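
The random context name added above serves the same purpose for repeated invocations of one helper: each call gets a fresh ClientContext, and with it fresh short-circuit and peer caches. A two-line sketch of the same idea, using only the property and classes imported in this hunk:

      // A unique name per invocation means no cache sharing between runs.
      conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());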

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Wed Feb 12 19:24:46 2014
@@ -38,10 +38,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,10 +61,13 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestBlockTokenWithDFS {
@@ -131,50 +140,70 @@ public class TestBlockTokenWithDFS {
   }
 
   // try reading a block using a BlockReader directly
-  private static void tryRead(Configuration conf, LocatedBlock lblock,
+  private static void tryRead(final Configuration conf, LocatedBlock lblock,
       boolean shouldSucceed) {
     InetSocketAddress targetAddr = null;
-    Socket s = null;
+    IOException ioe = null;
     BlockReader blockReader = null;
     ExtendedBlock block = lblock.getBlock();
     try {
       DatanodeInfo[] nodes = lblock.getLocations();
       targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
-      s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-      s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-      s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-
-      String file = BlockReaderFactory.getFileName(targetAddr, 
-          "test-blockpoolid", block.getBlockId());
-      blockReader = BlockReaderFactory.newBlockReader(
-          new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
-          true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
-          nodes[0], null, null, null, false,
-          CachingStrategy.newDefaultStrategy());
 
+      blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+          setFileName(BlockReaderFactory.getFileName(targetAddr, 
+                        "test-blockpoolid", block.getBlockId())).
+          setBlock(block).
+          setBlockToken(lblock.getBlockToken()).
+          setInetSocketAddress(targetAddr).
+          setStartOffset(0).
+          setLength(-1).
+          setVerifyChecksum(true).
+          setClientName("TestBlockTokenWithDFS").
+          setDatanodeInfo(nodes[0]).
+          setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+          setClientCacheContext(ClientContext.getFromConf(conf)).
+          setConfiguration(conf).
+          setRemotePeerFactory(new RemotePeerFactory() {
+            @Override
+            public Peer newConnectedPeer(InetSocketAddress addr)
+                throws IOException {
+              Peer peer = null;
+              Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+              try {
+                sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+                sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+                peer = TcpPeerServer.peerFromSocket(sock);
+              } finally {
+                if (peer == null) {
+                  IOUtils.closeSocket(sock);
+                }
+              }
+              return peer;
+            }
+          }).
+          build();
     } catch (IOException ex) {
-      if (ex instanceof InvalidBlockTokenException) {
-        assertFalse("OP_READ_BLOCK: access token is invalid, "
-            + "when it is expected to be valid", shouldSucceed);
-        return;
-      }
-      fail("OP_READ_BLOCK failed due to reasons other than access token: "
-          + StringUtils.stringifyException(ex));
+      ioe = ex;
     } finally {
-      if (s != null) {
+      if (blockReader != null) {
         try {
-          s.close();
-        } catch (IOException iex) {
-        } finally {
-          s = null;
+          blockReader.close();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
         }
       }
     }
-    if (blockReader == null) {
-      fail("OP_READ_BLOCK failed due to reasons other than access token");
+    if (shouldSucceed) {
+      Assert.assertNotNull("OP_READ_BLOCK: access token is invalid, "
+            + "when it is expected to be valid", blockReader);
+    } else {
+      Assert.assertNotNull("OP_READ_BLOCK: access token is valid, "
+          + "when it is expected to be invalid", ioe);
+      Assert.assertTrue(
+          "OP_READ_BLOCK failed due to reasons other than access token: ",
+          ioe instanceof InvalidBlockTokenException);
     }
-    assertTrue("OP_READ_BLOCK: access token is valid, "
-        + "when it is expected to be invalid", shouldSucceed);
   }
 
   // get a conf for testing
@@ -347,9 +376,13 @@ public class TestBlockTokenWithDFS {
       /*
        * testing READ interface on DN using a BlockReader
        */
-
-      new DFSClient(new InetSocketAddress("localhost",
+      DFSClient client = null;
+      try {
+        client = new DFSClient(new InetSocketAddress("localhost",
           cluster.getNameNodePort()), conf);
+      } finally {
+        if (client != null) client.close();
+      }
       List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
           FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
       LocatedBlock lblock = locatedBlocks.get(0); // first block
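
This file and TestDataNodeVolumeFailure (next hunk) inline the same anonymous RemotePeerFactory. Purely as a sketch, the shared logic could live in one named class; every call below (NetUtils, TcpPeerServer.peerFromSocket, IOUtils.closeSocket, HdfsServerConstants.READ_TIMEOUT) is taken from the diff, while the class name and its placement in the org.apache.hadoop.hdfs test tree are assumptions.

    package org.apache.hadoop.hdfs;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.net.TcpPeerServer;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.net.NetUtils;

    /** Reusable stand-in for the anonymous factories in the two tests. */
    class TestRemotePeerFactory implements RemotePeerFactory {
      private final Configuration conf;

      TestRemotePeerFactory(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
        Peer peer = null;
        Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
        try {
          sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
          sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
          peer = TcpPeerServer.peerFromSocket(sock);
        } finally {
          // If wrapping failed, no Peer owns the socket yet; close it here so
          // the descriptor is not leaked.
          if (peer == null) {
            IOUtils.closeSocket(sock);
          }
        }
        return peer;
      }
    }

A test would then pass an instance through the builder shown above, e.g. setRemotePeerFactory(new TestRemotePeerFactory(conf)), instead of repeating the anonymous class in each file.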

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1567728&r1=1567727&r2=1567728&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Wed Feb 12 19:24:46 2014
@@ -35,11 +35,14 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -48,13 +51,14 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -284,23 +288,43 @@ public class TestDataNodeVolumeFailure {
   private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
     throws IOException {
     InetSocketAddress targetAddr = null;
-    Socket s = null;
     ExtendedBlock block = lblock.getBlock(); 
    
     targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
-      
-    s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-    s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-
-    String file = BlockReaderFactory.getFileName(targetAddr, 
-        "test-blockpoolid",
-        block.getBlockId());
-    BlockReader blockReader =
-      BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
-        lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
-        TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false,
-        CachingStrategy.newDefaultStrategy());
+
+    BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+      setInetSocketAddress(targetAddr).
+      setBlock(block).
+      setFileName(BlockReaderFactory.getFileName(targetAddr,
+                    "test-blockpoolid", block.getBlockId())).
+      setBlockToken(lblock.getBlockToken()).
+      setStartOffset(0).
+      setLength(-1).
+      setVerifyChecksum(true).
+      setClientName("TestDataNodeVolumeFailure").
+      setDatanodeInfo(datanode).
+      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+      setClientCacheContext(ClientContext.getFromConf(conf)).
+      setConfiguration(conf).
+      setRemotePeerFactory(new RemotePeerFactory() {
+        @Override
+        public Peer newConnectedPeer(InetSocketAddress addr)
+            throws IOException {
+          Peer peer = null;
+          Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+          try {
+            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+            peer = TcpPeerServer.peerFromSocket(sock);
+          } finally {
+            if (peer == null) {
+              IOUtils.closeSocket(sock);
+            }
+          }
+          return peer;
+        }
+      }).
+      build();
     blockReader.close();
   }
   


