hadoop-common-commits mailing list archives

From szets...@apache.org
Subject [1/6] hadoop git commit: HDFS-8057 Move BlockReader implementation to the client implementation package. Contributed by Takanobu Asanuma
Date Wed, 25 May 2016 19:13:53 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 0ac8c098e -> e5dab6806

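For code that consumes these classes, the practical effect of HDFS-8057 is a package move: BlockReaderFactory, BlockReaderTestUtil and the block reader tests now resolve from org.apache.hadoop.hdfs.client.impl rather than org.apache.hadoop.hdfs, as the import updates further down show. Below is a minimal illustrative sketch of a caller updated for the new layout; the class name ExampleReaderUsage and the file path and sizes are made up, while the BlockReaderTestUtil calls mirror the ones TestClientBlockVerification uses in this patch.

// Illustrative only: the helper methods (writeFile, getFileBlocks,
// getBlockReader, shutdown) are the ones exercised by the tests in this
// patch; only the import package changes for downstream code.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; // was org.apache.hadoop.hdfs.BlockReaderTestUtil
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

public class ExampleReaderUsage {
  public static void main(String[] args) throws Exception {
    BlockReaderTestUtil util = new BlockReaderTestUtil(1);   // replication factor 1
    Path file = new Path("/example.file");
    util.writeFile(file, 1);                                 // write a 1 KB test file
    LocatedBlock blk = util.getFileBlocks(file, 1).get(0);
    util.getBlockReader(blk, 0, 1024).close();               // reader classes now live under client.impl
    util.shutdown();
  }
}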

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
new file mode 100644
index 0000000..bd0c7c3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
@@ -0,0 +1,786 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
+import org.apache.hadoop.fs.FsTracer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBlockReaderLocal {
+  private static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
+
+  public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+      int off2, int len) {
+    for (int i = 0; i < len; i++) {
+      if (buf1[off1 + i] != buf2[off2 + i]) {
+        Assert.fail("arrays differ at byte " +  i + ". " +
+          "The first array has " + (int)buf1[off1 + i] +
+          ", but the second array has " + (int)buf2[off2 + i]);
+      }
+    }
+  }
+
+  /**
+   * Similar to IOUtils#readFully(). Reads bytes in a loop.
+   *
+   * @param reader           The BlockReaderLocal to read bytes from
+   * @param buf              The ByteBuffer to read into
+   * @param off              The offset in the buffer to read into
+   * @param len              The number of bytes to read.
+   *
+   * @throws IOException     If it could not read the requested number of bytes
+   */
+  private static void readFully(BlockReaderLocal reader,
+      ByteBuffer buf, int off, int len) throws IOException {
+    int amt = len;
+    while (amt > 0) {
+      buf.limit(off + len);
+      buf.position(off);
+      long ret = reader.read(buf);
+      if (ret < 0) {
+        throw new EOFException("Premature EOF from BlockReaderLocal " +
+            "after reading " + (len - amt) + " byte(s).");
+      }
+      amt -= ret;
+      off += ret;
+    }
+  }
+
+  private static class BlockReaderLocalTest {
+    final static int TEST_LENGTH = 12345;
+    final static int BYTES_PER_CHECKSUM = 512;
+
+    public void setConfiguration(HdfsConfiguration conf) {
+      // default: no-op
+    }
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      // default: no-op
+    }
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      // default: no-op
+    }
+  }
+
+  public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+      boolean checksum, long readahead) throws IOException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
+        !checksum);
+    conf.setLong(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
+    test.setConfiguration(conf);
+    FileInputStream dataIn = null, metaIn = null;
+    final Path TEST_PATH = new Path("/a");
+    final long RANDOM_SEED = 4567L;
+    BlockReaderLocal blockReaderLocal = null;
+    FSDataInputStream fsIn = null;
+    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+
+    FileSystem fs = null;
+    ShortCircuitShm shm = null;
+    RandomAccessFile raf = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      fsIn.close();
+      fsIn = null;
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = cluster.getBlockFile(0, block);
+      File metaFile = cluster.getBlockMetadataFile(0, block);
+
+      ShortCircuitCache shortCircuitCache =
+          ClientContext.getFromConf(conf).getShortCircuitCache();
+      cluster.shutdown();
+      cluster = null;
+      test.setup(dataFile, checksum);
+      FileInputStream streams[] = {
+          new FileInputStream(dataFile),
+          new FileInputStream(metaFile)
+      };
+      dataIn = streams[0];
+      metaIn = streams[1];
+      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
+          block.getBlockPoolId());
+      raf = new RandomAccessFile(
+          new File(sockDir.getDir().getAbsolutePath(),
+            UUID.randomUUID().toString()), "rw");
+      raf.setLength(8192);
+      FileInputStream shmStream = new FileInputStream(raf.getFD());
+      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
+      ShortCircuitReplica replica =
+          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
+              Time.now(), shm.allocAndRegisterSlot(
+                  ExtendedBlockId.fromExtendedBlock(block)));
+      blockReaderLocal = new BlockReaderLocal.Builder(
+              new DfsClientConf.ShortCircuitConf(conf)).
+          setFilename(TEST_PATH.getName()).
+          setBlock(block).
+          setShortCircuitReplica(replica).
+          setCachingStrategy(new CachingStrategy(false, readahead)).
+          setVerifyChecksum(checksum).
+          setTracer(FsTracer.get(conf)).
+          build();
+      dataIn = null;
+      metaIn = null;
+      test.doTest(blockReaderLocal, original);
+      // BlockReaderLocal should not alter the file position.
+      Assert.assertEquals(0, streams[0].getChannel().position());
+      Assert.assertEquals(0, streams[1].getChannel().position());
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+      if (dataIn != null) dataIn.close();
+      if (metaIn != null) metaIn.close();
+      if (blockReaderLocal != null) blockReaderLocal.close();
+      if (shm != null) shm.free();
+      if (raf != null) raf.close();
+    }
+  }
+
+  private static class TestBlockReaderLocalImmediateClose
+      extends BlockReaderLocalTest {
+  }
+
+  @Test
+  public void testBlockReaderLocalImmediateClose() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
+  }
+
+  private static class TestBlockReaderSimpleReads
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 512);
+      assertArrayRegionsEqual(original, 0, buf, 0, 512);
+      reader.readFully(buf, 512, 512);
+      assertArrayRegionsEqual(original, 512, buf, 512, 512);
+      reader.readFully(buf, 1024, 513);
+      assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
+      reader.readFully(buf, 1537, 514);
+      assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+      // Readahead is always at least the size of one chunk in this test.
+      Assert.assertTrue(reader.getMaxReadaheadLength() >=
+          BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+    }
+  }
+
+  @Test
+  public void testBlockReaderSimpleReads() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
+  }
+
+  private static class TestBlockReaderLocalArrayReads2
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, buf, 0, 10);
+      reader.readFully(buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf, 10, 100);
+      reader.readFully(buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf, 110, 700);
+      reader.readFully(buf, 810, 1); // from offset 810 to offset 811
+      reader.readFully(buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf, 811, 5);
+      reader.readFully(buf, 816, 900); // advance from offset 816 to offset 1716 without verifying
+      reader.readFully(buf, 1716, 5);
+      assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalArrayReads2() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalArrayReads2NoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalArrayReads2NoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
+  }
+
+  private static class TestBlockReaderLocalByteBufferReads
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      readFully(reader, buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, buf.array(), 0, 10);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferReads()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferReads(),
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+        false, 0);
+  }
+
+  /**
+   * Test reads that bypass the bounce buffer (because they are aligned
+   * and bigger than the readahead).
+   */
+  private static class TestBlockReaderLocalByteBufferFastLaneReads
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH);
+      readFully(reader, buf, 0, 5120);
+      buf.flip();
+      assertArrayRegionsEqual(original, 0,
+          DFSTestUtil.asArray(buf), 0,
+          5120);
+      reader.skip(1537);
+      readFully(reader, buf, 0, 1);
+      buf.flip();
+      assertArrayRegionsEqual(original, 6657,
+          DFSTestUtil.asArray(buf), 0,
+          1);
+      reader.forceAnchorable();
+      readFully(reader, buf, 0, 5120);
+      buf.flip();
+      assertArrayRegionsEqual(original, 6658,
+          DFSTestUtil.asArray(buf), 0,
+          5120);
+      reader.forceUnanchorable();
+      readFully(reader, buf, 0, 513);
+      buf.flip();
+      assertArrayRegionsEqual(original, 11778,
+          DFSTestUtil.asArray(buf), 0,
+          513);
+      reader.skip(3);
+      readFully(reader, buf, 0, 50);
+      buf.flip();
+      assertArrayRegionsEqual(original, 12294,
+          DFSTestUtil.asArray(buf), 0,
+          50);
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReads()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+        true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferFastLaneReads(),
+        false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+        false, 0);
+  }
+
+  private static class TestBlockReaderLocalReadCorruptStart
+      extends BlockReaderLocalTest {
+    boolean usingChecksums = false;
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
+    }
+
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      if (usingChecksums) {
+        try {
+          reader.readFully(buf, 0, 10);
+          Assert.fail("did not detect corruption");
+        } catch (IOException e) {
+          // expected
+        }
+      } else {
+        reader.readFully(buf, 0, 10);
+      }
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptStart()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  private static class TestBlockReaderLocalReadCorrupt
+      extends BlockReaderLocalTest {
+    boolean usingChecksums = false;
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.seek(1539);
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
+    }
+
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      try {
+        reader.readFully(buf, 0, 10);
+        assertArrayRegionsEqual(original, 0, buf, 0, 10);
+        reader.readFully(buf, 10, 100);
+        assertArrayRegionsEqual(original, 10, buf, 10, 100);
+        reader.readFully(buf, 110, 700);
+        assertArrayRegionsEqual(original, 110, buf, 110, 700);
+        reader.skip(1); // skip from offset 810 to offset 811
+        reader.readFully(buf, 811, 5);
+        assertArrayRegionsEqual(original, 811, buf, 811, 5);
+        reader.readFully(buf, 816, 900);
+        if (usingChecksums) {
+          // We should detect the corruption when using a checksum file.
+          Assert.fail("did not detect corruption");
+        }
+      } catch (ChecksumException e) {
+        if (!usingChecksums) {
+          Assert.fail("didn't expect to get ChecksumException: not " +
+              "using checksums.");
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorrupt()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0);
+  }
+
+  private static class TestBlockReaderLocalWithMlockChanges
+      extends BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      reader.skip(1);
+      readFully(reader, buf, 1, 9);
+      assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      reader.forceAnchorable();
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.forceUnanchorable();
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChanges()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        false, 0);
+  }
+
+  private static class TestBlockReaderLocalOnFileWithoutChecksum
+      extends BlockReaderLocalTest {
+    @Override
+    public void setConfiguration(HdfsConfiguration conf) {
+      conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      Assert.assertTrue(!reader.getVerifyChecksum());
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      reader.skip(1);
+      readFully(reader, buf, 1, 9);
+      assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      reader.forceAnchorable();
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.forceUnanchorable();
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+
+  private static class TestBlockReaderLocalReadZeroBytes
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      byte emptyArr[] = new byte[0];
+      Assert.assertEquals(0, reader.read(emptyArr, 0, 0));
+      ByteBuffer emptyBuf = ByteBuffer.wrap(emptyArr);
+      Assert.assertEquals(0, reader.read(emptyBuf));
+      reader.skip(1);
+      Assert.assertEquals(0, reader.read(emptyArr, 0, 0));
+      Assert.assertEquals(0, reader.read(emptyBuf));
+      reader.skip(BlockReaderLocalTest.TEST_LENGTH - 1);
+      Assert.assertEquals(-1, reader.read(emptyArr, 0, 0));
+      Assert.assertEquals(-1, reader.read(emptyBuf));
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        false, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadZeroBytes()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadZeroBytesNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadZeroBytesNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
+        false, 0);
+  }
+
+
+  @Test(timeout=60000)
+  public void TestStatisticsForShortCircuitLocalRead() throws Exception {
+    testStatistics(true);
+  }
+
+  @Test(timeout=60000)
+  public void TestStatisticsForLocalRead() throws Exception {
+    testStatistics(false);
+  }
+
+  private void testStatistics(boolean isShortCircuit) throws Exception {
+    Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
+    HdfsConfiguration conf = new HdfsConfiguration();
+    TemporarySocketDirectory sockDir = null;
+    if (isShortCircuit) {
+      DFSInputStream.tcpReadsDisabledForTesting = true;
+      sockDir = new TemporarySocketDirectory();
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock").
+          getAbsolutePath());
+      conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
+      DomainSocket.disableBindPathValidation();
+    } else {
+      conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
+    }
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final long RANDOM_SEED = 4567L;
+    FSDataInputStream fsIn = null;
+    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).
+          hosts(new String[] {NetUtils.getLocalHostname()}).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
+          dfsIn.getReadStatistics().getTotalLocalBytesRead());
+      if (isShortCircuit) {
+        Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH,
+            dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
+      } else {
+        Assert.assertEquals(0,
+            dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
+      }
+      fsIn.close();
+      fsIn = null;
+    } finally {
+      DFSInputStream.tcpReadsDisabledForTesting = false;
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+      if (sockDir != null) sockDir.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java
new file mode 100644
index 0000000..273619c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBlockReaderLocalLegacy {
+  @BeforeClass
+  public static void setupCluster() throws IOException {
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    DomainSocket.disableBindPathValidation();
+  }
+
+  private static HdfsConfiguration getConfiguration(
+      TemporarySocketDirectory socketDir) throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    if (socketDir == null) {
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
+    } else {
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock").
+          getAbsolutePath());
+    }
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
+    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
+        false);
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+    return conf;
+  }
+
+  /**
+   * Test that, in the case of an error, the position and limit of a ByteBuffer
+   * are left unchanged. This is not mandated by ByteBufferReadable, but clients
+   * of this class might immediately issue a retry on failure, so it's polite.
+   */
+  @Test
+  public void testStablePositionAfterCorruptRead() throws Exception {
+    final short REPL_FACTOR = 1;
+    final long FILE_LENGTH = 512L;
+
+    HdfsConfiguration conf = getConfiguration(null);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+
+    Path path = new Path("/corrupted");
+
+    DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
+    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+
+    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
+    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+    assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
+
+    FSDataInputStream dis = cluster.getFileSystem().open(path);
+    ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
+    boolean sawException = false;
+    try {
+      dis.read(buf);
+    } catch (ChecksumException ex) {
+      sawException = true;
+    }
+
+    assertTrue(sawException);
+    assertEquals(0, buf.position());
+    assertEquals(buf.capacity(), buf.limit());
+
+    dis = cluster.getFileSystem().open(path);
+    buf.position(3);
+    buf.limit(25);
+    sawException = false;
+    try {
+      dis.read(buf);
+    } catch (ChecksumException ex) {
+      sawException = true;
+    }
+
+    assertTrue(sawException);
+    assertEquals(3, buf.position());
+    assertEquals(25, buf.limit());
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testBothOldAndNewShortCircuitConfigured() throws Exception {
+    final short REPL_FACTOR = 1;
+    final int FILE_LENGTH = 512;
+    Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason());
+    TemporarySocketDirectory socketDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = getConfiguration(socketDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    socketDir.close();
+    FileSystem fs = cluster.getFileSystem();
+
+    Path path = new Path("/foo");
+    byte orig[] = new byte[FILE_LENGTH];
+    for (int i = 0; i < orig.length; i++) {
+      orig[i] = (byte)(i%10);
+    }
+    FSDataOutputStream fos = fs.create(path, (short)1);
+    fos.write(orig);
+    fos.close();
+    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+    FSDataInputStream fis = cluster.getFileSystem().open(path);
+    byte buf[] = new byte[FILE_LENGTH];
+    IOUtils.readFully(fis, buf, 0, FILE_LENGTH);
+    fis.close();
+    Assert.assertArrayEquals(orig, buf);
+    cluster.shutdown();
+  }
+
+  @Test(timeout=20000)
+  public void testBlockReaderLocalLegacyWithAppend() throws Exception {
+    final short REPL_FACTOR = 1;
+    final HdfsConfiguration conf = getConfiguration(null);
+    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final Path path = new Path("/testBlockReaderLocalLegacy");
+    DFSTestUtil.createFile(dfs, path, 10, REPL_FACTOR, 0);
+    DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR);
+
+    final ClientDatanodeProtocol proxy;
+    final Token<BlockTokenIdentifier> token;
+    final ExtendedBlock originalBlock;
+    final long originalGS;
+    {
+      final LocatedBlock lb = cluster.getNameNode().getRpcServer()
+          .getBlockLocations(path.toString(), 0, 1).get(0);
+      proxy = DFSUtilClient.createClientDatanodeProtocolProxy(
+          lb.getLocations()[0], conf, 60000, false);
+      token = lb.getBlockToken();
+
+      // get block and generation stamp
+      final ExtendedBlock blk = new ExtendedBlock(lb.getBlock());
+      originalBlock = new ExtendedBlock(blk);
+      originalGS = originalBlock.getGenerationStamp();
+
+      // test getBlockLocalPathInfo
+      final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(blk, token);
+      Assert.assertEquals(originalGS, info.getBlock().getGenerationStamp());
+    }
+
+    { // append one byte
+      FSDataOutputStream out = dfs.append(path);
+      out.write(1);
+      out.close();
+    }
+
+    {
+      // get new generation stamp
+      final LocatedBlock lb = cluster.getNameNode().getRpcServer()
+          .getBlockLocations(path.toString(), 0, 1).get(0);
+      final long newGS = lb.getBlock().getGenerationStamp();
+      Assert.assertTrue(newGS > originalGS);
+
+      // getBlockLocalPathInfo using the original block.
+      Assert.assertEquals(originalGS, originalBlock.getGenerationStamp());
+      final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(
+          originalBlock, token);
+      Assert.assertEquals(newGS, info.getBlock().getGenerationStamp());
+    }
+    cluster.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java
new file mode 100644
index 0000000..c30aac8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+
+public class TestBlockReaderRemote extends TestBlockReaderBase {
+
+  HdfsConfiguration createConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java
new file mode 100644
index 0000000..34a1539
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+
+public class TestBlockReaderRemote2 extends TestBlockReaderBase {
+  HdfsConfiguration createConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java
new file mode 100644
index 0000000..b6ec1bf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.client.impl;
+
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestClientBlockVerification {
+
+  static BlockReaderTestUtil util = null;
+  static final Path TEST_FILE = new Path("/test.file");
+  static final int FILE_SIZE_K = 256;
+  static LocatedBlock testBlock = null;
+
+  static {
+    GenericTestUtils.setLogLevel(BlockReaderRemote2.LOG, Level.ALL);
+  }
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    final int REPLICATION_FACTOR = 1;
+    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+    util.writeFile(TEST_FILE, FILE_SIZE_K);
+    List<LocatedBlock> blkList = util.getFileBlocks(TEST_FILE, FILE_SIZE_K);
+    testBlock = blkList.get(0);     // Use the first block to test
+  }
+
+  /**
+   * Verify that if we read an entire block, we send CHECKSUM_OK
+   */
+  @Test
+  public void testBlockVerification() throws Exception {
+    BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
+        util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
+    verify(reader).sendReadResult(Status.CHECKSUM_OK);
+    reader.close();
+  }
+
+  /**
+   * Test that if we do an incomplete read, we don't call CHECKSUM_OK
+   */
+  @Test
+  public void testIncompleteRead() throws Exception {
+    BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
+        util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
+
+    // We asked the blockreader for the whole file, and only read
+    // half of it, so no CHECKSUM_OK
+    verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
+    reader.close();
+  }
+
+  /**
+   * Test that if we ask for a half block, and read it all, we *do*
+   * send CHECKSUM_OK. The DN takes care of knowing whether it was
+   * the whole block or not.
+   */
+  @Test
+  public void testCompletePartialRead() throws Exception {
+    // Ask for half the file
+    BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
+        util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
+    // And read half the file
+    util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
+    verify(reader).sendReadResult(Status.CHECKSUM_OK);
+    reader.close();
+  }
+
+  /**
+   * Test various unaligned reads to make sure that we properly
+   * account even when we don't start or end on a checksum boundary
+   */
+  @Test
+  public void testUnalignedReads() throws Exception {
+    int startOffsets[] = new int[] { 0, 3, 129 };
+    int lengths[] = new int[] { 30, 300, 512, 513, 1025 };
+    for (int startOffset : startOffsets) {
+      for (int length : lengths) {
+        DFSClient.LOG.info("Testing startOffset = " + startOffset +
+                           " and len=" + length);
+        BlockReaderRemote2 reader = (BlockReaderRemote2)spy(
+            util.getBlockReader(testBlock, startOffset, length));
+        util.readAndCheckEOS(reader, length, true);
+        verify(reader).sendReadResult(Status.CHECKSUM_OK);
+        reader.close();
+      }
+    }
+  }
+
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdown();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index 763aeb5..4f74c7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsTracer;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
-import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 5acbeb0..379e9e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FsTracer;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
-import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index e5acb1c..6557055 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
index a6d972a..ce37abd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index 588f209..5dd192f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -55,12 +55,11 @@ import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index e395fca..f788613 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -39,8 +39,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.BlockReaderFactory;
-import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5dab680/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java
index ca8ba52..1abc44d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.TestBlockReaderLocal;
+import org.apache.hadoop.hdfs.client.impl.TestBlockReaderLocal;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;

