hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject svn commit: r1567720 [3/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/server/common/ src/main/java/org/apache/had...
Date Wed, 12 Feb 2014 19:08:53 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Feb 12 19:08:52 2014
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,9 +41,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -569,11 +573,10 @@ public class NamenodeFsck {
     int failures = 0;
     InetSocketAddress targetAddr = null;
     TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
-    Socket s = null;
     BlockReader blockReader = null; 
     ExtendedBlock block = lblock.getBlock(); 
 
-    while (s == null) {
+    while (blockReader == null) {
       DatanodeInfo chosenNode;
       
       try {
@@ -593,34 +596,47 @@ public class NamenodeFsck {
         continue;
       }
       try {
-        s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-        s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-        s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-        
-        String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
-            block.getBlockId());
-        blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
-            file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
-            TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-                getDataEncryptionKey()), chosenNode, null, null, null, 
-                false, CachingStrategy.newDropBehind());
-        
+        String file = BlockReaderFactory.getFileName(targetAddr,
+            block.getBlockPoolId(), block.getBlockId());
+        blockReader = new BlockReaderFactory(dfs.getConf()).
+            setFileName(file).
+            setBlock(block).
+            setBlockToken(lblock.getBlockToken()).
+            setStartOffset(0).
+            setLength(-1).
+            setVerifyChecksum(true).
+            setClientName("fsck").
+            setDatanodeInfo(chosenNode).
+            setInetSocketAddress(targetAddr).
+            setCachingStrategy(CachingStrategy.newDropBehind()).
+            setClientCacheContext(dfs.getClientContext()).
+            setConfiguration(namenode.conf).
+            setRemotePeerFactory(new RemotePeerFactory() {
+              @Override
+              public Peer newConnectedPeer(InetSocketAddress addr)
+                  throws IOException {
+                Peer peer = null;
+                Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
+                try {
+                  s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+                  s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+                  peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
+                        getDataEncryptionKey());
+                } finally {
+                  if (peer == null) {
+                    IOUtils.closeQuietly(s);
+                  }
+                }
+                return peer;
+              }
+            }).
+            build();
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue
         LOG.info("Failed to connect to " + targetAddr + ":" + ex);
         deadNodes.add(chosenNode);
-        if (s != null) {
-          try {
-            s.close();
-          } catch (IOException iex) {
-          }
-        }
-        s = null;
       }
     }
-    if (blockReader == null) {
-      throw new Exception("Could not open data stream for " + lblock.getBlock());
-    }
     byte[] buf = new byte[1024];
     int cnt = 0;
     boolean success = true;
@@ -638,10 +654,11 @@ public class NamenodeFsck {
       LOG.error("Error reading block", e);
       success = false;
     } finally {
-      try {s.close(); } catch (Exception e1) {}
+      blockReader.close();
     }
-    if (!success)
+    if (!success) {
       throw new Exception("Could not copy block data for " + lblock.getBlock());
+    }
   }
       
   /*

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Wed Feb 12 19:08:52 2014
@@ -162,6 +162,16 @@
 </property>
 
 <property>
+  <name>dfs.client.cached.conn.retry</name>
+  <value>3</value>
+  <description>The number of times the HDFS client will pull a socket from the
+   cache.  Once this number is exceeded, the client will try to create a new
+   socket.
+  </description>
+</property>
+
+
+<property>
   <name>dfs.https.server.keystore.resource</name>
   <value>ssl-server.xml</value>
   <description>Resource file from which ssl server keystore
@@ -1490,6 +1500,26 @@
 </property>
 
 <property>
+  <name>dfs.client.mmap.retry.timeout.ms</name>
+  <value>300000</value>
+  <description>
+    The minimum amount of time that we will wait before retrying a failed mmap
+    operation.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.short.circuit.replica.stale.threshold.ms</name>
+  <value>3000000</value>
+  <description>
+    The maximum amount of time that we will consider a short-circuit replica to
+    be valid, if there is no communication from the DataNode.  After this time
+    has elapsed, we will re-fetch the short-circuit replica even if it is in
+    the cache.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.path.based.cache.block.map.allocation.percent</name>
   <value>0.25</value>
   <description>
@@ -1618,4 +1648,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.context</name>
+  <value>default</value>
+  <description>
+    The name of the DFSClient context that we should use.  Clients that share
+    a context share a socket cache and short-circuit cache, among other things.
+    You should only change this if you don't want to share with another set of
+    threads.
+  </description>
+</property>
+
 </configuration>

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Wed Feb 12 19:08:52 2014
@@ -28,32 +28,40 @@ import java.util.EnumSet;
 import java.util.Random;
 
 import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Map;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 
@@ -250,17 +258,39 @@ public class TestEnhancedByteBufferAcces
     }
   }
 
-  private static class CountingVisitor
-      implements ClientMmapManager.ClientMmapVisitor {
-    int count = 0;
-
-    @Override
-    public void accept(ClientMmap mmap) {
-      count++;
+  private static class CountingVisitor implements CacheVisitor {
+    private final int expectedNumOutstandingMmaps;
+    private final int expectedNumReplicas;
+    private final int expectedNumEvictable;
+    private final int expectedNumMmapedEvictable;
+
+    CountingVisitor(int expectedNumOutstandingMmaps,
+        int expectedNumReplicas, int expectedNumEvictable,
+        int expectedNumMmapedEvictable) {
+      this.expectedNumOutstandingMmaps = expectedNumOutstandingMmaps;
+      this.expectedNumReplicas = expectedNumReplicas;
+      this.expectedNumEvictable = expectedNumEvictable;
+      this.expectedNumMmapedEvictable = expectedNumMmapedEvictable;
     }
 
-    public void reset() {
-      count = 0;
+    @Override
+    public void visit(int numOutstandingMmaps,
+        Map<Key, ShortCircuitReplica> replicas,
+        Map<Key, InvalidToken> failedLoads,
+        Map<Long, ShortCircuitReplica> evictable,
+        Map<Long, ShortCircuitReplica> evictableMmapped) {
+      if (expectedNumOutstandingMmaps >= 0) {
+        Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
+      }
+      if (expectedNumReplicas >= 0) {
+        Assert.assertEquals(expectedNumReplicas, replicas.size());
+      }
+      if (expectedNumEvictable >= 0) {
+        Assert.assertEquals(expectedNumEvictable, evictable.size());
+      }
+      if (expectedNumMmapedEvictable >= 0) {
+        Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
+      }
     }
   }
 
@@ -271,105 +301,98 @@ public class TestEnhancedByteBufferAcces
     final Path TEST_PATH = new Path("/a");
     final int TEST_FILE_LENGTH = 16385;
     final int RANDOM_SEED = 23453;
+    final String CONTEXT = "testZeroCopyMmapCacheContext";
     FSDataInputStream fsIn = null;
-    ByteBuffer results[] = { null, null, null, null, null };
-    
+    ByteBuffer results[] = { null, null, null, null };
+
     DistributedFileSystem fs = null;
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, TEST_PATH,
+        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
     try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-      cluster.waitActive();
-      fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, TEST_PATH,
-          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
-      try {
-        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
-      } catch (InterruptedException e) {
-        Assert.fail("unexpected InterruptedException during " +
-            "waitReplication: " + e);
-      } catch (TimeoutException e) {
-        Assert.fail("unexpected TimeoutException during " +
-            "waitReplication: " + e);
+      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+    } catch (InterruptedException e) {
+      Assert.fail("unexpected InterruptedException during " +
+          "waitReplication: " + e);
+    } catch (TimeoutException e) {
+      Assert.fail("unexpected TimeoutException during " +
+          "waitReplication: " + e);
+    }
+    fsIn = fs.open(TEST_PATH);
+    byte original[] = new byte[TEST_FILE_LENGTH];
+    IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+    fsIn.close();
+    fsIn = fs.open(TEST_PATH);
+    final ShortCircuitCache cache = ClientContext.get(
+        CONTEXT, new DFSClient.Conf(conf)). getShortCircuitCache();
+    cache.accept(new CountingVisitor(0, 5, 5, 0));
+    results[0] = fsIn.read(null, 4096,
+        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+    fsIn.seek(0);
+    results[1] = fsIn.read(null, 4096,
+        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+
+    // The mmap should be of the first block of the file.
+    final ExtendedBlock firstBlock =
+        DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+    cache.accept(new CacheVisitor() {
+      @Override
+      public void visit(int numOutstandingMmaps,
+          Map<Key, ShortCircuitReplica> replicas,
+          Map<Key, InvalidToken> failedLoads, 
+          Map<Long, ShortCircuitReplica> evictable,
+          Map<Long, ShortCircuitReplica> evictableMmapped) {
+        ShortCircuitReplica replica = replicas.get(
+            new Key(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
+        Assert.assertNotNull(replica);
+        Assert.assertTrue(replica.hasMmap());
+        // The replica should not yet be evictable, since we have it open.
+        Assert.assertNull(replica.getEvictableTimeNs());
       }
-      fsIn = fs.open(TEST_PATH);
-      byte original[] = new byte[TEST_FILE_LENGTH];
-      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
-      fsIn.close();
-      fsIn = fs.open(TEST_PATH);
-      final ClientMmapManager mmapManager = fs.getClient().getMmapManager();
-      final CountingVisitor countingVisitor = new CountingVisitor();
-      mmapManager.visitMmaps(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-      mmapManager.visitEvictable(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-      results[0] = fsIn.read(null, 4096,
-          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-      fsIn.seek(0);
-      results[1] = fsIn.read(null, 4096,
-          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-      mmapManager.visitMmaps(countingVisitor);
-      Assert.assertEquals(1, countingVisitor.count);
-      countingVisitor.reset();
-      mmapManager.visitEvictable(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-      countingVisitor.reset();
-
-      // The mmaps should be of the first block of the file.
-      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
-      mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
-        @Override
-        public void accept(ClientMmap mmap) {
-          Assert.assertEquals(firstBlock, mmap.getBlock());
-        }
-      });
+    });
 
-      // Read more blocks.
-      results[2] = fsIn.read(null, 4096,
-          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-      results[3] = fsIn.read(null, 4096,
-          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-      try {
-        results[4] = fsIn.read(null, 4096,
-            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
-        Assert.fail("expected UnsupportedOperationException");
-      } catch (UnsupportedOperationException e) {
-        // expected
+    // Read more blocks.
+    results[2] = fsIn.read(null, 4096,
+        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+    results[3] = fsIn.read(null, 4096,
+        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+
+    // we should have 3 mmaps, 1 evictable
+    cache.accept(new CountingVisitor(3, 5, 2, 0));
+
+    // After we close the cursors, the mmaps should be evictable for 
+    // a brief period of time.  Then, they should be closed (we're 
+    // using a very quick timeout)
+    for (ByteBuffer buffer : results) {
+      if (buffer != null) {
+        fsIn.releaseBuffer(buffer);
       }
-
-      // we should have 3 mmaps, 0 evictable
-      mmapManager.visitMmaps(countingVisitor);
-      Assert.assertEquals(3, countingVisitor.count);
-      countingVisitor.reset();
-      mmapManager.visitEvictable(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-
-      // After we close the cursors, the mmaps should be evictable for 
-      // a brief period of time.  Then, they should be closed (we're 
-      // using a very quick timeout)
-      for (ByteBuffer buffer : results) {
-        if (buffer != null) {
-          fsIn.releaseBuffer(buffer);
-        }
-      }
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        public Boolean get() {
-          countingVisitor.reset();
-          try {
-            mmapManager.visitEvictable(countingVisitor);
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-            return false;
-          }
-          return (0 == countingVisitor.count);
-        }
-      }, 10, 10000);
-      countingVisitor.reset();
-      mmapManager.visitMmaps(countingVisitor);
-      Assert.assertEquals(0, countingVisitor.count);
-    } finally {
-      if (fsIn != null) fsIn.close();
-      if (fs != null) fs.close();
-      if (cluster != null) cluster.shutdown();
     }
+    fsIn.close();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        final MutableBoolean finished = new MutableBoolean(false);
+        cache.accept(new CacheVisitor() {
+          @Override
+          public void visit(int numOutstandingMmaps,
+              Map<Key, ShortCircuitReplica> replicas,
+              Map<Key, InvalidToken> failedLoads,
+              Map<Long, ShortCircuitReplica> evictable,
+              Map<Long, ShortCircuitReplica> evictableMmapped) {
+            finished.setValue(evictableMmapped.isEmpty());
+          }
+        });
+        return finished.booleanValue();
+      }
+    }, 10, 60000);
+
+    cache.accept(new CountingVisitor(0, -1, -1, -1));
+    
+    fs.close();
+    cluster.shutdown();
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Wed Feb 12 19:08:52 2014
@@ -28,8 +28,12 @@ import java.net.Socket;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -38,6 +42,8 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
 
 /**
  * A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
@@ -141,22 +147,54 @@ public class BlockReaderTestUtil {
    */
   public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
       throws IOException {
+    return getBlockReader(cluster, testBlock, offset, lenToRead);
+  }
+
+  /**
+   * Get a BlockReader for the given block.
+   */
+  public static BlockReader getBlockReader(MiniDFSCluster cluster,
+      LocatedBlock testBlock, int offset, int lenToRead) throws IOException {
     InetSocketAddress targetAddr = null;
-    Socket sock = null;
     ExtendedBlock block = testBlock.getBlock();
     DatanodeInfo[] nodes = testBlock.getLocations();
     targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
-    sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
-    sock.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-    sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-
-    return BlockReaderFactory.newBlockReader(
-      new DFSClient.Conf(conf),
-      targetAddr.toString()+ ":" + block.getBlockId(), block,
-      testBlock.getBlockToken(), 
-      offset, lenToRead,
-      true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
-      nodes[0], null, null, null, false, CachingStrategy.newDefaultStrategy());
+
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    return new BlockReaderFactory(fs.getClient().getConf()).
+      setInetSocketAddress(targetAddr).
+      setBlock(block).
+      setFileName(targetAddr.toString()+ ":" + block.getBlockId()).
+      setBlockToken(testBlock.getBlockToken()).
+      setStartOffset(offset).
+      setLength(lenToRead).
+      setVerifyChecksum(true).
+      setClientName("BlockReaderTestUtil").
+      setDatanodeInfo(nodes[0]).
+      setClientCacheContext(ClientContext.getFromConf(fs.getConf())).
+      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+      setConfiguration(fs.getConf()).
+      setAllowShortCircuitLocalReads(true).
+      setRemotePeerFactory(new RemotePeerFactory() {
+        @Override
+        public Peer newConnectedPeer(InetSocketAddress addr)
+            throws IOException {
+          Peer peer = null;
+          Socket sock = NetUtils.
+              getDefaultSocketFactory(fs.getConf()).createSocket();
+          try {
+            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+            peer = TcpPeerServer.peerFromSocket(sock);
+          } finally {
+            if (peer == null) {
+              IOUtils.closeQuietly(sock);
+            }
+          }
+          return peer;
+        }
+      }).
+      build();
   }
 
   /**
@@ -167,4 +205,13 @@ public class BlockReaderTestUtil {
     int ipcport = nodes[0].getIpcPort();
     return cluster.getDataNode(ipcport);
   }
-}
+  
+  public static void enableBlockReaderFactoryTracing() {
+    LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitCache.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel(
+        Level.TRACE);
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Feb 12 19:08:52 2014
@@ -185,10 +185,26 @@ public class DFSTestUtil {
     }
   }
 
-  public static String readFile(FileSystem fs, Path fileName) throws IOException {
+  public static String readFile(FileSystem fs, Path fileName)
+      throws IOException {
+    byte buf[] = readFileBuffer(fs, fileName);
+	return new String(buf, 0, buf.length);
+  }
+
+  public static byte[] readFileBuffer(FileSystem fs, Path fileName) 
+      throws IOException {
     ByteArrayOutputStream os = new ByteArrayOutputStream();
-    IOUtils.copyBytes(fs.open(fileName), os, 1024, true);
-    return os.toString();
+    try {
+      FSDataInputStream in = fs.open(fileName);
+      try {
+        IOUtils.copyBytes(fs.open(fileName), os, 1024, true);
+        return os.toByteArray();
+      } finally {
+        in.close();
+      }
+    } finally {
+      os.close();
+    }
   }
   
   public static void createFile(FileSystem fs, Path fileName, long fileLen, 
@@ -230,6 +246,13 @@ public class DFSTestUtil {
     }
   }
   
+  public static byte[] calculateFileContentsFromSeed(long seed, int length) {
+    Random rb = new Random(seed);
+    byte val[] = new byte[length];
+    rb.nextBytes(val);
+    return val;
+  }
+  
   /** check if the files have been copied correctly. */
   public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
     Path root = new Path(topdir);
@@ -549,8 +572,12 @@ public class DFSTestUtil {
   
   public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
     HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path);
-    in.readByte();
-    return in.getCurrentBlock();
+    try {
+      in.readByte();
+      return in.getCurrentBlock();
+    } finally {
+      in.close();
+    }
   }  
 
   public static List<LocatedBlock> getAllBlocks(FSDataInputStream in)

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java?rev=1567720&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java Wed Feb 12 19:08:52 2014
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
+
+/**
+ * Tests for {@link BlockReaderFactory}, covering fallback from short-circuit
+ * local reads to UNIX-domain-socket data traffic, and the sharing and retry
+ * behavior of the short-circuit replica cache.
+ */
+public class TestBlockReaderFactory {
+  static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class);
+
+  @Before
+  public void init() {
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @After
+  public void cleanup() {
+    // Reset the static test hooks so that later tests are unaffected.
+    DFSInputStream.tcpReadsDisabledForTesting = false;
+    BlockReaderFactory.createShortCircuitReplicaInfoCallback = null;
+  }
+
+  /**
+   * Create a client configuration with short-circuit reads enabled over a
+   * per-test domain socket path, using the test name as the client context
+   * so each test gets its own caches.
+   */
+  private static Configuration createShortCircuitConf(String testName,
+      TemporarySocketDirectory sockDir) {
+    Configuration conf = new Configuration();
+    conf.set(DFS_CLIENT_CONTEXT, testName);
+    conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
+    conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+        testName + "._PORT").getAbsolutePath());
+    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        false);
+    conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
+    return conf;
+  }
+
+  /**
+   * If we have a UNIX domain socket configured,
+   * and we have dfs.client.domain.socket.data.traffic set to true,
+   * and short-circuit access fails, we should still be able to pass
+   * data traffic over the UNIX domain socket.  Test this.
+   */
+  @Test(timeout=60000)
+  public void testFallbackFromShortCircuitToUnixDomainTraffic()
+      throws Exception {
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+
+    // The server is NOT configured with short-circuit local reads;
+    // the client is.  Both support UNIX domain reads.
+    Configuration clientConf = createShortCircuitConf(
+        "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir);
+    clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
+    Configuration serverConf = new Configuration(clientConf);
+    serverConf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+    cluster.waitActive();
+    FileSystem dfs = FileSystem.get(cluster.getURI(0), clientConf);
+    String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 8193;
+    final int SEED = 0xFADED;
+    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    // Reading must succeed via the domain-socket fallback path because
+    // TCP reads are disabled and the server refuses short-circuit reads.
+    byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
+    byte expected[] = DFSTestUtil.
+        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+    Assert.assertTrue(Arrays.equals(contents, expected));
+    // NOTE(review): shutdown/close are not in a finally block; a failed
+    // assertion above leaks the cluster and socket dir for this test run.
+    cluster.shutdown();
+    sockDir.close();
+  }
+  
+  /**
+   * Test the case where we have multiple threads waiting on the
+   * ShortCircuitCache delivering a certain ShortCircuitReplica.
+   *
+   * In this case, there should only be one call to
+   * createShortCircuitReplicaInfo.  This one replica should be shared
+   * by all threads.
+   */
+  @Test(timeout=60000)
+  public void testMultipleWaitersOnShortCircuitCache()
+      throws Exception {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
+      new ShortCircuitCache.ShortCircuitReplicaCreator() {
+        @Override
+        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+          // Block every creator until the test releases the latch, then
+          // verify this hook is invoked exactly once across all threads.
+          Uninterruptibles.awaitUninterruptibly(latch);
+          if (!creationIsBlocked.compareAndSet(true, false)) {
+            Assert.fail("there were multiple calls to "
+                + "createShortCircuitReplicaInfo.  Only one was expected.");
+          }
+          return null;
+        }
+      };
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testMultipleWaitersOnShortCircuitCache", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 4000;
+    final int SEED = 0xFADED;
+    final int NUM_THREADS = 10;
+    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    Runnable readerRunnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
+          Assert.assertFalse(creationIsBlocked.get());
+          byte expected[] = DFSTestUtil.
+              calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+          Assert.assertTrue(Arrays.equals(contents, expected));
+        } catch (Throwable e) {
+          LOG.error("readerRunnable error", e);
+          testFailed.set(true);
+        }
+      }
+    };
+    Thread threads[] = new Thread[NUM_THREADS];
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i] = new Thread(readerRunnable);
+      threads[i].start();
+    }
+    // NOTE(review): fixed sleep gives the reader threads time to block on
+    // the cache before creation is unblocked; timing-based, assumes 500 ms
+    // is always long enough.
+    Thread.sleep(500);
+    latch.countDown();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      Uninterruptibles.joinUninterruptibly(threads[i]);
+    }
+    cluster.shutdown();
+    sockDir.close();
+    Assert.assertFalse(testFailed.get());
+  }
+
+  /**
+   * Test the case where we have a failure to complete a short circuit read
+   * that occurs, and then later on, we have a success.
+   * Any thread waiting on a cache load should receive the failure (if it
+   * occurs); however, the failure result should not be cached.  We want
+   * to be able to retry later and succeed.
+   */
+  @Test(timeout=60000)
+  public void testShortCircuitCacheTemporaryFailure()
+      throws Exception {
+    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
+    final AtomicBoolean replicaCreationShouldFail = new AtomicBoolean(true);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
+      new ShortCircuitCache.ShortCircuitReplicaCreator() {
+        @Override
+        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+          if (replicaCreationShouldFail.get()) {
+            // Insert a short delay to increase the chance that one client
+            // thread waits for the other client thread's failure via
+            // a condition variable.
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+            // An empty ShortCircuitReplicaInfo represents a creation failure.
+            return new ShortCircuitReplicaInfo();
+          }
+          return null;
+        }
+      };
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testShortCircuitCacheTemporaryFailure", sockDir);
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 4000;
+    final int NUM_THREADS = 2;
+    final int SEED = 0xFADED;
+    final CountDownLatch gotFailureLatch = new CountDownLatch(NUM_THREADS);
+    final CountDownLatch shouldRetryLatch = new CountDownLatch(1);
+    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    Runnable readerRunnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // First time should fail.
+          List<LocatedBlock> locatedBlocks = 
+              cluster.getNameNode().getRpcServer().getBlockLocations(
+              TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
+          LocatedBlock lblock = locatedBlocks.get(0); // first block
+          BlockReader blockReader = null;
+          try {
+            blockReader = BlockReaderTestUtil.
+                getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+            Assert.fail("expected getBlockReader to fail the first time.");
+          } catch (Throwable t) { 
+            Assert.assertTrue("expected to see 'TCP reads were disabled " +
+                "for testing' in exception " + t, t.getMessage().contains(
+                "TCP reads were disabled for testing"));
+          } finally {
+            if (blockReader != null) blockReader.close(); // keep findbugs happy
+          }
+          gotFailureLatch.countDown();
+          shouldRetryLatch.await();
+
+          // Second time should succeed.
+          try {
+            blockReader = BlockReaderTestUtil.
+                getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+          } catch (Throwable t) { 
+            LOG.error("error trying to retrieve a block reader " +
+                "the second time.", t);
+            throw t;
+          } finally {
+            if (blockReader != null) blockReader.close();
+          }
+        } catch (Throwable t) {
+          LOG.error("getBlockReader failure", t);
+          testFailed.set(true);
+        }
+      }
+    };
+    Thread threads[] = new Thread[NUM_THREADS];
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i] = new Thread(readerRunnable);
+      threads[i].start();
+    }
+    gotFailureLatch.await();
+    // All threads have observed the injected failure; now allow the retry
+    // to succeed, proving the failure result was not cached.
+    replicaCreationShouldFail.set(false);
+    shouldRetryLatch.countDown();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      Uninterruptibles.joinUninterruptibly(threads[i]);
+    }
+    cluster.shutdown();
+    sockDir.close();
+    Assert.assertFalse(testFailed.get());
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Wed Feb 12 19:08:52 2014
@@ -30,13 +30,17 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -155,6 +159,8 @@ public class TestBlockReaderLocal {
       File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
 
       DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
+      ShortCircuitCache shortCircuitCache =
+          ClientContext.getFromConf(conf).getShortCircuitCache();
       cluster.shutdown();
       cluster = null;
       test.setup(dataFile, checksum);
@@ -164,16 +170,17 @@ public class TestBlockReaderLocal {
       };
       dataIn = streams[0];
       metaIn = streams[1];
+      Key key = new Key(block.getBlockId(), block.getBlockPoolId());
+      ShortCircuitReplica replica = new ShortCircuitReplica(
+          key, dataIn, metaIn, shortCircuitCache, Time.now());
       blockReaderLocal = new BlockReaderLocal.Builder(
               new DFSClient.Conf(conf)).
           setFilename(TEST_PATH.getName()).
           setBlock(block).
-          setStreams(streams).
+          setShortCircuitReplica(replica).
           setDatanodeID(datanodeID).
           setCachingStrategy(new CachingStrategy(false, readahead)).
           setVerifyChecksum(checksum).
-          setBlockMetadataHeader(BlockMetadataHeader.preadHeader(
-              metaIn.getChannel())).
           build();
       dataIn = null;
       metaIn = null;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Feb 12 19:08:52 2014
@@ -25,18 +25,8 @@ import java.net.InetSocketAddress;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 /**
  * This class tests the client connection caching in a single node
@@ -49,30 +39,6 @@ public class TestConnCache {
   static final int FILE_SIZE = 3 * BLOCK_SIZE;
 
   /**
-   * A mock Answer to remember the BlockReader used.
-   *
-   * It verifies that all invocation to DFSInputStream.getBlockReader()
-   * use the same peer.
-   */
-  private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
-    public RemoteBlockReader2 reader = null;
-    private Peer peer = null;
-
-    @Override
-    public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
-      RemoteBlockReader2 prevReader = reader;
-      reader = (RemoteBlockReader2) invocation.callRealMethod();
-      if (peer == null) {
-        peer = reader.getPeer();
-      } else if (prevReader != null) {
-        Assert.assertSame("DFSInputStream should use the same peer",
-                   peer, reader.getPeer());
-      }
-      return reader;
-    }
-  }
-
-  /**
    * (Optionally) seek to position, read and verify data.
    *
    * Seek to specified position if pos is non-negative.
@@ -115,33 +81,29 @@ public class TestConnCache {
    * @throws Exception 
    */
   @Test
-  @SuppressWarnings("unchecked")
   public void testReadFromOneDN() throws Exception {
-    BlockReaderTestUtil util = new BlockReaderTestUtil(1,
-        new HdfsConfiguration());
+    HdfsConfiguration configuration = new HdfsConfiguration();
+    // One of the goals of this test is to verify that we don't open more
+    // than one socket.  So use a different client context, so that we
+    // get our own socket cache, rather than sharing with the other test 
+    // instances.  Also use a really long socket timeout so that nothing
+    // gets closed before we get around to checking the cache size at the end.
+    final String contextName = "testReadFromOneDNContext";
+    configuration.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, contextName);
+    configuration.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+        100000000L);
+    BlockReaderTestUtil util = new BlockReaderTestUtil(1, configuration);
     final Path testFile = new Path("/testConnCache.dat");
     byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024);
     DFSClient client = new DFSClient(
         new InetSocketAddress("localhost",
             util.getCluster().getNameNodePort()), util.getConf());
-    DFSInputStream in = Mockito.spy(client.open(testFile.toString()));
+    ClientContext cacheContext =
+        ClientContext.get(contextName, client.getConf());
+    DFSInputStream in = client.open(testFile.toString());
     LOG.info("opened " + testFile.toString());
     byte[] dataBuf = new byte[BLOCK_SIZE];
 
-    MockGetBlockReader answer = new MockGetBlockReader();
-    Mockito.doAnswer(answer).when(in).getBlockReader(
-                           (InetSocketAddress) Matchers.anyObject(),
-                           (DatanodeInfo) Matchers.anyObject(),
-                           Matchers.anyString(),
-                           (ExtendedBlock) Matchers.anyObject(),
-                           (Token<BlockTokenIdentifier>) Matchers.anyObject(),
-                           Matchers.anyLong(),
-                           Matchers.anyLong(),
-                           Matchers.anyInt(),
-                           Matchers.anyBoolean(),
-                           Matchers.anyString(),
-                           (CachingStrategy)Matchers.anyObject());
-
     // Initial read
     pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
     // Read again and verify that the socket is the same
@@ -153,5 +115,8 @@ public class TestConnCache {
     pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData);
 
     in.close();
+    client.close();
+    Assert.assertEquals(1,
+        ClientContext.getFromConf(configuration).getPeerCache().size());
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Wed Feb 12 19:08:52 2014
@@ -22,7 +22,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -86,21 +86,22 @@ public class TestDataTransferKeepalive {
     // the datanode-side expiration time.
     final long CLIENT_EXPIRY_MS = 60000L;
     clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
+    clientConf.set(DFS_CLIENT_CONTEXT, "testDatanodeRespectsKeepAliveTimeout");
     DistributedFileSystem fs =
         (DistributedFileSystem)FileSystem.get(cluster.getURI(),
             clientConf);
+    PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache();
 
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Clients that write aren't currently re-used.
-    assertEquals(0, fs.dfs.peerCache.size());
+    assertEquals(0, peerCache.size());
     assertXceiverCount(0);
 
     // Reads the file, so we should get a
     // cached socket, and should have an xceiver on the other side.
     DFSTestUtil.readFile(fs, TEST_FILE);
-    assertEquals(1, fs.dfs.peerCache.size());
+    assertEquals(1, peerCache.size());
     assertXceiverCount(1);
 
     // Sleep for a bit longer than the keepalive timeout
@@ -111,15 +112,13 @@ public class TestDataTransferKeepalive {
     // The socket is still in the cache, because we don't
     // notice that it's closed until we try to read
     // from it again.
-    assertEquals(1, fs.dfs.peerCache.size());
+    assertEquals(1, peerCache.size());
     
     // Take it out of the cache - reading should
     // give an EOF.
-    Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false);
+    Peer peer = peerCache.get(dn.getDatanodeId(), false);
     assertNotNull(peer);
     assertEquals(-1, peer.getInputStream().read());
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT,
-        DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
   }
 
   /**
@@ -132,34 +131,33 @@ public class TestDataTransferKeepalive {
     // the datanode-side expiration time.
     final long CLIENT_EXPIRY_MS = 10L;
     clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
+    clientConf.set(DFS_CLIENT_CONTEXT, "testClientResponsesKeepAliveTimeout");
     DistributedFileSystem fs =
         (DistributedFileSystem)FileSystem.get(cluster.getURI(),
             clientConf);
+    PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache();
 
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Clients that write aren't currently re-used.
-    assertEquals(0, fs.dfs.peerCache.size());
+    assertEquals(0, peerCache.size());
     assertXceiverCount(0);
 
     // Reads the file, so we should get a
     // cached socket, and should have an xceiver on the other side.
     DFSTestUtil.readFile(fs, TEST_FILE);
-    assertEquals(1, fs.dfs.peerCache.size());
+    assertEquals(1, peerCache.size());
     assertXceiverCount(1);
 
     // Sleep for a bit longer than the client keepalive timeout.
     Thread.sleep(CLIENT_EXPIRY_MS + 1);
     
     // Taking out a peer which is expired should give a null.
-    Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false);
+    Peer peer = peerCache.get(dn.getDatanodeId(), false);
     assertTrue(peer == null);
 
     // The socket cache is now empty.
-    assertEquals(0, fs.dfs.peerCache.size());
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT,
-        DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
+    assertEquals(0, peerCache.size());
   }
 
   /**
@@ -174,7 +172,7 @@ public class TestDataTransferKeepalive {
     final long CLIENT_EXPIRY_MS = 600000L;
     Configuration clientConf = new Configuration(conf);
     clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS);
-    PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS);
+    clientConf.set(DFS_CLIENT_CONTEXT, "testSlowReader");
     DistributedFileSystem fs =
         (DistributedFileSystem)FileSystem.get(cluster.getURI(),
             clientConf);
@@ -209,7 +207,12 @@ public class TestDataTransferKeepalive {
   @Test(timeout=30000)
   public void testManyClosedSocketsInCache() throws Exception {
     // Make a small file
-    DistributedFileSystem fs = cluster.getFileSystem();
+    Configuration clientConf = new Configuration(conf);
+    clientConf.set(DFS_CLIENT_CONTEXT, "testManyClosedSocketsInCache");
+    DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(),
+            clientConf);
+    PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache();
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Insert a bunch of dead sockets in the cache, by opening
@@ -227,15 +230,14 @@ public class TestDataTransferKeepalive {
       IOUtils.cleanup(null, stms);
     }
     
-    DFSClient client = ((DistributedFileSystem)fs).dfs;
-    assertEquals(5, client.peerCache.size());
+    assertEquals(5, peerCache.size());
     
     // Let all the xceivers timeout
     Thread.sleep(1500);
     assertXceiverCount(0);
 
     // Client side still has the sockets cached
-    assertEquals(5, client.peerCache.size());
+    assertEquals(5, peerCache.size());
 
     // Reading should not throw an exception.
     DFSTestUtil.readFile(fs, TEST_FILE);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java Wed Feb 12 19:08:52 2014
@@ -53,7 +53,8 @@ public class TestDisableConnCache {
     FileSystem fsWithoutCache = FileSystem.newInstance(util.getConf());
     try {
       DFSTestUtil.readFile(fsWithoutCache, testFile);
-      assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.peerCache.size());
+      assertEquals(0, ((DistributedFileSystem)fsWithoutCache).
+          dfs.getClientContext().getPeerCache().size());
     } finally {
       fsWithoutCache.close();
       util.shutdown();

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java?rev=1567720&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java Wed Feb 12 19:08:52 2014
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+public class TestShortCircuitCache {
+  static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
+  
+  private static class TestFileDescriptorPair { // fixture: a (data, metadata) FileInputStream pair backed by temp files
+    TemporarySocketDirectory dir = new TemporarySocketDirectory(); // temp directory; removed by close()
+    FileInputStream fis[]; // fis[0] = data stream, fis[1] = metadata stream
+
+    public TestFileDescriptorPair() throws IOException {
+      fis = new FileInputStream[2];
+      for (int i = 0; i < 2; i++) {
+        String name = dir.getDir() + "/file" + i;
+        FileOutputStream fos = new FileOutputStream(name);
+        if (i == 0) {
+          // write 'data' file: a single byte suffices for these cache tests
+          fos.write(1);
+        } else {
+          // write 'metadata' file: a valid header using the NULL (no-op) checksum type
+          BlockMetadataHeader header =
+              new BlockMetadataHeader((short)1,
+                  DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4));
+          DataOutputStream dos = new DataOutputStream(fos);
+          BlockMetadataHeader.writeHeader(dos, header);
+          dos.close();
+        }
+        fos.close();
+        fis[i] = new FileInputStream(name); // reopen for reading; stays open until close()
+      }
+    }
+
+    public FileInputStream[] getFileInputStreams() {
+      return fis; // NOTE(review): exposes the internal array; fine for a test fixture
+    }
+
+    public void close() throws IOException {
+      IOUtils.cleanup(LOG, fis); // closes both streams, logging rather than throwing on error
+      dir.close();
+    }
+
+    public boolean compareWith(FileInputStream data, FileInputStream meta) {
+      return ((data == fis[0]) && (meta == fis[1])); // reference equality: true iff exactly this pair's streams
+    }
+  }
+
+  private static class SimpleReplicaCreator
+      implements ShortCircuitReplicaCreator { // creator that wraps a TestFileDescriptorPair as a cache replica
+    private final int blockId;
+    private final ShortCircuitCache cache;
+    private final TestFileDescriptorPair pair;
+
+    SimpleReplicaCreator(int blockId, ShortCircuitCache cache,
+        TestFileDescriptorPair pair) {
+      this.blockId = blockId;
+      this.cache = cache;
+      this.pair = pair;
+    }
+
+    @Override
+    public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+      try {
+        Key key = new Key(blockId, "test_bp1"); // block-pool id is a fixed dummy value in these tests
+        return new ShortCircuitReplicaInfo(
+            new ShortCircuitReplica(key,
+                pair.getFileInputStreams()[0], pair.getFileInputStreams()[1],
+                cache, Time.monotonicNow()));
+      } catch (IOException e) {
+        throw new RuntimeException(e); // interface method cannot throw a checked exception
+      }
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testCreateAndDestroy() throws Exception { // smoke test: construct and immediately close an empty cache
+    ShortCircuitCache cache =
+        new ShortCircuitCache(10, 1, 10, 1, 1, 10000);
+    cache.close(); // must not throw with zero entries
+  }
+  
+  @Test(timeout=60000)
+  public void testAddAndRetrieve() throws Exception { // verifies a created replica is returned on subsequent fetches
+    final ShortCircuitCache cache =
+        new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000); // huge expiry values so nothing ages out mid-test
+    final TestFileDescriptorPair pair = new TestFileDescriptorPair();
+    ShortCircuitReplicaInfo replicaInfo1 =
+      cache.fetchOrCreate(new Key(123, "test_bp1"),
+        new SimpleReplicaCreator(123, cache, pair)); // first fetch: creator runs
+    Preconditions.checkNotNull(replicaInfo1.getReplica());
+    Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
+    pair.compareWith(replicaInfo1.getReplica().getDataStream(), // NOTE(review): boolean result ignored; should be asserted
+                     replicaInfo1.getReplica().getMetaStream());
+    ShortCircuitReplicaInfo replicaInfo2 =
+      cache.fetchOrCreate(new Key(123, "test_bp1"),
+          new ShortCircuitReplicaCreator() { // second fetch: creator must NOT run (cache hit)
+        @Override
+        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+          Assert.fail("expected to use existing entry.");
+          return null;
+        }
+      });
+    Preconditions.checkNotNull(replicaInfo2.getReplica());
+    Preconditions.checkState(replicaInfo2.getInvalidTokenException() == null);
+    Preconditions.checkState(replicaInfo1 == replicaInfo2); // same cached object, not a copy
+    pair.compareWith(replicaInfo2.getReplica().getDataStream(), // NOTE(review): result ignored here too
+                     replicaInfo2.getReplica().getMetaStream());
+    replicaInfo1.getReplica().unref();
+    replicaInfo2.getReplica().unref(); // drops the refcount to 0
+    
+    // Even after the reference count falls to 0, we still keep the replica
+    // around for a while (we have configured the expiry period to be really,
+    // really long here), so the third fetch must still hit the cache.
+    ShortCircuitReplicaInfo replicaInfo3 =
+      cache.fetchOrCreate(
+          new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() {
+        @Override
+        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+          Assert.fail("expected to use existing entry.");
+          return null;
+        }
+      });
+    Preconditions.checkNotNull(replicaInfo3.getReplica());
+    Preconditions.checkState(replicaInfo3.getInvalidTokenException() == null);
+    replicaInfo3.getReplica().unref();
+    
+    pair.close();
+    cache.close();
+  }
+
+  @Test(timeout=60000)
+  public void testExpiry() throws Exception { // verifies an unreferenced replica eventually expires from the cache
+    final ShortCircuitCache cache =
+        new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000); // short expiry so the entry ages out quickly
+    final TestFileDescriptorPair pair = new TestFileDescriptorPair();
+    ShortCircuitReplicaInfo replicaInfo1 =
+      cache.fetchOrCreate(
+        new Key(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair));
+    Preconditions.checkNotNull(replicaInfo1.getReplica());
+    Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
+    pair.compareWith(replicaInfo1.getReplica().getDataStream(), // NOTE(review): boolean result ignored; should be asserted
+                     replicaInfo1.getReplica().getMetaStream());
+    replicaInfo1.getReplica().unref(); // refcount 0: entry becomes eligible for expiry
+    final MutableBoolean triedToCreate = new MutableBoolean(false);
+    do {
+      Thread.sleep(10); // poll until the cached entry is gone and the creator gets invoked
+      ShortCircuitReplicaInfo replicaInfo2 =
+        cache.fetchOrCreate(
+          new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() {
+          @Override
+          public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+            triedToCreate.setValue(true); // creator ran => the old replica expired
+            return null;
+          }
+        });
+      if ((replicaInfo2 != null) && (replicaInfo2.getReplica() != null)) {
+        replicaInfo2.getReplica().unref(); // still cached this round; release and retry
+      }
+    } while (triedToCreate.isFalse());
+    cache.close();
+  }
+  
+  
+  @Test(timeout=60000)
+  public void testEviction() throws Exception { // verifies LRU-style eviction: capacity 2, so the oldest of 3 is evicted
+    final ShortCircuitCache cache =
+        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000);
+    final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
+      new TestFileDescriptorPair(),
+      new TestFileDescriptorPair(),
+      new TestFileDescriptorPair(),
+    };
+    ShortCircuitReplicaInfo replicaInfos[] = new ShortCircuitReplicaInfo[] {
+      null,
+      null,
+      null
+    };
+    for (int i = 0; i < pairs.length; i++) { // create three replicas, keyed by block id 0..2
+      replicaInfos[i] = cache.fetchOrCreate(
+          new Key(i, "test_bp1"), 
+            new SimpleReplicaCreator(i, cache, pairs[i]));
+      Preconditions.checkNotNull(replicaInfos[i].getReplica());
+      Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
+      pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(), // NOTE(review): boolean result ignored
+                           replicaInfos[i].getReplica().getMetaStream());
+    }
+    // At this point, we have 3 replicas in use.
+    // Let's release them all, making all three eligible for caching/eviction.
+    for (int i = 0; i < pairs.length; i++) {
+      replicaInfos[i].getReplica().unref();
+    }
+    // The last two replicas should still be cached.
+    for (int i = 1; i < pairs.length; i++) {
+      final Integer iVal = new Integer(i); // NOTE(review): deprecated boxing; Integer.valueOf(i) preferred
+      replicaInfos[i] = cache.fetchOrCreate(
+          new Key(i, "test_bp1"),
+            new ShortCircuitReplicaCreator() { // must hit the cache, never create
+        @Override
+        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+          Assert.fail("expected to use existing entry for " + iVal);
+          return null;
+        }
+      });
+      Preconditions.checkNotNull(replicaInfos[i].getReplica());
+      Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
+      pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(), // NOTE(review): boolean result ignored
+                           replicaInfos[i].getReplica().getMetaStream());
+    }
+    // The first (oldest) replica should not be cached.
+    final MutableBoolean calledCreate = new MutableBoolean(false);
+    replicaInfos[0] = cache.fetchOrCreate(
+        new Key(0, "test_bp1"),
+          new ShortCircuitReplicaCreator() { // returning null yields an info with no replica
+        @Override
+        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+          calledCreate.setValue(true);
+          return null;
+        }
+      });
+    Preconditions.checkState(replicaInfos[0].getReplica() == null);
+    Assert.assertTrue(calledCreate.isTrue()); // the creator was invoked => entry 0 had been evicted
+    // Clean up (index 0 holds no replica, so start at 1)
+    for (int i = 1; i < pairs.length; i++) {
+      replicaInfos[i].getReplica().unref();
+    }
+    for (int i = 0; i < pairs.length; i++) {
+      pairs[i].close();
+    }
+    cache.close();
+  }
+  
+  @Test(timeout=60000)
+  public void testStaleness() throws Exception { // verifies stale replicas are re-created while fresh ones are reused
+    // Set up the cache with a short staleness time.
+    final ShortCircuitCache cache =
+        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10);
+    final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
+      new TestFileDescriptorPair(),
+      new TestFileDescriptorPair(),
+    };
+    ShortCircuitReplicaInfo replicaInfos[] = new ShortCircuitReplicaInfo[] {
+      null,
+      null
+    };
+    final long HOUR_IN_MS = 60 * 60 * 1000;
+    for (int i = 0; i < pairs.length; i++) {
+      final Integer iVal = new Integer(i); // NOTE(review): deprecated boxing; Integer.valueOf(i) preferred
+      final Key key = new Key(i, "test_bp1");
+      replicaInfos[i] = cache.fetchOrCreate(key,
+          new ShortCircuitReplicaCreator() {
+        @Override
+        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+          try {
+            return new ShortCircuitReplicaInfo(
+                new ShortCircuitReplica(key,
+                    pairs[iVal].getFileInputStreams()[0],
+                    pairs[iVal].getFileInputStreams()[1],
+                    cache, Time.monotonicNow() + (iVal * HOUR_IN_MS))); // replica 1's timestamp is 1 hour in the future, so it stays fresh
+          } catch (IOException e) {
+            throw new RuntimeException(e); // creator cannot throw a checked exception
+          }
+        }
+      });
+      Preconditions.checkNotNull(replicaInfos[i].getReplica());
+      Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
+      pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(), // NOTE(review): boolean result ignored
+                           replicaInfos[i].getReplica().getMetaStream());
+    }
+
+    // Keep trying to fetchOrCreate block 0 until it goes stale (and we must re-create.)
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        ShortCircuitReplicaInfo info = cache.fetchOrCreate(
+          new Key(0, "test_bp1"), new ShortCircuitReplicaCreator() {
+          @Override
+          public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+            return null; // creator invoked => old replica was dropped as stale
+          }
+        });
+        if (info.getReplica() != null) {
+          info.getReplica().unref(); // still the fresh replica; release and poll again
+          return false;
+        }
+        return true;
+      }
+    }, 500, 60000);
+
+    // Make sure that second replica did not go stale.
+    ShortCircuitReplicaInfo info = cache.fetchOrCreate(
+        new Key(1, "test_bp1"), new ShortCircuitReplicaCreator() {
+      @Override
+      public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+        Assert.fail("second replica went stale, despite 1 " +
+            "hour staleness time.");
+        return null;
+      }
+    });
+    info.getReplica().unref();
+
+    // Clean up. NOTE(review): loop starts at 1, so replicaInfos[0]'s initial
+    // reference is never unref'd here -- confirm this is intentional.
+    for (int i = 1; i < pairs.length; i++) {
+      replicaInfos[i].getReplica().unref();
+    }
+    cache.close();
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Wed Feb 12 19:08:52 2014
@@ -27,6 +27,7 @@ import java.io.RandomAccessFile;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
+import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -35,8 +36,9 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -125,8 +127,9 @@ public class TestShortCircuitLocalRead {
       throws IOException, InterruptedException {
     // Ensure short circuit is enabled
     DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
+    ClientContext getClientContext = ClientContext.getFromConf(conf);
     if (legacyShortCircuitFails) {
-      assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+      assertFalse(getClientContext.getDisableLegacyBlockReaderLocal());
     }
     
     FSDataInputStream stm = fs.open(name);
@@ -155,7 +158,7 @@ public class TestShortCircuitLocalRead {
     checkData(actual, readOffset, expected, "Read 3");
     
     if (legacyShortCircuitFails) {
-      assertFalse(fs.getClient().useLegacyBlockReaderLocal());
+      assertTrue(getClientContext.getDisableLegacyBlockReaderLocal());
     }
     stm.close();
   }
@@ -175,8 +178,9 @@ public class TestShortCircuitLocalRead {
       throws IOException, InterruptedException {
     // Ensure short circuit is enabled
     DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
+    ClientContext clientContext = ClientContext.getFromConf(conf);
     if (legacyShortCircuitFails) {
-      assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+      assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
     }
     
     HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
@@ -209,7 +213,7 @@ public class TestShortCircuitLocalRead {
     }
     checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
     if (legacyShortCircuitFails) {
-      assertFalse(fs.getClient().useLegacyBlockReaderLocal());
+      assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
     }
     stm.close();
   }
@@ -223,7 +227,6 @@ public class TestShortCircuitLocalRead {
 
   public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
       int readOffset) throws IOException, InterruptedException {
-    String shortCircuitUser = getCurrentUser();
     doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
         null, getCurrentUser(), false);
   }
@@ -239,6 +242,10 @@ public class TestShortCircuitLocalRead {
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         ignoreChecksum);
+    // Set a random client context name so that we don't share a cache with
+    // other invocations of this function.
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+        UUID.randomUUID().toString());
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
         new File(sockDir.getDir(),
           "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
@@ -322,18 +329,6 @@ public class TestShortCircuitLocalRead {
     doTestShortCircuitRead(true, 10*blockSize+100, 777);
   }
 
-  private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
-      final DatanodeID dnInfo, final Configuration conf) throws IOException,
-      InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-      @Override
-      public ClientDatanodeProtocol run() throws Exception {
-        return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000,
-            false);
-      }
-    });
-  }
-  
   private static DistributedFileSystem getFileSystem(String user, final URI uri,
       final Configuration conf) throws InterruptedException, IOException {
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
@@ -555,8 +550,7 @@ public class TestShortCircuitLocalRead {
           for (int i = 0; i < iteration; i++) {
             try {
               String user = getCurrentUser();
-              checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf,
-                  true);
+              checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf, true);
             } catch (IOException e) {
               e.printStackTrace();
             } catch (InterruptedException e) {
@@ -608,7 +602,8 @@ public class TestShortCircuitLocalRead {
     stm.write(fileData);
     stm.close();
     try {
-      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, conf, shortCircuitFails);
+      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, 
+          conf, shortCircuitFails);
       //RemoteBlockReader have unsupported method read(ByteBuffer bf)
       assertTrue("RemoteBlockReader unsupported method read(ByteBuffer bf) error",
                     checkUnsupportedMethod(fs, file1, fileData, readOffset));

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Wed Feb 12 19:08:52 2014
@@ -38,10 +38,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,10 +61,13 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestBlockTokenWithDFS {
@@ -131,50 +140,70 @@ public class TestBlockTokenWithDFS {
   }
 
   // try reading a block using a BlockReader directly
-  private static void tryRead(Configuration conf, LocatedBlock lblock,
+  private static void tryRead(final Configuration conf, LocatedBlock lblock,
       boolean shouldSucceed) {
     InetSocketAddress targetAddr = null;
-    Socket s = null;
+    IOException ioe = null;
     BlockReader blockReader = null;
     ExtendedBlock block = lblock.getBlock();
     try {
       DatanodeInfo[] nodes = lblock.getLocations();
       targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
-      s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-      s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-      s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-
-      String file = BlockReaderFactory.getFileName(targetAddr, 
-          "test-blockpoolid", block.getBlockId());
-      blockReader = BlockReaderFactory.newBlockReader(
-          new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
-          true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
-          nodes[0], null, null, null, false,
-          CachingStrategy.newDefaultStrategy());
 
+      blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+          setFileName(BlockReaderFactory.getFileName(targetAddr, 
+                        "test-blockpoolid", block.getBlockId())).
+          setBlock(block).
+          setBlockToken(lblock.getBlockToken()).
+          setInetSocketAddress(targetAddr).
+          setStartOffset(0).
+          setLength(-1).
+          setVerifyChecksum(true).
+          setClientName("TestBlockTokenWithDFS").
+          setDatanodeInfo(nodes[0]).
+          setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+          setClientCacheContext(ClientContext.getFromConf(conf)).
+          setConfiguration(conf).
+          setRemotePeerFactory(new RemotePeerFactory() {
+            @Override
+            public Peer newConnectedPeer(InetSocketAddress addr)
+                throws IOException {
+              Peer peer = null;
+              Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+              try {
+                sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+                sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+                peer = TcpPeerServer.peerFromSocket(sock);
+              } finally {
+                if (peer == null) {
+                  IOUtils.closeSocket(sock);
+                }
+              }
+              return peer;
+            }
+          }).
+          build();
     } catch (IOException ex) {
-      if (ex instanceof InvalidBlockTokenException) {
-        assertFalse("OP_READ_BLOCK: access token is invalid, "
-            + "when it is expected to be valid", shouldSucceed);
-        return;
-      }
-      fail("OP_READ_BLOCK failed due to reasons other than access token: "
-          + StringUtils.stringifyException(ex));
+      ioe = ex;
     } finally {
-      if (s != null) {
+      if (blockReader != null) {
         try {
-          s.close();
-        } catch (IOException iex) {
-        } finally {
-          s = null;
+          blockReader.close();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
         }
       }
     }
-    if (blockReader == null) {
-      fail("OP_READ_BLOCK failed due to reasons other than access token");
+    if (shouldSucceed) {
+      Assert.assertNotNull("OP_READ_BLOCK: access token is invalid, "
+            + "when it is expected to be valid", blockReader);
+    } else {
+      Assert.assertNotNull("OP_READ_BLOCK: access token is valid, "
+          + "when it is expected to be invalid", ioe);
+      Assert.assertTrue(
+          "OP_READ_BLOCK failed due to reasons other than access token: ",
+          ioe instanceof InvalidBlockTokenException);
     }
-    assertTrue("OP_READ_BLOCK: access token is valid, "
-        + "when it is expected to be invalid", shouldSucceed);
   }
 
   // get a conf for testing
@@ -347,9 +376,13 @@ public class TestBlockTokenWithDFS {
       /*
        * testing READ interface on DN using a BlockReader
        */
-
-      new DFSClient(new InetSocketAddress("localhost",
+      DFSClient client = null;
+      try {
+        client = new DFSClient(new InetSocketAddress("localhost",
           cluster.getNameNodePort()), conf);
+      } finally {
+        if (client != null) client.close();
+      }
       List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
           FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
       LocatedBlock lblock = locatedBlocks.get(0); // first block

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1567720&r1=1567719&r2=1567720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Wed Feb 12 19:08:52 2014
@@ -35,11 +35,14 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -48,13 +51,14 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -284,23 +288,43 @@ public class TestDataNodeVolumeFailure {
   private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
     throws IOException {
     InetSocketAddress targetAddr = null;
-    Socket s = null;
     ExtendedBlock block = lblock.getBlock(); 
    
     targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
-      
-    s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-    s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-
-    String file = BlockReaderFactory.getFileName(targetAddr, 
-        "test-blockpoolid",
-        block.getBlockId());
-    BlockReader blockReader =
-      BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
-        lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
-        TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false,
-        CachingStrategy.newDefaultStrategy());
+
+    BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+      setInetSocketAddress(targetAddr).
+      setBlock(block).
+      setFileName(BlockReaderFactory.getFileName(targetAddr,
+                    "test-blockpoolid", block.getBlockId())).
+      setBlockToken(lblock.getBlockToken()).
+      setStartOffset(0).
+      setLength(-1).
+      setVerifyChecksum(true).
+      setClientName("TestDataNodeVolumeFailure").
+      setDatanodeInfo(datanode).
+      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+      setClientCacheContext(ClientContext.getFromConf(conf)).
+      setConfiguration(conf).
+      setRemotePeerFactory(new RemotePeerFactory() {
+        @Override
+        public Peer newConnectedPeer(InetSocketAddress addr)
+            throws IOException {
+          Peer peer = null;
+          Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+          try {
+            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+            peer = TcpPeerServer.peerFromSocket(sock);
+          } finally {
+            if (peer == null) {
+              IOUtils.closeSocket(sock);
+            }
+          }
+          return peer;
+        }
+      }).
+      build();
     blockReader.close();
   }
   



Mime
View raw message