hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject [2/6] hadoop git commit: HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma
Date Wed, 25 May 2016 19:26:54 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
deleted file mode 100644
index 5ff343a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestClientBlockVerification {
-
-  static BlockReaderTestUtil util = null;
-  static final Path TEST_FILE = new Path("/test.file");
-  static final int FILE_SIZE_K = 256;
-  static LocatedBlock testBlock = null;
-
-  static {
-    GenericTestUtils.setLogLevel(RemoteBlockReader2.LOG, Level.ALL);
-  }
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    final int REPLICATION_FACTOR = 1;
-    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
-    util.writeFile(TEST_FILE, FILE_SIZE_K);
-    List<LocatedBlock> blkList = util.getFileBlocks(TEST_FILE, FILE_SIZE_K);
-    testBlock = blkList.get(0);     // Use the first block to test
-  }
-
-  /**
-   * Verify that if we read an entire block, we send CHECKSUM_OK
-   */
-  @Test
-  public void testBlockVerification() throws Exception {
-    RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
-        util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
-    util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
-    verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close();
-  }
-
-  /**
-   * Test that if we do an incomplete read, we don't call CHECKSUM_OK
-   */
-  @Test
-  public void testIncompleteRead() throws Exception {
-    RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
-        util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
-    util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
-
-    // We asked the blockreader for the whole file, and only read
-    // half of it, so no CHECKSUM_OK
-    verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
-    reader.close();
-  }
-
-  /**
-   * Test that if we ask for a half block, and read it all, we *do*
-   * send CHECKSUM_OK. The DN takes care of knowing whether it was
-   * the whole block or not.
-   */
-  @Test
-  public void testCompletePartialRead() throws Exception {
-    // Ask for half the file
-    RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
-        util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
-    // And read half the file
-    util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
-    verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close();
-  }
-
-  /**
-   * Test various unaligned reads to make sure that we properly
-   * account even when we don't start or end on a checksum boundary
-   */
-  @Test
-  public void testUnalignedReads() throws Exception {
-    int startOffsets[] = new int[] { 0, 3, 129 };
-    int lengths[] = new int[] { 30, 300, 512, 513, 1025 };
-    for (int startOffset : startOffsets) {
-      for (int length : lengths) {
-        DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
-                           " len=" + length);
-        RemoteBlockReader2 reader = (RemoteBlockReader2)spy(
-            util.getBlockReader(testBlock, startOffset, length));
-        util.readAndCheckEOS(reader, length, true);
-        verify(reader).sendReadResult(Status.CHECKSUM_OK);
-        reader.close();
-      }
-    }
-  }
-
-
-  @AfterClass
-  public static void teardownCluster() throws Exception {
-    util.shutdown();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
index afa5d27..72ae7fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
index f2043fb..c9d831a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
index 05698ec..d54164f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
@@ -28,6 +28,7 @@ import java.util.Random;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
deleted file mode 100644
index cef1d6d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-
-public class TestRemoteBlockReader extends TestBlockReaderBase {
-
-  HdfsConfiguration createConf() {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
-    return conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
deleted file mode 100644
index c23b4b7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-public class TestRemoteBlockReader2 extends TestBlockReaderBase {
-  HdfsConfiguration createConf() {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    return conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java
new file mode 100644
index 0000000..89bb67e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsTracer;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
+import org.apache.hadoop.hdfs.server.namenode.CacheManager;
+import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+/**
+ * A helper class to set up the cluster and to obtain a BlockReader and DataNode for a block.
+ */
+public class BlockReaderTestUtil {
+  /**
+   * Returns true if we should run tests that generate large files (> 1GB)
+   */
+  static public boolean shouldTestLargeFiles() {
+    String property = System.getProperty("hdfs.test.large.files");
+    if (property == null) return false;
+    if (property.isEmpty()) return true;
+    return Boolean.parseBoolean(property);
+  }
+
+  private HdfsConfiguration conf = null;
+  private MiniDFSCluster cluster = null;
+
+  /**
+   * Set up the cluster.
+   */
+  public BlockReaderTestUtil(int replicationFactor) throws Exception {
+    this(replicationFactor, new HdfsConfiguration());
+  }
+
+  public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception {
+    this.conf = config;
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);
+    cluster = new MiniDFSCluster.Builder(conf).format(true).build();
+    cluster.waitActive();
+  }
+
+  /**
+   * Shut down the cluster.
+   */
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
+  public HdfsConfiguration getConf() {
+    return conf;
+  }
+
+  /**
+   * Create a file of the given size filled with random data.
+   * @return  File data.
+   */
+  public byte[] writeFile(Path filepath, int sizeKB)
+      throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+
+    // Write a file with the specified amount of data
+    DataOutputStream os = fs.create(filepath);
+    byte data[] = new byte[1024 * sizeKB];
+    new Random().nextBytes(data);
+    os.write(data);
+    os.close();
+    return data;
+  }
+
+  /**
+   * Get the list of Blocks for a file.
+   */
+  public List<LocatedBlock> getFileBlocks(Path filepath, int sizeKB)
+      throws IOException {
+    // Return the blocks we just wrote
+    DFSClient dfsclient = getDFSClient();
+    return dfsclient.getNamenode().getBlockLocations(
+      filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks();
+  }
+
+  /**
+   * Get the DFSClient.
+   */
+  public DFSClient getDFSClient() throws IOException {
+    InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort());
+    return new DFSClient(nnAddr, conf);
+  }
+
+  /**
+   * Exercise the BlockReader and read length bytes.
+   *
+   * It does not verify the bytes read.
+   */
+  public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof)
+      throws IOException {
+    byte buf[] = new byte[1024];
+    int nRead = 0;
+    while (nRead < length) {
+      DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
+      int n = reader.read(buf, 0, buf.length);
+      assertTrue(n > 0);
+      nRead += n;
+    }
+
+    if (expectEof) {
+      DFSClient.LOG.info("Done reading, expect EOF for next read.");
+      assertEquals(-1, reader.read(buf, 0, buf.length));
+    }
+  }
+
+  /**
+   * Get a BlockReader for the given block.
+   */
+  public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
+      throws IOException {
+    return getBlockReader(cluster, testBlock, offset, lenToRead);
+  }
+
+  /**
+   * Get a BlockReader for the given block.
+   */
+  public static BlockReader getBlockReader(MiniDFSCluster cluster,
+      LocatedBlock testBlock, int offset, int lenToRead) throws IOException {
+    InetSocketAddress targetAddr = null;
+    ExtendedBlock block = testBlock.getBlock();
+    DatanodeInfo[] nodes = testBlock.getLocations();
+    targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
+
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    return new BlockReaderFactory(fs.getClient().getConf()).
+      setInetSocketAddress(targetAddr).
+      setBlock(block).
+      setFileName(targetAddr.toString()+ ":" + block.getBlockId()).
+      setBlockToken(testBlock.getBlockToken()).
+      setStartOffset(offset).
+      setLength(lenToRead).
+      setVerifyChecksum(true).
+      setClientName("BlockReaderTestUtil").
+      setDatanodeInfo(nodes[0]).
+      setClientCacheContext(ClientContext.getFromConf(fs.getConf())).
+      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+      setConfiguration(fs.getConf()).
+      setAllowShortCircuitLocalReads(true).
+      setTracer(FsTracer.get(fs.getConf())).
+      setRemotePeerFactory(new RemotePeerFactory() {
+        @Override
+        public Peer newConnectedPeer(InetSocketAddress addr,
+            Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+            throws IOException {
+          Peer peer = null;
+          Socket sock = NetUtils.
+              getDefaultSocketFactory(fs.getConf()).createSocket();
+          try {
+            sock.connect(addr, HdfsConstants.READ_TIMEOUT);
+            sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+            peer = DFSUtilClient.peerFromSocket(sock);
+          } finally {
+            if (peer == null) {
+              IOUtils.closeQuietly(sock);
+            }
+          }
+          return peer;
+        }
+      }).
+      build();
+  }
+
+  /**
+   * Get a DataNode that serves our testBlock.
+   */
+  public DataNode getDataNode(LocatedBlock testBlock) {
+    DatanodeInfo[] nodes = testBlock.getLocations();
+    int ipcport = nodes[0].getIpcPort();
+    return cluster.getDataNode(ipcport);
+  }
+
+  public static void enableHdfsCachingTracing() {
+    LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(CacheManager.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(FsDatasetCache.class.getName()).setLevel(
+        Level.TRACE);
+  }
+
+  public static void enableBlockReaderFactoryTracing() {
+    LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitCache.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(BlockReaderLocal.class.getName()).setLevel(
+        Level.TRACE);
+  }
+
+  public static void enableShortCircuitShmTracing() {
+    LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel(
+        Level.TRACE);
+    LogManager.getLogger(DataNode.class.getName()).setLevel(
+        Level.TRACE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java
new file mode 100644
index 0000000..70ed428
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+abstract public class TestBlockReaderBase {
+  private BlockReaderTestUtil util;
+  private byte[] blockData;
+  private BlockReader reader;
+
+  /**
+   * If you override this, make sure the returned array's length is less
+   * than the block size.
+   */
+  byte [] getBlockData() {
+    int length = 1 << 22;
+    byte[] data = new byte[length];
+    for (int i = 0; i < length; i++) {
+      data[i] = (byte) (i % 133);
+    }
+    return data;
+  }
+
+  private BlockReader getBlockReader(LocatedBlock block) throws Exception {
+    return util.getBlockReader(block, 0, blockData.length);
+  }
+
+  abstract HdfsConfiguration createConf();
+
+  @Before
+  public void setup() throws Exception {
+    util = new BlockReaderTestUtil(1, createConf());
+    blockData = getBlockData();
+    DistributedFileSystem fs = util.getCluster().getFileSystem();
+    Path testfile = new Path("/testfile");
+    FSDataOutputStream fout = fs.create(testfile);
+    fout.write(blockData);
+    fout.close();
+    LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0);
+    reader = getBlockReader(blk);
+  }
+
+  @After
+  public void shutdown() throws Exception {
+    util.shutdown();
+  }
+
+  @Test(timeout=60000)
+  public void testSkip() throws IOException {
+    Random random = new Random();
+    byte [] buf = new byte[1];
+    for (int pos = 0; pos < blockData.length;) {
+      long skip = random.nextInt(100) + 1;
+      long skipped = reader.skip(skip);
+      if (pos + skip >= blockData.length) {
+        assertEquals(blockData.length, pos + skipped);
+        break;
+      } else {
+        assertEquals(skip, skipped);
+        pos += skipped;
+        assertEquals(1, reader.read(buf, 0, 1));
+
+        assertEquals(blockData[pos], buf[0]);
+        pos += 1;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
new file mode 100644
index 0000000..4b7def6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
+import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class TestBlockReaderFactory {
+  static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class);
+
+  /**
+   * Turns off bind-path validation for domain sockets and skips the test
+   * (through a JUnit assumption) when DomainSocket reports a loading
+   * failure reason.
+   */
+  @Before
+  public void init() {
+    DomainSocket.disableBindPathValidation();
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+  }
+
+  /**
+   * Resets the static test hooks that the individual tests set, so that
+   * one test's settings do not carry over into the next.
+   */
+  @After
+  public void cleanup() {
+    DFSInputStream.tcpReadsDisabledForTesting = false;
+    BlockReaderFactory.createShortCircuitReplicaInfoCallback = null;
+  }
+
+  /**
+   * Builds a configuration with short-circuit reads switched on.
+   *
+   * The client context is set to the test name, the block size to 4096,
+   * checksum skipping and domain-socket data traffic are switched off, and
+   * the domain socket path points at a file named after the test inside
+   * the given socket directory.
+   *
+   * @param testName  name used for the client context and the socket file
+   * @param sockDir   directory that will hold the domain socket
+   * @return          the new configuration
+   */
+  public static Configuration createShortCircuitConf(String testName,
+      TemporarySocketDirectory sockDir) {
+    final File socketFile = new File(sockDir.getDir(), testName + "._PORT");
+    final Configuration result = new Configuration();
+    result.set(DFS_CLIENT_CONTEXT, testName);
+    result.setLong(DFS_BLOCK_SIZE_KEY, 4096);
+    result.set(DFS_DOMAIN_SOCKET_PATH_KEY, socketFile.getAbsolutePath());
+    result.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
+    result.setBoolean(
+        HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
+    result.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
+    return result;
+  }
+
+  /**
+   * If we have a UNIX domain socket configured,
+   * and we have dfs.client.domain.socket.data.traffic set to true,
+   * and short-circuit access fails, we should still be able to pass
+   * data traffic over the UNIX domain socket.  Test this.
+   *
+   * The cluster and the socket directory are released in finally blocks so
+   * that a failing assertion does not leak them into later tests.
+   */
+  @Test(timeout=60000)
+  public void testFallbackFromShortCircuitToUnixDomainTraffic()
+      throws Exception {
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    try {
+      // The server is NOT configured with short-circuit local reads;
+      // the client is.  Both support UNIX domain reads.
+      Configuration clientConf = createShortCircuitConf(
+          "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir);
+      clientConf.set(DFS_CLIENT_CONTEXT,
+          "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
+      clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
+      Configuration serverConf = new Configuration(clientConf);
+      serverConf.setBoolean(
+          HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
+
+      final MiniDFSCluster cluster =
+          new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+      try {
+        cluster.waitActive();
+        FileSystem dfs = FileSystem.get(cluster.getURI(0), clientConf);
+        String TEST_FILE = "/test_file";
+        final int TEST_FILE_LEN = 8193;
+        final int SEED = 0xFADED;
+        DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
+            (short)1, SEED);
+        byte contents[] =
+            DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
+        byte expected[] = DFSTestUtil.
+            calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+        Assert.assertTrue(Arrays.equals(contents, expected));
+      } finally {
+        cluster.shutdown();
+      }
+    } finally {
+      sockDir.close();
+    }
+  }
+
+  /**
+   * Test the case where we have multiple threads waiting on the
+   * ShortCircuitCache delivering a certain ShortCircuitReplica.
+   *
+   * In this case, there should only be one call to
+   * createShortCircuitReplicaInfo.  This one replica should be shared
+   * by all threads.
+   *
+   * The cluster and the socket directory are released in finally blocks,
+   * and the latch is always opened, so that a failure in the main thread
+   * neither leaks resources nor leaves reader threads blocked forever.
+   */
+  @Test(timeout=60000)
+  public void testMultipleWaitersOnShortCircuitCache()
+      throws Exception {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
+      new ShortCircuitCache.ShortCircuitReplicaCreator() {
+        @Override
+        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+          // Hold every caller until the main thread opens the latch, then
+          // let exactly one of them flip the flag.
+          Uninterruptibles.awaitUninterruptibly(latch);
+          if (!creationIsBlocked.compareAndSet(true, false)) {
+            Assert.fail("there were multiple calls to "
+                + "createShortCircuitReplicaInfo.  Only one was expected.");
+          }
+          return null;
+        }
+      };
+    final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    try {
+      Configuration conf = createShortCircuitConf(
+          "testMultipleWaitersOnShortCircuitCache", sockDir);
+      final MiniDFSCluster cluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      try {
+        cluster.waitActive();
+        final DistributedFileSystem dfs = cluster.getFileSystem();
+        final String TEST_FILE = "/test_file";
+        final int TEST_FILE_LEN = 4000;
+        final int SEED = 0xFADED;
+        final int NUM_THREADS = 10;
+        DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
+            (short)1, SEED);
+        Runnable readerRunnable = new Runnable() {
+          @Override
+          public void run() {
+            try {
+              byte contents[] =
+                  DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
+              Assert.assertFalse(creationIsBlocked.get());
+              byte expected[] = DFSTestUtil.
+                  calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+              Assert.assertTrue(Arrays.equals(contents, expected));
+            } catch (Throwable e) {
+              LOG.error("readerRunnable error", e);
+              testFailed.set(true);
+            }
+          }
+        };
+        Thread threads[] = new Thread[NUM_THREADS];
+        for (int i = 0; i < NUM_THREADS; i++) {
+          threads[i] = new Thread(readerRunnable);
+          threads[i].start();
+        }
+        // Give the readers time to pile up behind the latch.
+        Thread.sleep(500);
+        latch.countDown();
+        for (int i = 0; i < NUM_THREADS; i++) {
+          Uninterruptibles.joinUninterruptibly(threads[i]);
+        }
+      } finally {
+        // Counting down an already-open latch is a no-op; on a failure path
+        // it releases any reader still parked in the callback.
+        latch.countDown();
+        cluster.shutdown();
+      }
+    } finally {
+      sockDir.close();
+    }
+    Assert.assertFalse(testFailed.get());
+  }
+
+  /**
+   * Test the case where we have a failure to complete a short circuit read
+   * that occurs, and then later on, we have a success.
+   * Any thread waiting on a cache load should receive the failure (if it
+   * occurs);  however, the failure result should not be cached.  We want
+   * to be able to retry later and succeed.
+   *
+   * Each reader thread always counts down gotFailureLatch, even when its
+   * first attempt does not fail the expected way, so the main thread
+   * reports the failure instead of blocking until the JUnit timeout.
+   */
+  @Test(timeout=60000)
+  public void testShortCircuitCacheTemporaryFailure()
+      throws Exception {
+    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
+    final AtomicBoolean replicaCreationShouldFail = new AtomicBoolean(true);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
+      new ShortCircuitCache.ShortCircuitReplicaCreator() {
+        @Override
+        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+          if (replicaCreationShouldFail.get()) {
+            // Insert a short delay to increase the chance that one client
+            // thread waits for the other client thread's failure via
+            // a condition variable.
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+            return new ShortCircuitReplicaInfo();
+          }
+          return null;
+        }
+      };
+    final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    try {
+      Configuration conf = createShortCircuitConf(
+          "testShortCircuitCacheTemporaryFailure", sockDir);
+      final MiniDFSCluster cluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      final CountDownLatch shouldRetryLatch = new CountDownLatch(1);
+      try {
+        cluster.waitActive();
+        final DistributedFileSystem dfs = cluster.getFileSystem();
+        final String TEST_FILE = "/test_file";
+        final int TEST_FILE_LEN = 4000;
+        final int NUM_THREADS = 2;
+        final int SEED = 0xFADED;
+        final CountDownLatch gotFailureLatch =
+            new CountDownLatch(NUM_THREADS);
+        DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
+            (short)1, SEED);
+        Runnable readerRunnable = new Runnable() {
+          @Override
+          public void run() {
+            try {
+              LocatedBlock lblock;
+              try {
+                // First time should fail.
+                List<LocatedBlock> locatedBlocks =
+                    cluster.getNameNode().getRpcServer().getBlockLocations(
+                        TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
+                lblock = locatedBlocks.get(0); // first block
+                BlockReader firstReader = null;
+                try {
+                  firstReader = BlockReaderTestUtil.
+                      getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+                  Assert.fail(
+                      "expected getBlockReader to fail the first time.");
+                } catch (Throwable t) {
+                  // A Throwable without a message must fail the assertion
+                  // rather than raise a NullPointerException here.
+                  String message = t.getMessage();
+                  Assert.assertTrue("expected to see 'TCP reads were " +
+                      "disabled for testing' in exception " + t,
+                      message != null && message.contains(
+                          "TCP reads were disabled for testing"));
+                } finally {
+                  if (firstReader != null) {
+                    firstReader.close(); // keep findbugs happy
+                  }
+                }
+              } finally {
+                // Always report in, whatever the outcome of the first
+                // attempt, so the main thread is never left waiting.
+                gotFailureLatch.countDown();
+              }
+              shouldRetryLatch.await();
+
+              // Second time should succeed.
+              BlockReader secondReader = null;
+              try {
+                secondReader = BlockReaderTestUtil.
+                    getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+              } catch (Throwable t) {
+                LOG.error("error trying to retrieve a block reader " +
+                    "the second time.", t);
+                throw t;
+              } finally {
+                if (secondReader != null) {
+                  secondReader.close();
+                }
+              }
+            } catch (Throwable t) {
+              LOG.error("getBlockReader failure", t);
+              testFailed.set(true);
+            }
+          }
+        };
+        Thread threads[] = new Thread[NUM_THREADS];
+        for (int i = 0; i < NUM_THREADS; i++) {
+          threads[i] = new Thread(readerRunnable);
+          threads[i].start();
+        }
+        gotFailureLatch.await();
+        replicaCreationShouldFail.set(false);
+        shouldRetryLatch.countDown();
+        for (int i = 0; i < NUM_THREADS; i++) {
+          Uninterruptibles.joinUninterruptibly(threads[i]);
+        }
+      } finally {
+        // Counting down an already-open latch is a no-op; on a failure path
+        // it releases reader threads still waiting for the retry signal.
+        shouldRetryLatch.countDown();
+        cluster.shutdown();
+      }
+    } finally {
+      sockDir.close();
+    }
+    Assert.assertFalse(testFailed.get());
+  }
+
+  /**
+   * Test that a client which supports short-circuit reads using
+   * shared memory can fall back to not using shared memory when
+   * the server doesn't support it.
+   *
+   * The cluster and the socket directory are released in finally blocks so
+   * that a failing assertion does not leak them into later tests.
+   */
+  @Test
+  public void testShortCircuitReadFromServerWithoutShm() throws Exception {
+    final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    try {
+      Configuration clientConf = createShortCircuitConf(
+          "testShortCircuitReadFromServerWithoutShm", sockDir);
+      Configuration serverConf = new Configuration(clientConf);
+      serverConf.setInt(
+          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
+      DFSInputStream.tcpReadsDisabledForTesting = true;
+      final MiniDFSCluster cluster =
+          new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+      try {
+        cluster.waitActive();
+        clientConf.set(DFS_CLIENT_CONTEXT,
+            "testShortCircuitReadFromServerWithoutShm_clientContext");
+        final DistributedFileSystem fs = (DistributedFileSystem)
+            FileSystem.get(cluster.getURI(0), clientConf);
+        final String TEST_FILE = "/test_file";
+        final int TEST_FILE_LEN = 4000;
+        final int SEED = 0xFADEC;
+        DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+            (short)1, SEED);
+        byte contents[] =
+            DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
+        byte expected[] = DFSTestUtil.
+            calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+        Assert.assertTrue(Arrays.equals(contents, expected));
+        final ShortCircuitCache cache =
+            fs.getClient().getClientContext().getShortCircuitCache();
+        final DatanodeInfo datanode =
+            new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
+        // The client must have marked shared memory as disabled for the
+        // single datanode and must hold no segments for it.
+        cache.getDfsClientShmManager().visit(new Visitor() {
+          @Override
+          public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+              throws IOException {
+            Assert.assertEquals(1,  info.size());
+            PerDatanodeVisitorInfo vinfo = info.get(datanode);
+            Assert.assertTrue(vinfo.disabled);
+            Assert.assertEquals(0, vinfo.full.size());
+            Assert.assertEquals(0, vinfo.notFull.size());
+          }
+        });
+      } finally {
+        cluster.shutdown();
+      }
+    } finally {
+      sockDir.close();
+    }
+  }
+
+  /**
+   * Test that a client which does not support short-circuit reads using
+   * shared memory can talk with a server which supports it.
+   *
+   * The cluster and the socket directory are released in finally blocks so
+   * that a failing assertion does not leak them into later tests.
+   */
+  @Test
+  public void testShortCircuitReadFromClientWithoutShm() throws Exception {
+    final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    try {
+      Configuration clientConf = createShortCircuitConf(
+          "testShortCircuitReadWithoutShm", sockDir);
+      Configuration serverConf = new Configuration(clientConf);
+      DFSInputStream.tcpReadsDisabledForTesting = true;
+      final MiniDFSCluster cluster =
+          new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+      try {
+        cluster.waitActive();
+        // Only the client gets the zero interrupt-check interval; the
+        // server configuration was copied before this change.
+        clientConf.setInt(
+            DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
+        clientConf.set(DFS_CLIENT_CONTEXT,
+            "testShortCircuitReadFromClientWithoutShm_clientContext");
+        final DistributedFileSystem fs = (DistributedFileSystem)
+            FileSystem.get(cluster.getURI(0), clientConf);
+        final String TEST_FILE = "/test_file";
+        final int TEST_FILE_LEN = 4000;
+        final int SEED = 0xFADEC;
+        DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+            (short)1, SEED);
+        byte contents[] =
+            DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
+        byte expected[] = DFSTestUtil.
+            calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+        Assert.assertTrue(Arrays.equals(contents, expected));
+        final ShortCircuitCache cache =
+            fs.getClient().getClientContext().getShortCircuitCache();
+        Assert.assertNull(cache.getDfsClientShmManager());
+      } finally {
+        cluster.shutdown();
+      }
+    } finally {
+      sockDir.close();
+    }
+  }
+
+  /**
+   * Test shutting down the ShortCircuitCache while there are things in it.
+   *
+   * The cluster and the socket directory are released in finally blocks so
+   * that a failing assertion does not leak them into later tests.
+   */
+  @Test
+  public void testShortCircuitCacheShutdown() throws Exception {
+    final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    try {
+      Configuration conf = createShortCircuitConf(
+          "testShortCircuitCacheShutdown", sockDir);
+      conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
+      Configuration serverConf = new Configuration(conf);
+      DFSInputStream.tcpReadsDisabledForTesting = true;
+      final MiniDFSCluster cluster =
+          new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
+      try {
+        cluster.waitActive();
+        final DistributedFileSystem fs = (DistributedFileSystem)
+            FileSystem.get(cluster.getURI(0), conf);
+        final String TEST_FILE = "/test_file";
+        final int TEST_FILE_LEN = 4000;
+        final int SEED = 0xFADEC;
+        DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+            (short)1, SEED);
+        // Reading the file back populates the cache before it is closed.
+        byte contents[] =
+            DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
+        byte expected[] = DFSTestUtil.
+            calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+        Assert.assertTrue(Arrays.equals(contents, expected));
+        final ShortCircuitCache cache =
+            fs.getClient().getClientContext().getShortCircuitCache();
+        cache.close();
+        Assert.assertTrue(cache.getDfsClientShmManager().
+            getDomainSocketWatcher().isClosed());
+      } finally {
+        cluster.shutdown();
+      }
+    } finally {
+      sockDir.close();
+    }
+  }
+
+  /**
+   * When an InterruptedException is sent to a thread calling
+   * FileChannel#read, the FileChannel is immediately closed and the
+   * thread gets an exception.  This effectively means that we might have
+   * someone asynchronously calling close() on the file descriptors we use
+   * in BlockReaderLocal.  So when unreferencing a ShortCircuitReplica in
+   * ShortCircuitCache#unref, we should check if the FileChannel objects
+   * are still open.  If not, we should purge the replica to avoid giving
+   * it out to any future readers.
+   *
+   * This is a regression test for HDFS-6227: Short circuit read failed
+   * due to ClosedChannelException.
+   *
+   * Note that you may still get ClosedChannelException errors if two threads
+   * are reading from the same replica and an InterruptedException is delivered
+   * to one of them.
+   *
+   * The cluster and the socket directory are released in finally blocks so
+   * that a failing assertion does not leak them into later tests.
+   */
+  @Test(timeout=120000)
+  public void testPurgingClosedReplicas() throws Exception {
+    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
+    final AtomicInteger replicasCreated = new AtomicInteger(0);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
+        new ShortCircuitCache.ShortCircuitReplicaCreator() {
+          @Override
+          public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+            // Only count the creations; returning null leaves the actual
+            // work to the factory.
+            replicasCreated.incrementAndGet();
+            return null;
+          }
+        };
+    final TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    try {
+      Configuration conf = createShortCircuitConf(
+          "testPurgingClosedReplicas", sockDir);
+      final MiniDFSCluster cluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      try {
+        cluster.waitActive();
+        final DistributedFileSystem dfs = cluster.getFileSystem();
+        final String TEST_FILE = "/test_file";
+        final int TEST_FILE_LEN = 4095;
+        final int SEED = 0xFADE0;
+        final DistributedFileSystem fs = (DistributedFileSystem)
+            FileSystem.get(cluster.getURI(0), conf);
+        DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+            (short)1, SEED);
+
+        final Semaphore sem = new Semaphore(0);
+        final List<LocatedBlock> locatedBlocks =
+            cluster.getNameNode().getRpcServer().getBlockLocations(
+                TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
+        final LocatedBlock lblock = locatedBlocks.get(0); // first block
+        final byte[] buf = new byte[TEST_FILE_LEN];
+        Runnable readerRunnable = new Runnable() {
+          @Override
+          public void run() {
+            try {
+              while (true) {
+                BlockReader blockReader = null;
+                try {
+                  blockReader = BlockReaderTestUtil.
+                      getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+                  // Hand a permit to the main thread while a read is in
+                  // flight, and take it back once the read is over.
+                  sem.release();
+                  try {
+                    blockReader.readAll(buf, 0, TEST_FILE_LEN);
+                  } finally {
+                    sem.acquireUninterruptibly();
+                  }
+                } catch (ClosedByInterruptException e) {
+                  LOG.info("got the expected ClosedByInterruptException", e);
+                  sem.release();
+                  break;
+                } finally {
+                  if (blockReader != null) blockReader.close();
+                }
+                LOG.info("read another " + TEST_FILE_LEN + " bytes.");
+              }
+            } catch (Throwable t) {
+              LOG.error("getBlockReader failure", t);
+              testFailed.set(true);
+              sem.release();
+            }
+          }
+        };
+        Thread thread = new Thread(readerRunnable);
+        thread.start();
+
+        // While the thread is reading, send it interrupts.
+        // These should trigger a ClosedByInterruptException in the reader.
+        while (thread.isAlive()) {
+          sem.acquireUninterruptibly();
+          thread.interrupt();
+          sem.release();
+        }
+        Assert.assertFalse(testFailed.get());
+
+        // We should be able to read from the file without
+        // getting a ClosedChannelException.
+        BlockReader blockReader = null;
+        try {
+          blockReader = BlockReaderTestUtil.
+              getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+          blockReader.readFully(buf, 0, TEST_FILE_LEN);
+        } finally {
+          if (blockReader != null) blockReader.close();
+        }
+        byte expected[] = DFSTestUtil.
+            calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+        Assert.assertTrue(Arrays.equals(buf, expected));
+
+        // Another ShortCircuitReplica object should have been created.
+        Assert.assertEquals(2, replicasCreated.get());
+
+        dfs.close();
+      } finally {
+        cluster.shutdown();
+      }
+    } finally {
+      sockDir.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
new file mode 100644
index 0000000..71d30a4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
+import org.apache.hadoop.fs.FsTracer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBlockReaderLocal {
+  private static TemporarySocketDirectory sockDir;
+
+  /**
+   * Creates the temporary socket directory shared by all tests in this
+   * class and turns off bind-path validation for domain sockets.
+   */
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  /**
+   * Closes the shared temporary socket directory once all tests have run.
+   */
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
+
+  /**
+   * Fails the test at the first position where two array regions differ.
+   *
+   * @param buf1   the first array
+   * @param off1   start of the region in the first array
+   * @param buf2   the second array
+   * @param off2   start of the region in the second array
+   * @param len    number of bytes to compare
+   */
+  public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+      int off2, int len) {
+    for (int idx = 0; idx < len; idx++) {
+      final byte first = buf1[off1 + idx];
+      final byte second = buf2[off2 + idx];
+      if (first == second) {
+        continue;
+      }
+      Assert.fail("arrays differ at byte " +  idx + ". " +
+        "The first array has " + (int)first +
+        ", but the second array has " + (int)second);
+    }
+  }
+
+  /**
+   * Similar to IOUtils#readFully(). Reads bytes in a loop.
+   *
+   * Before every read the buffer window is set to the part of the requested
+   * region that is still unread, so a short read is followed by a read of
+   * the remainder only and never runs past off + len.
+   *
+   * @param reader           The BlockReaderLocal to read bytes from
+   * @param buf              The ByteBuffer to read into
+   * @param off              The offset in the buffer to read into
+   * @param len              The number of bytes to read.
+   *
+   * @throws IOException     If it could not read the requested number of bytes
+   */
+  private static void readFully(BlockReaderLocal reader,
+      ByteBuffer buf, int off, int len) throws IOException {
+    int amt = len;
+    while (amt > 0) {
+      // off advances with every read while amt shrinks by the same amount,
+      // so off + amt always marks the end of the requested region.
+      buf.limit(off + amt);
+      buf.position(off);
+      long ret = reader.read(buf);
+      if (ret < 0) {
+        throw new EOFException( "Premature EOF from BlockReaderLocal " +
+            "after reading " + (len - amt) + " byte(s).");
+      }
+      amt -= ret;
+      off += ret;
+    }
+  }
+
+  /**
+   * Base class for the scenarios driven by runBlockReaderLocalTest.  It
+   * holds the shared constants and three hooks which do nothing unless a
+   * scenario overrides them.
+   */
+  private static class BlockReaderLocalTest {
+    static final int TEST_LENGTH = 12345;
+    static final int BYTES_PER_CHECKSUM = 512;
+
+    /** Hook for adjusting the configuration; does nothing by default. */
+    public void setConfiguration(HdfsConfiguration conf) {
+    }
+
+    /** Hook that receives the block file; does nothing by default. */
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+    }
+
+    /** Hook holding the scenario itself; does nothing by default. */
+    public void doTest(BlockReaderLocal reader, byte[] original)
+        throws IOException {
+    }
+  }
+
+  /**
+   * Runs one BlockReaderLocalTest scenario against a BlockReaderLocal that
+   * is built directly on top of a block's data and metadata files.
+   *
+   * A one-datanode cluster is started only to create the test file and to
+   * locate its block files; it is shut down before the reader is built.
+   * The test is skipped when DomainSocket reports a loading failure.
+   *
+   * @param test       the scenario whose hooks are invoked
+   * @param checksum   whether the reader verifies checksums; the
+   *                   skip-checksum setting is set to the opposite value
+   * @param readahead  readahead value for the configuration and for the
+   *                   reader's caching strategy
+   *
+   * @throws IOException  if cluster, file or reader operations fail
+   */
+  public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+      boolean checksum, long readahead) throws IOException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
+        !checksum);
+    conf.setLong(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
+    // Let the scenario adjust the configuration before the cluster starts.
+    test.setConfiguration(conf);
+    FileInputStream dataIn = null, metaIn = null;
+    final Path TEST_PATH = new Path("/a");
+    final long RANDOM_SEED = 4567L;
+    BlockReaderLocal blockReaderLocal = null;
+    FSDataInputStream fsIn = null;
+    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+
+    FileSystem fs = null;
+    ShortCircuitShm shm = null;
+    RandomAccessFile raf = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      // Read the file through the regular file system to obtain the
+      // reference contents that are passed to the scenario.
+      fsIn = fs.open(TEST_PATH);
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      fsIn.close();
+      fsIn = null;
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = cluster.getBlockFile(0, block);
+      File metaFile = cluster.getBlockMetadataFile(0, block);
+
+      ShortCircuitCache shortCircuitCache =
+          ClientContext.getFromConf(conf).getShortCircuitCache();
+      // The cluster is shut down here; from now on only the block files
+      // on disk are used.
+      cluster.shutdown();
+      cluster = null;
+      // The scenario receives the data file before the streams are opened.
+      test.setup(dataFile, checksum);
+      FileInputStream streams[] = {
+          new FileInputStream(dataFile),
+          new FileInputStream(metaFile)
+      };
+      dataIn = streams[0];
+      metaIn = streams[1];
+      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
+          block.getBlockPoolId());
+      // An 8192-byte scratch file in the socket directory backs the
+      // ShortCircuitShm from which the replica's slot is allocated.
+      raf = new RandomAccessFile(
+          new File(sockDir.getDir().getAbsolutePath(),
+            UUID.randomUUID().toString()), "rw");
+      raf.setLength(8192);
+      FileInputStream shmStream = new FileInputStream(raf.getFD());
+      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
+      ShortCircuitReplica replica =
+          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
+              Time.now(), shm.allocAndRegisterSlot(
+                  ExtendedBlockId.fromExtendedBlock(block)));
+      blockReaderLocal = new BlockReaderLocal.Builder(
+              new DfsClientConf.ShortCircuitConf(conf)).
+          setFilename(TEST_PATH.getName()).
+          setBlock(block).
+          setShortCircuitReplica(replica).
+          setCachingStrategy(new CachingStrategy(false, readahead)).
+          setVerifyChecksum(checksum).
+          setTracer(FsTracer.get(conf)).
+          build();
+      // Clearing the locals keeps the finally block from closing the
+      // streams itself -- presumably the replica owns them from here on
+      // (TODO confirm).  They stay reachable through streams[] for the
+      // position checks below.
+      dataIn = null;
+      metaIn = null;
+      test.doTest(blockReaderLocal, original);
+      // BlockReaderLocal should not alter the file position.
+      Assert.assertEquals(0, streams[0].getChannel().position());
+      Assert.assertEquals(0, streams[1].getChannel().position());
+    } finally {
+      // Release whatever is still held at this point; anything already
+      // handed over or closed above was set to null.
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+      if (dataIn != null) dataIn.close();
+      if (metaIn != null) metaIn.close();
+      if (blockReaderLocal != null) blockReaderLocal.close();
+      if (shm != null) shm.free();
+      if (raf != null) raf.close();
+    }
+  }
+
+  /**
+   * Scenario that overrides none of the BlockReaderLocalTest hooks, so the
+   * harness only builds the reader and then closes it in its finally block.
+   * Presumably the inherited hooks are no-ops, i.e. the reader is closed
+   * without a single read being issued -- TODO confirm against
+   * BlockReaderLocalTest.
+   */
+  private static class TestBlockReaderLocalImmediateClose
+      extends BlockReaderLocalTest {
+  }
+
+  /**
+   * Runs the immediate-close scenario twice without readahead: first with
+   * checksum verification enabled, then with it disabled.
+   */
+  @Test
+  public void testBlockReaderLocalImmediateClose() throws IOException {
+    for (boolean verifyChecksum : new boolean[] {true, false}) {
+      runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(),
+          verifyChecksum, 0);
+    }
+  }
+
+  /**
+   * Scenario that reads the start of the block through the byte-array API in
+   * four back-to-back requests (512, 512, 513 and 514 bytes) and compares
+   * every region that was read with the expected file contents.
+   */
+  private static class TestBlockReaderSimpleReads
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      final int[] readLengths = {512, 512, 513, 514};
+      final byte[] actual = new byte[TEST_LENGTH];
+      int offset = 0;
+      for (int length : readLengths) {
+        // Each request continues exactly where the previous one stopped.
+        reader.readFully(actual, offset, length);
+        assertArrayRegionsEqual(original, offset, actual, offset, length);
+        offset += length;
+      }
+      // Readahead is always at least the size of one chunk in this test.
+      final boolean readaheadCoversChunk = reader.getMaxReadaheadLength() >=
+          BlockReaderLocalTest.BYTES_PER_CHECKSUM;
+      Assert.assertTrue(readaheadCoversChunk);
+    }
+  }
+
+  @Test
+  public void testBlockReaderSimpleReads() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
+  }
+
+  /** Simple reads without checksum verification and a readahead of zero. */
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
+      throws IOException {
+    TestBlockReaderSimpleReads scenario = new TestBlockReaderSimpleReads();
+    runBlockReaderLocalTest(scenario, false, 0);
+  }
+
+  /**
+   * Scenario that issues seven consecutive byte-array reads of uneven sizes.
+   * Two of them (the 1-byte read at offset 810 and the 900-byte read at
+   * offset 816) only advance the reader; their contents are not compared.
+   */
+  private static class TestBlockReaderLocalArrayReads2
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      // Resulting start offsets: 0, 10, 110, 810, 811, 816, 1716.
+      final int[] lengths = {10, 100, 700, 1, 5, 900, 5};
+      final boolean[] compared = {true, true, true, false, true, false, true};
+      final byte[] actual = new byte[TEST_LENGTH];
+      int offset = 0;
+      for (int i = 0; i < lengths.length; i++) {
+        reader.readFully(actual, offset, lengths[i]);
+        if (compared[i]) {
+          assertArrayRegionsEqual(original, offset, actual, offset,
+              lengths[i]);
+        }
+        offset += lengths[i];
+      }
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalArrayReads2() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** Array reads without checksum verification, default readahead. */
+  @Test
+  public void testBlockReaderLocalArrayReads2NoChecksum()
+      throws IOException {
+    TestBlockReaderLocalArrayReads2 scenario =
+        new TestBlockReaderLocalArrayReads2();
+    runBlockReaderLocalTest(scenario,
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** Array reads with checksum verification and a readahead of zero. */
+  @Test
+  public void testBlockReaderLocalArrayReads2NoReadahead()
+      throws IOException {
+    TestBlockReaderLocalArrayReads2 scenario =
+        new TestBlockReaderLocalArrayReads2();
+    runBlockReaderLocalTest(scenario, true, 0);
+  }
+
+  /** Array reads without checksum verification and a readahead of zero. */
+  @Test
+  public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalArrayReads2 scenario =
+        new TestBlockReaderLocalArrayReads2();
+    runBlockReaderLocalTest(scenario, false, 0);
+  }
+
+  /**
+   * Scenario that reads into a heap ByteBuffer: three consecutive reads
+   * covering offsets 0-809, a one-byte skip, then a 5-byte read at offset 811.
+   * Every region that was read is compared with the expected contents.
+   */
+  private static class TestBlockReaderLocalByteBufferReads
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      // The buffer wraps this array, so the array can be inspected directly.
+      final byte[] actual = new byte[TEST_LENGTH];
+      final ByteBuffer buf = ByteBuffer.wrap(actual);
+
+      readFully(reader, buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, actual, 0, 10);
+
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, actual, 10, 100);
+
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, actual, 110, 700);
+
+      // Move from offset 810 to offset 811 without reading.
+      reader.skip(1);
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, actual, 811, 5);
+    }
+  }
+
+  /** ByteBuffer reads with checksum verification and default readahead. */
+  @Test
+  public void testBlockReaderLocalByteBufferReads()
+      throws IOException {
+    TestBlockReaderLocalByteBufferReads scenario =
+        new TestBlockReaderLocalByteBufferReads();
+    runBlockReaderLocalTest(scenario,
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** ByteBuffer reads without checksum verification, default readahead. */
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoChecksum()
+      throws IOException {
+    TestBlockReaderLocalByteBufferReads scenario =
+        new TestBlockReaderLocalByteBufferReads();
+    runBlockReaderLocalTest(scenario,
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** ByteBuffer reads with checksum verification and a readahead of zero. */
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalByteBufferReads scenario =
+        new TestBlockReaderLocalByteBufferReads();
+    runBlockReaderLocalTest(scenario, true, 0);
+  }
+
+  /** ByteBuffer reads without checksum verification, readahead of zero. */
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalByteBufferReads scenario =
+        new TestBlockReaderLocalByteBufferReads();
+    runBlockReaderLocalTest(scenario, false, 0);
+  }
+
+  /**
+   * Test reads that bypass the bounce buffer (because they are aligned
+   * and bigger than the readahead).
+   */
+  private static class TestBlockReaderLocalByteBufferFastLaneReads
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH);
+      readFully(reader, buf, 0, 5120);
+      buf.flip();
+      assertArrayRegionsEqual(original, 0,
+          DFSTestUtil.asArray(buf), 0,
+          5120);
+      reader.skip(1537);
+      readFully(reader, buf, 0, 1);
+      buf.flip();
+      assertArrayRegionsEqual(original, 6657,
+          DFSTestUtil.asArray(buf), 0,
+          1);
+      reader.forceAnchorable();
+      readFully(reader, buf, 0, 5120);
+      buf.flip();
+      assertArrayRegionsEqual(original, 6658,
+          DFSTestUtil.asArray(buf), 0,
+          5120);
+      reader.forceUnanchorable();
+      readFully(reader, buf, 0, 513);
+      buf.flip();
+      assertArrayRegionsEqual(original, 11778,
+          DFSTestUtil.asArray(buf), 0,
+          513);
+      reader.skip(3);
+      readFully(reader, buf, 0, 50);
+      buf.flip();
+      assertArrayRegionsEqual(original, 12294,
+          DFSTestUtil.asArray(buf), 0,
+          50);
+    }
+  }
+
+  /**
+   * Fast-lane reads with checksum verification and a readahead of two
+   * checksum chunks.
+   */
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReads()
+      throws IOException {
+    TestBlockReaderLocalByteBufferFastLaneReads scenario =
+        new TestBlockReaderLocalByteBufferFastLaneReads();
+    runBlockReaderLocalTest(scenario,
+        true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+  }
+
+  /**
+   * Fast-lane reads without checksum verification and a readahead of two
+   * checksum chunks.
+   */
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
+      throws IOException {
+    TestBlockReaderLocalByteBufferFastLaneReads scenario =
+        new TestBlockReaderLocalByteBufferFastLaneReads();
+    runBlockReaderLocalTest(scenario,
+        false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+  }
+
+  /** Fast-lane reads with checksum verification and a readahead of zero. */
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalByteBufferFastLaneReads scenario =
+        new TestBlockReaderLocalByteBufferFastLaneReads();
+    runBlockReaderLocalTest(scenario, true, 0);
+  }
+
+  /** Fast-lane reads without checksum verification, readahead of zero. */
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalByteBufferFastLaneReads scenario =
+        new TestBlockReaderLocalByteBufferFastLaneReads();
+    runBlockReaderLocalTest(scenario, false, 0);
+  }
+
+  /**
+   * Scenario that overwrites the first bytes of the block file with zeros
+   * before the reader is built. With checksum verification the very first
+   * read must fail with an IOException; without it the read must succeed.
+   */
+  private static class TestBlockReaderLocalReadCorruptStart
+      extends BlockReaderLocalTest {
+    /** Remembered from setup() so doTest() knows which outcome to expect. */
+    boolean usingChecksums = false;
+
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        // A freshly opened file is positioned at offset 0, so this clobbers
+        // the start of the block.
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
+    }
+
+    // Marked @Override like the doTest() of every other scenario, so the
+    // compiler flags any drift from the base-class signature.
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      if (usingChecksums) {
+        try {
+          reader.readFully(buf, 0, 10);
+          Assert.fail("did not detect corruption");
+        } catch (IOException e) {
+          // expected
+        }
+      } else {
+        reader.readFully(buf, 0, 10);
+      }
+    }
+  }
+
+  /**
+   * Corrupt-start scenario with checksum verification and the default
+   * readahead.
+   */
+  @Test
+  public void testBlockReaderLocalReadCorruptStart()
+      throws IOException {
+    TestBlockReaderLocalReadCorruptStart scenario =
+        new TestBlockReaderLocalReadCorruptStart();
+    runBlockReaderLocalTest(scenario, true,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /**
+   * Scenario that overwrites bytes of the block file starting at offset 1539
+   * with zeros before the reader is built. The reads up to offset 815 lie
+   * before the damage and must return the original data; the 900-byte read
+   * that follows crosses the damaged region. With checksum verification a
+   * ChecksumException must be raised by the end of that read; without it no
+   * ChecksumException may be raised at all.
+   */
+  private static class TestBlockReaderLocalReadCorrupt
+      extends BlockReaderLocalTest {
+    /** Remembered from setup() so doTest() knows which outcome to expect. */
+    boolean usingChecksums = false;
+
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.seek(1539);
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
+    }
+
+    // Marked @Override like the doTest() of every other scenario, so the
+    // compiler flags any drift from the base-class signature.
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      try {
+        reader.readFully(buf, 0, 10);
+        assertArrayRegionsEqual(original, 0, buf, 0, 10);
+        reader.readFully(buf, 10, 100);
+        assertArrayRegionsEqual(original, 10, buf, 10, 100);
+        reader.readFully(buf, 110, 700);
+        assertArrayRegionsEqual(original, 110, buf, 110, 700);
+        reader.skip(1); // skip from offset 810 to offset 811
+        reader.readFully(buf, 811, 5);
+        assertArrayRegionsEqual(original, 811, buf, 811, 5);
+        // Covers offsets 816-1715, i.e. it runs over the zeroed bytes.
+        reader.readFully(buf, 816, 900);
+        if (usingChecksums) {
+          // We should detect the corruption when using a checksum file.
+          Assert.fail("did not detect corruption");
+        }
+      } catch (ChecksumException e) {
+        if (!usingChecksums) {
+          Assert.fail("didn't expect to get ChecksumException: not " +
+              "using checksums.");
+        }
+      }
+    }
+  }
+
+  /** Corrupt-read scenario with checksum verification, default readahead. */
+  @Test
+  public void testBlockReaderLocalReadCorrupt()
+      throws IOException {
+    TestBlockReaderLocalReadCorrupt scenario =
+        new TestBlockReaderLocalReadCorrupt();
+    runBlockReaderLocalTest(scenario, true,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** Corrupt-read scenario without checksum verification, default readahead. */
+  @Test
+  public void testBlockReaderLocalReadCorruptNoChecksum()
+      throws IOException {
+    TestBlockReaderLocalReadCorrupt scenario =
+        new TestBlockReaderLocalReadCorrupt();
+    runBlockReaderLocalTest(scenario, false,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** Corrupt-read scenario with checksum verification, readahead of zero. */
+  @Test
+  public void testBlockReaderLocalReadCorruptNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalReadCorrupt scenario =
+        new TestBlockReaderLocalReadCorrupt();
+    runBlockReaderLocalTest(scenario, true, 0);
+  }
+
+  /** Corrupt-read scenario without checksum verification, readahead of zero. */
+  @Test
+  public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalReadCorrupt scenario =
+        new TestBlockReaderLocalReadCorrupt();
+    runBlockReaderLocalTest(scenario, false, 0);
+  }
+
+  /**
+   * Scenario that reads into a heap ByteBuffer while the reader is switched
+   * to forced-anchorable before the 700-byte read and back to
+   * forced-unanchorable after it. All regions read must match the expected
+   * contents regardless of that switch.
+   */
+  private static class TestBlockReaderLocalWithMlockChanges
+      extends BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      // This scenario leaves the block file untouched.
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      // The buffer wraps this array, so the array can be inspected directly.
+      final byte[] actual = new byte[TEST_LENGTH];
+      final ByteBuffer buf = ByteBuffer.wrap(actual);
+
+      // Start one byte into the block.
+      reader.skip(1);
+      readFully(reader, buf, 1, 9);
+      assertArrayRegionsEqual(original, 1, actual, 1, 9);
+
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, actual, 10, 100);
+
+      reader.forceAnchorable();
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, actual, 110, 700);
+      reader.forceUnanchorable();
+
+      // Move from offset 810 to offset 811 without reading.
+      reader.skip(1);
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, actual, 811, 5);
+    }
+  }
+
+  /** Mlock-change scenario with checksum verification, default readahead. */
+  @Test
+  public void testBlockReaderLocalWithMlockChanges()
+      throws IOException {
+    TestBlockReaderLocalWithMlockChanges scenario =
+        new TestBlockReaderLocalWithMlockChanges();
+    runBlockReaderLocalTest(scenario,
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** Mlock-change scenario without checksum verification, default readahead. */
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoChecksum()
+      throws IOException {
+    TestBlockReaderLocalWithMlockChanges scenario =
+        new TestBlockReaderLocalWithMlockChanges();
+    runBlockReaderLocalTest(scenario,
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** Mlock-change scenario with checksum verification, readahead of zero. */
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalWithMlockChanges scenario =
+        new TestBlockReaderLocalWithMlockChanges();
+    runBlockReaderLocalTest(scenario, true, 0);
+  }
+
+  /** Mlock-change scenario without checksum verification, readahead of zero. */
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalWithMlockChanges scenario =
+        new TestBlockReaderLocalWithMlockChanges();
+    runBlockReaderLocalTest(scenario, false, 0);
+  }
+
+  /**
+   * Scenario that sets the checksum type to "NULL" in the configuration and
+   * then expects the reader to report that it is not verifying checksums.
+   * The reads themselves follow the same skip/read/anchor pattern as the
+   * mlock-change scenario.
+   */
+  private static class TestBlockReaderLocalOnFileWithoutChecksum
+      extends BlockReaderLocalTest {
+    @Override
+    public void setConfiguration(HdfsConfiguration conf) {
+      conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      Assert.assertFalse(reader.getVerifyChecksum());
+
+      // The buffer wraps this array, so the array can be inspected directly.
+      final byte[] actual = new byte[TEST_LENGTH];
+      final ByteBuffer buf = ByteBuffer.wrap(actual);
+
+      // Start one byte into the block.
+      reader.skip(1);
+      readFully(reader, buf, 1, 9);
+      assertArrayRegionsEqual(original, 1, actual, 1, 9);
+
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, actual, 10, 100);
+
+      reader.forceAnchorable();
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, actual, 110, 700);
+      reader.forceUnanchorable();
+
+      // Move from offset 810 to offset 811 without reading.
+      reader.skip(1);
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, actual, 811, 5);
+    }
+  }
+
+  private static class TestBlockReaderLocalReadZeroBytes
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte emptyArr[] = new byte[0];
+      Assert.assertEquals(0, reader.read(emptyArr, 0, 0));
+      ByteBuffer emptyBuf = ByteBuffer.wrap(emptyArr);
+      Assert.assertEquals(0, reader.read(emptyBuf));
+      reader.skip(1);
+      Assert.assertEquals(0, reader.read(emptyArr, 0, 0));
+      Assert.assertEquals(0, reader.read(emptyBuf));
+      reader.skip(BlockReaderLocalTest.TEST_LENGTH - 1);
+      Assert.assertEquals(-1, reader.read(emptyArr, 0, 0));
+      Assert.assertEquals(-1, reader.read(emptyBuf));
+    }
+  }
+
+  /**
+   * NULL-checksum file, checksum verification requested, default readahead.
+   */
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksum()
+      throws IOException {
+    TestBlockReaderLocalOnFileWithoutChecksum scenario =
+        new TestBlockReaderLocalOnFileWithoutChecksum();
+    runBlockReaderLocalTest(scenario,
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /**
+   * NULL-checksum file, checksum verification not requested, default
+   * readahead.
+   */
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
+      throws IOException {
+    TestBlockReaderLocalOnFileWithoutChecksum scenario =
+        new TestBlockReaderLocalOnFileWithoutChecksum();
+    runBlockReaderLocalTest(scenario,
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** NULL-checksum file, checksum verification requested, no readahead. */
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalOnFileWithoutChecksum scenario =
+        new TestBlockReaderLocalOnFileWithoutChecksum();
+    runBlockReaderLocalTest(scenario, true, 0);
+  }
+
+  /** NULL-checksum file, checksum verification not requested, no readahead. */
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalOnFileWithoutChecksum scenario =
+        new TestBlockReaderLocalOnFileWithoutChecksum();
+    runBlockReaderLocalTest(scenario, false, 0);
+  }
+
+  /** Zero-length reads with checksum verification and default readahead. */
+  @Test
+  public void testBlockReaderLocalReadZeroBytes()
+      throws IOException {
+    TestBlockReaderLocalReadZeroBytes scenario =
+        new TestBlockReaderLocalReadZeroBytes();
+    runBlockReaderLocalTest(scenario,
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** Zero-length reads without checksum verification, default readahead. */
+  @Test
+  public void testBlockReaderLocalReadZeroBytesNoChecksum()
+      throws IOException {
+    TestBlockReaderLocalReadZeroBytes scenario =
+        new TestBlockReaderLocalReadZeroBytes();
+    runBlockReaderLocalTest(scenario,
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  /** Zero-length reads with checksum verification and a readahead of zero. */
+  @Test
+  public void testBlockReaderLocalReadZeroBytesNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalReadZeroBytes scenario =
+        new TestBlockReaderLocalReadZeroBytes();
+    runBlockReaderLocalTest(scenario, true, 0);
+  }
+
+  /** Zero-length reads without checksum verification, readahead of zero. */
+  @Test
+  public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead()
+      throws IOException {
+    TestBlockReaderLocalReadZeroBytes scenario =
+        new TestBlockReaderLocalReadZeroBytes();
+    runBlockReaderLocalTest(scenario, false, 0);
+  }
+
+
+  /** Checks the read statistics when short-circuit reads are enabled. */
+  @Test(timeout=60000)
+  public void TestStatisticsForShortCircuitLocalRead() throws Exception {
+    final boolean shortCircuit = true;
+    testStatistics(shortCircuit);
+  }
+
+  /** Checks the read statistics when short-circuit reads are disabled. */
+  @Test(timeout=60000)
+  public void TestStatisticsForLocalRead() throws Exception {
+    final boolean shortCircuit = false;
+    testStatistics(shortCircuit);
+  }
+
+  /**
+   * Writes a single-replica file of TEST_LENGTH bytes to a one-datanode
+   * MiniDFSCluster, reads it back completely and checks the read statistics
+   * reported by the input stream: total and local bytes read must equal the
+   * file length, and short-circuit bytes read must equal the file length when
+   * {@code isShortCircuit} is set and 0 otherwise.
+   *
+   * The test is skipped (JUnit assumption) when DomainSocket reports a
+   * loading failure.
+   *
+   * @param isShortCircuit whether to configure the client for short-circuit
+   *                       reads (this also disables TCP reads for the
+   *                       duration of the test)
+   */
+  private void testStatistics(boolean isShortCircuit) throws Exception {
+    Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    TemporarySocketDirectory sockDir = null;
+    if (isShortCircuit) {
+      DFSInputStream.tcpReadsDisabledForTesting = true;
+      sockDir = new TemporarySocketDirectory();
+      final File socketPath =
+          new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock");
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+          socketPath.getAbsolutePath());
+      DomainSocket.disableBindPathValidation();
+    }
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
+        isShortCircuit);
+
+    final Path testPath = new Path("/a");
+    final long randomSeed = 4567L;
+    final int length = BlockReaderLocalTest.TEST_LENGTH;
+    final byte[] original = new byte[length];
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    FSDataInputStream fsIn = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, testPath, length, (short)1, randomSeed);
+      try {
+        DFSTestUtil.waitReplication(fs, testPath, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(testPath);
+      IOUtils.readFully(fsIn, original, 0, length);
+
+      final HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      final int expectedShortCircuitBytes = isShortCircuit ? length : 0;
+      Assert.assertEquals(length,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(length,
+          dfsIn.getReadStatistics().getTotalLocalBytesRead());
+      Assert.assertEquals(expectedShortCircuitBytes,
+          dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
+      fsIn.close();
+      fsIn = null;
+    } finally {
+      // Always restore the static flag so later tests are unaffected.
+      DFSInputStream.tcpReadsDisabledForTesting = false;
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+      if (sockDir != null) sockDir.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java
new file mode 100644
index 0000000..273619c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that run a MiniDFSCluster with the legacy short-circuit local block
+ * reader enabled on the client. Every test shuts its cluster down in a
+ * finally block so a failing assertion does not leave a cluster running.
+ */
+public class TestBlockReaderLocalLegacy {
+  /** Disables TCP reads and domain-socket bind-path validation for the class. */
+  @BeforeClass
+  public static void setupCluster() throws IOException {
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    DomainSocket.disableBindPathValidation();
+  }
+
+  /**
+   * Builds the configuration shared by all tests: short-circuit reads on,
+   * legacy BlockReaderLocal on, checksums not skipped, local-path access
+   * granted to the current user, and no data traffic over domain sockets.
+   *
+   * @param socketDir directory in which the domain socket path is placed, or
+   *                  null to configure an empty domain socket path
+   * @return the configuration to start the cluster and client with
+   * @throws IOException if the current user cannot be determined
+   */
+  private static HdfsConfiguration getConfiguration(
+      TemporarySocketDirectory socketDir) throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    if (socketDir == null) {
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
+    } else {
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock").
+          getAbsolutePath());
+    }
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
+    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
+        false);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+    return conf;
+  }
+
+  /**
+   * Opens {@code path}, attempts one read into {@code buf} and asserts that
+   * the read fails with a ChecksumException while leaving the buffer's
+   * position and limit at the given values. The stream is always closed.
+   */
+  private static void assertCorruptReadLeavesBufferUnchanged(FileSystem fs,
+      Path path, ByteBuffer buf, int expectedPosition, int expectedLimit)
+      throws IOException {
+    boolean sawException = false;
+    FSDataInputStream dis = fs.open(path);
+    try {
+      dis.read(buf);
+    } catch (ChecksumException ex) {
+      sawException = true;
+    } finally {
+      // Quiet close: a close failure is not what this check is about.
+      IOUtils.closeStream(dis);
+    }
+
+    assertTrue(sawException);
+    assertEquals(expectedPosition, buf.position());
+    assertEquals(expectedLimit, buf.limit());
+  }
+
+  /**
+   * Test that, in the case of an error, the position and limit of a ByteBuffer
+   * are left unchanged. This is not mandated by ByteBufferReadable, but clients
+   * of this class might immediately issue a retry on failure, so it's polite.
+   */
+  @Test
+  public void testStablePositionAfterCorruptRead() throws Exception {
+    final short REPL_FACTOR = 1;
+    final long FILE_LENGTH = 512L;
+
+    HdfsConfiguration conf = getConfiguration(null);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      Path path = new Path("/corrupted");
+
+      DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
+      DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
+      int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+      assertEquals("All replicas not corrupted", REPL_FACTOR,
+          blockFilesCorrupted);
+
+      // First attempt: untouched buffer (position 0, limit == capacity).
+      ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
+      assertCorruptReadLeavesBufferUnchanged(fs, path, buf,
+          0, buf.capacity());
+
+      // Second attempt: a window in the middle of the buffer.
+      buf.position(3);
+      buf.limit(25);
+      assertCorruptReadLeavesBufferUnchanged(fs, path, buf, 3, 25);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Starts a cluster with both the legacy local reader and a domain socket
+   * path configured, then checks that a file written through the cluster can
+   * be read back unchanged. Skipped when DomainSocket failed to load.
+   */
+  @Test
+  public void testBothOldAndNewShortCircuitConfigured() throws Exception {
+    final short REPL_FACTOR = 1;
+    final int FILE_LENGTH = 512;
+    Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason());
+    TemporarySocketDirectory socketDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = getConfiguration(socketDir);
+    MiniDFSCluster cluster = null;
+    try {
+      try {
+        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+        cluster.waitActive();
+      } finally {
+        // As before, the socket directory is closed as soon as the cluster
+        // is up; doing it in finally also covers a failed start-up.
+        socketDir.close();
+      }
+      FileSystem fs = cluster.getFileSystem();
+
+      Path path = new Path("/foo");
+      byte orig[] = new byte[FILE_LENGTH];
+      for (int i = 0; i < orig.length; i++) {
+        orig[i] = (byte)(i%10);
+      }
+      FSDataOutputStream fos = fs.create(path, (short)1);
+      fos.write(orig);
+      fos.close();
+      DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+
+      byte buf[] = new byte[FILE_LENGTH];
+      FSDataInputStream fis = cluster.getFileSystem().open(path);
+      try {
+        IOUtils.readFully(fis, buf, 0, FILE_LENGTH);
+      } finally {
+        fis.close();
+      }
+      Assert.assertArrayEquals(orig, buf);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Checks that getBlockLocalPathInfo, when asked about a block using its
+   * pre-append generation stamp, answers with the block's new generation
+   * stamp after one byte has been appended to the file.
+   */
+  @Test(timeout=20000)
+  public void testBlockReaderLocalLegacyWithAppend() throws Exception {
+    final short REPL_FACTOR = 1;
+    // getConfiguration() already enables the legacy BlockReaderLocal.
+    final HdfsConfiguration conf = getConfiguration(null);
+
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path path = new Path("/testBlockReaderLocalLegacy");
+      DFSTestUtil.createFile(dfs, path, 10, REPL_FACTOR, 0);
+      DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR);
+
+      final ClientDatanodeProtocol proxy;
+      final Token<BlockTokenIdentifier> token;
+      final ExtendedBlock originalBlock;
+      final long originalGS;
+      {
+        final LocatedBlock lb = cluster.getNameNode().getRpcServer()
+            .getBlockLocations(path.toString(), 0, 1).get(0);
+        proxy = DFSUtilClient.createClientDatanodeProtocolProxy(
+            lb.getLocations()[0], conf, 60000, false);
+        token = lb.getBlockToken();
+
+        // get block and generation stamp
+        final ExtendedBlock blk = new ExtendedBlock(lb.getBlock());
+        originalBlock = new ExtendedBlock(blk);
+        originalGS = originalBlock.getGenerationStamp();
+
+        // test getBlockLocalPathInfo
+        final BlockLocalPathInfo info =
+            proxy.getBlockLocalPathInfo(blk, token);
+        Assert.assertEquals(originalGS, info.getBlock().getGenerationStamp());
+      }
+
+      { // append one byte
+        FSDataOutputStream out = dfs.append(path);
+        out.write(1);
+        out.close();
+      }
+
+      {
+        // get new generation stamp
+        final LocatedBlock lb = cluster.getNameNode().getRpcServer()
+            .getBlockLocations(path.toString(), 0, 1).get(0);
+        final long newGS = lb.getBlock().getGenerationStamp();
+        Assert.assertTrue(newGS > originalGS);
+
+        // getBlockLocalPathInfo using the original block.
+        Assert.assertEquals(originalGS, originalBlock.getGenerationStamp());
+        final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(
+            originalBlock, token);
+        Assert.assertEquals(newGS, info.getBlock().getGenerationStamp());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message