hadoop-common-commits mailing list archives

From z..@apache.org
Subject [05/18] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.
Date Mon, 10 Aug 2015 21:03:52 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
new file mode 100644
index 0000000..baf6106
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestDFSStripedInputStream {
+
+  public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private final ECSchema schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
+  private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int NUM_STRIPE_PER_BLOCK = 2;
+  private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+  private final int BLOCK_GROUP_SIZE =  DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
+    SimulatedFSDataset.setFactory(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        DATA_BLK_NUM + PARITY_BLK_NUM).build();
+    cluster.waitActive();
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+    fs = cluster.getFileSystem();
+    fs.mkdirs(dirPath);
+    fs.getClient().createErasureCodingZone(dirPath.toString(), null, CELLSIZE);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test {@link DFSStripedInputStream#getBlockAt(long)}
+   */
+  @Test
+  public void testRefreshBlock() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
+    final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
+        filePath.toString(), false, schema, CELLSIZE, null);
+
+    List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+    for (LocatedBlock aLbList : lbList) {
+      LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
+      LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
+          CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        LocatedBlock refreshed = in.refreshLocatedBlock(blks[j]);
+        assertEquals(blks[j].getBlock(), refreshed.getBlock());
+        assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
+        assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
+      }
+    }
+  }
+
+  @Test
+  public void testPread() throws Exception {
+    final int numBlocks = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
+    int fileLen = BLOCK_GROUP_SIZE * numBlocks;
+
+    byte[] expected = new byte[fileLen];
+    assertEquals(numBlocks, lbs.getLocatedBlocks().size());
+    for (int bgIdx = 0; bgIdx < numBlocks; bgIdx++) {
+      LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(bgIdx));
+      for (int i = 0; i < DATA_BLK_NUM; i++) {
+        Block blk = new Block(bg.getBlock().getBlockId() + i,
+            NUM_STRIPE_PER_BLOCK * CELLSIZE,
+            bg.getBlock().getGenerationStamp());
+        blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+        cluster.injectBlocks(i, Arrays.asList(blk),
+            bg.getBlock().getBlockPoolId());
+      }
+
+      /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+      for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+        for (int j = 0; j < DATA_BLK_NUM; j++) {
+          for (int k = 0; k < CELLSIZE; k++) {
+            int posInBlk = i * CELLSIZE + k;
+            int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+            expected[bgIdx*BLOCK_GROUP_SIZE + posInFile] =
+                SimulatedFSDataset.simulatedByte(
+                    new Block(bg.getBlock().getBlockId() + j), posInBlk);
+          }
+        }
+      }
+    }
+    DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
+        filePath.toString(), false, schema, CELLSIZE, null);
+
+    int[] startOffsets = {0, 1, CELLSIZE - 102, CELLSIZE, CELLSIZE + 102,
+        CELLSIZE*DATA_BLK_NUM, CELLSIZE*DATA_BLK_NUM + 102,
+        BLOCK_GROUP_SIZE - 102, BLOCK_GROUP_SIZE, BLOCK_GROUP_SIZE + 102,
+        fileLen - 1};
+    for (int startOffset : startOffsets) {
+      startOffset = Math.max(0, Math.min(startOffset, fileLen - 1));
+      int remaining = fileLen - startOffset;
+      byte[] buf = new byte[fileLen];
+      int ret = in.read(startOffset, buf, 0, fileLen);
+      assertEquals(remaining, ret);
+      for (int i = 0; i < remaining; i++) {
+        Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
+                "same",
+            expected[startOffset + i], buf[i]);
+      }
+    }
+    in.close();
+  }
+
+  @Test
+  public void testPreadWithDNFailure() throws Exception {
+    final int numBlocks = 4;
+    final int failedDNIdx = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE);
+
+    assert lbs.get(0) instanceof LocatedStripedBlock;
+    LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+    for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
+      Block blk = new Block(bg.getBlock().getBlockId() + i,
+          NUM_STRIPE_PER_BLOCK * CELLSIZE,
+          bg.getBlock().getGenerationStamp());
+      blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+      cluster.injectBlocks(i, Arrays.asList(blk),
+          bg.getBlock().getBlockPoolId());
+    }
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
+            ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE, null);
+    int readSize = BLOCK_GROUP_SIZE;
+    byte[] readBuffer = new byte[readSize];
+    byte[] expected = new byte[readSize];
+    cluster.stopDataNode(failedDNIdx);
+    /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+    for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        for (int k = 0; k < CELLSIZE; k++) {
+          int posInBlk = i * CELLSIZE + k;
+          int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+          expected[posInFile] = SimulatedFSDataset.simulatedByte(
+              new Block(bg.getBlock().getBlockId() + j), posInBlk);
+        }
+      }
+    }
+
+    // Update the expected content for decoded data
+    for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+      byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
+      int[] missingBlkIdx = new int[]{failedDNIdx + PARITY_BLK_NUM, 1, 2};
+      byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE];
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
+        if (j != failedDNIdx) {
+          System.arraycopy(expected, posInBuf, decodeInputs[j + PARITY_BLK_NUM],
+              0, CELLSIZE);
+        }
+      }
+      for (int k = 0; k < CELLSIZE; k++) {
+        int posInBlk = i * CELLSIZE + k;
+        decodeInputs[0][k] = SimulatedFSDataset.simulatedByte(
+            new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
+      }
+      for (int m : missingBlkIdx) {
+        decodeInputs[m] = null;
+      }
+      RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf,
+          DATA_BLK_NUM, PARITY_BLK_NUM);
+      rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+      int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
+      System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
+    }
+    int delta = 10;
+    int done = 0;
+    // read a small delta, shouldn't trigger decode
+    // |cell_0 |
+    // |10     |
+    done += in.read(0, readBuffer, 0, delta);
+    assertEquals(delta, done);
+    // both head and trail cells are partial
+    // |c_0      |c_1    |c_2 |c_3 |c_4      |c_5         |
+    // |256K - 10|missing|256K|256K|256K - 10|not in range|
+    done += in.read(delta, readBuffer, delta,
+        CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
+    assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
+    // read the rest
+    done += in.read(done, readBuffer, done, readSize - done);
+    assertEquals(readSize, done);
+    assertArrayEquals(expected, readBuffer);
+  }
+
+  @Test
+  public void testStatefulRead() throws Exception {
+    testStatefulRead(false, false);
+    testStatefulRead(true, false);
+    testStatefulRead(true, true);
+  }
+
+  private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
+      throws Exception {
+    final int numBlocks = 2;
+    final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
+    if (cellMisalignPacket) {
+      conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1);
+      tearDown();
+      setup();
+    }
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(filePath.toString(), 0, fileSize);
+
+    assert lbs.getLocatedBlocks().size() == numBlocks;
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
+      LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
+      for (int i = 0; i < DATA_BLK_NUM; i++) {
+        Block blk = new Block(bg.getBlock().getBlockId() + i,
+            NUM_STRIPE_PER_BLOCK * CELLSIZE,
+            bg.getBlock().getGenerationStamp());
+        blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+        cluster.injectBlocks(i, Arrays.asList(blk),
+            bg.getBlock().getBlockPoolId());
+      }
+    }
+
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(),
+            false, schema, CELLSIZE, null);
+
+    byte[] expected = new byte[fileSize];
+
+    for (LocatedBlock bg : lbs.getLocatedBlocks()) {
+      /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+      for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+        for (int j = 0; j < DATA_BLK_NUM; j++) {
+          for (int k = 0; k < CELLSIZE; k++) {
+            int posInBlk = i * CELLSIZE + k;
+            int posInFile = (int) bg.getStartOffset() +
+                i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+            expected[posInFile] = SimulatedFSDataset.simulatedByte(
+                new Block(bg.getBlock().getBlockId() + j), posInBlk);
+          }
+        }
+      }
+    }
+
+    if (useByteBuffer) {
+      ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
+      int done = 0;
+      while (done < fileSize) {
+        int ret = in.read(readBuffer);
+        assertTrue(ret > 0);
+        done += ret;
+      }
+      assertArrayEquals(expected, readBuffer.array());
+    } else {
+      byte[] readBuffer = new byte[fileSize];
+      int done = 0;
+      while (done < fileSize) {
+        int ret = in.read(readBuffer, done, fileSize - done);
+        assertTrue(ret > 0);
+        done += ret;
+      }
+      assertArrayEquals(expected, readBuffer);
+    }
+    fs.delete(filePath, true);
+  }
+}
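
For reference, the expected-buffer loops in the test above map each byte of an internal block to its offset in the file through the striped cell layout (posInFile = stripe * cellSize * dataBlocks + blockIndex * cellSize + byteInCell). A minimal standalone sketch of that mapping follows; the 6+3 layout and 64 KB cell size are illustrative assumptions only, not the values the test reads from HdfsConstants.

// Sketch of the cell-to-offset mapping used in the expected-buffer loops above.
// The 6+3 layout and 64 KB cell size are assumptions for illustration only.
public class StripedOffsetSketch {
  static final int DATA_BLK_NUM = 6;      // assumed for illustration
  static final int CELL_SIZE = 64 * 1024; // assumed for illustration

  // Offset within the block group of byte k of the cell at (stripe, data block blkIdx).
  static long posInBlockGroup(int stripe, int blkIdx, int k) {
    return (long) stripe * CELL_SIZE * DATA_BLK_NUM + (long) blkIdx * CELL_SIZE + k;
  }

  // Offset of the same byte inside internal block blkIdx.
  static long posInInternalBlock(int stripe, int k) {
    return (long) stripe * CELL_SIZE + k;
  }

  public static void main(String[] args) {
    // Byte 10 of the cell at stripe 1, data block 2:
    System.out.println(posInBlockGroup(1, 2, 10));  // 1*64K*6 + 2*64K + 10 = 524298
    System.out.println(posInInternalBlock(1, 10));  // 1*64K + 10 = 65546
  }
}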

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
new file mode 100644
index 0000000..3f40dee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDFSStripedOutputStream {
+  public static final Log LOG = LogFactory.getLog(
+      TestDFSStripedOutputStream.class);
+
+  static {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+  }
+
+  private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private Configuration conf;
+  private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int stripesPerBlock = 4;
+  private final int blockSize = cellSize * stripesPerBlock;
+
+  @Before
+  public void setup() throws IOException {
+    int numDNs = dataBlocks + parityBlocks + 2;
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/", null, 0);
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFileEmpty() throws IOException {
+    testOneFile("/EmptyFile", 0);
+  }
+
+  @Test
+  public void testFileSmallerThanOneCell1() throws IOException {
+    testOneFile("/SmallerThanOneCell", 1);
+  }
+
+  @Test
+  public void testFileSmallerThanOneCell2() throws IOException {
+    testOneFile("/SmallerThanOneCell", cellSize - 1);
+  }
+
+  @Test
+  public void testFileEqualsWithOneCell() throws IOException {
+    testOneFile("/EqualsWithOneCell", cellSize);
+  }
+
+  @Test
+  public void testFileSmallerThanOneStripe1() throws IOException {
+    testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
+  }
+
+  @Test
+  public void testFileSmallerThanOneStripe2() throws IOException {
+    testOneFile("/SmallerThanOneStripe", cellSize + 123);
+  }
+
+  @Test
+  public void testFileEqualsWithOneStripe() throws IOException {
+    testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks);
+  }
+
+  @Test
+  public void testFileMoreThanOneStripe1() throws IOException {
+    testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testFileMoreThanOneStripe2() throws IOException {
+    testOneFile("/MoreThanOneStripe2", cellSize * dataBlocks
+            + cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testFileLessThanFullBlockGroup() throws IOException {
+    testOneFile("/LessThanFullBlockGroup",
+        cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
+  }
+
+  @Test
+  public void testFileFullBlockGroup() throws IOException {
+    testOneFile("/FullBlockGroup", blockSize * dataBlocks);
+  }
+
+  @Test
+  public void testFileMoreThanABlockGroup1() throws IOException {
+    testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void testFileMoreThanABlockGroup2() throws IOException {
+    testOneFile("/MoreThanABlockGroup2",
+        blockSize * dataBlocks + cellSize + 123);
+  }
+
+
+  @Test
+  public void testFileMoreThanABlockGroup3() throws IOException {
+    testOneFile("/MoreThanABlockGroup3",
+        blockSize * dataBlocks * 3 + cellSize * dataBlocks
+        + cellSize + 123);
+  }
+
+  private byte[] generateBytes(int cnt) {
+    byte[] bytes = new byte[cnt];
+    for (int i = 0; i < cnt; i++) {
+      bytes[i] = getByte(i);
+    }
+    return bytes;
+  }
+
+  private byte getByte(long pos) {
+    int mod = 29;
+    return (byte) (pos % mod + 1);
+  }
+
+  private void testOneFile(String src, int writeBytes) throws IOException {
+    Path testPath = new Path(src);
+
+    byte[] bytes = generateBytes(writeBytes);
+    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
+
+    // check file length
+    FileStatus status = fs.getFileStatus(testPath);
+    Assert.assertEquals(writeBytes, status.getLen());
+
+    checkData(src, writeBytes);
+  }
+
+  void checkData(String src, int writeBytes) throws IOException {
+    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+    LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
+
+    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
+      LocatedBlock[] blocks = StripedBlockUtil.
+          parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
+              cellSize, dataBlocks, parityBlocks);
+      List<LocatedBlock> oneGroup = Arrays.asList(blocks);
+      blockGroupList.add(oneGroup);
+    }
+
+    // test each block group
+    for (int group = 0; group < blockGroupList.size(); group++) {
+      //get the data of this block
+      List<LocatedBlock> blockList = blockGroupList.get(group);
+      byte[][] dataBlockBytes = new byte[dataBlocks][];
+      byte[][] parityBlockBytes = new byte[parityBlocks][];
+
+      // for each block, use BlockReader to read data
+      for (int i = 0; i < blockList.size(); i++) {
+        LocatedBlock lblock = blockList.get(i);
+        if (lblock == null) {
+          continue;
+        }
+        ExtendedBlock block = lblock.getBlock();
+        byte[] blockBytes = new byte[(int)block.getNumBytes()];
+        if (i < dataBlocks) {
+          dataBlockBytes[i] = blockBytes;
+        } else {
+          parityBlockBytes[i - dataBlocks] = blockBytes;
+        }
+
+        if (block.getNumBytes() == 0) {
+          continue;
+        }
+
+        final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
+            fs, lblock, 0, block.getNumBytes());
+        blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
+        blockReader.close();
+      }
+
+      // check if we write the data correctly
+      for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length;
+           blkIdxInGroup++) {
+        final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
+        if (actualBlkBytes == null) {
+          continue;
+        }
+        for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
+          // calculate the position of this byte in the file
+          long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
+              dataBlocks, posInBlk, blkIdxInGroup) +
+              group * blockSize * dataBlocks;
+          Assert.assertTrue(posInFile < writeBytes);
+          final byte expected = getByte(posInFile);
+
+          String s = "Unexpected byte " + actualBlkBytes[posInBlk]
+              + ", expect " + expected
+              + ". Block group index is " + group
+              + ", stripe index is " + posInBlk / cellSize
+              + ", cell index is " + blkIdxInGroup
+              + ", byte index is " + posInBlk % cellSize;
+          Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]);
+        }
+      }
+
+      verifyParity(lbs.getLocatedBlocks().get(group).getBlockSize(),
+          cellSize, dataBlockBytes, parityBlockBytes);
+    }
+  }
+
+  void verifyParity(final long size, final int cellSize,
+      byte[][] dataBytes, byte[][] parityBytes) {
+    verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1);
+  }
+
+  static void verifyParity(Configuration conf, final long size,
+                           final int cellSize, byte[][] dataBytes,
+                           byte[][] parityBytes, int killedDnIndex) {
+    // verify the parity blocks
+    int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
+        size, cellSize, dataBytes.length, dataBytes.length);
+    final byte[][] expectedParityBytes = new byte[parityBytes.length][];
+    for (int i = 0; i < parityBytes.length; i++) {
+      expectedParityBytes[i] = new byte[parityBlkSize];
+    }
+    for (int i = 0; i < dataBytes.length; i++) {
+      if (dataBytes[i] == null) {
+        dataBytes[i] = new byte[dataBytes[0].length];
+      } else if (dataBytes[i].length < dataBytes[0].length) {
+        final byte[] tmp = dataBytes[i];
+        dataBytes[i] = new byte[dataBytes[0].length];
+        System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
+      }
+    }
+    final RawErasureEncoder encoder =
+            CodecUtil.createRSRawEncoder(conf,
+                dataBytes.length, parityBytes.length);
+    encoder.encode(dataBytes, expectedParityBytes);
+    for (int i = 0; i < parityBytes.length; i++) {
+      if (i != killedDnIndex) {
+        Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex,
+            expectedParityBytes[i], parityBytes[i]);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
new file mode 100644
index 0000000..6594ae1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+
+public class TestDFSStripedOutputStreamWithFailure {
+  public static final Log LOG = LogFactory.getLog(
+      TestDFSStripedOutputStreamWithFailure.class);
+  static {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+  }
+
+  private static final int NUM_DATA_BLOCKS = HdfsConstants.NUM_DATA_BLOCKS;
+  private static final int NUM_PARITY_BLOCKS = HdfsConstants.NUM_PARITY_BLOCKS;
+  private static final int CELL_SIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private static final int STRIPES_PER_BLOCK = 4;
+  private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
+  private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
+
+  private static final int FLUSH_POS
+      = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+  private final Path dir = new Path("/"
+      + TestDFSStripedOutputStreamWithFailure.class.getSimpleName());
+
+  private void setup(Configuration conf) throws IOException {
+    final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+    dfs.mkdirs(dir);
+    dfs.createErasureCodingZone(dir, null, 0);
+  }
+
+  private void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private static byte getByte(long pos) {
+    return (byte)pos;
+  }
+
+  private void initConf(Configuration conf){
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+  }
+
+  private void initConfWithBlockToken(Configuration conf) {
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.setInt("ipc.client.connect.max.retries", 0);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+  }
+
+  @Test(timeout=240000)
+  public void testDatanodeFailure() throws Exception {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    HdfsConfiguration conf = new HdfsConfiguration();
+    initConf(conf);
+    for (int dn = 0; dn < 9; dn++) {
+      try {
+        setup(conf);
+        cluster.startDataNodes(conf, 1, true, null, null);
+        cluster.waitActive();
+        runTest(new Path(dir, "file" + dn), length, length / 2, dn, false);
+      } catch (Exception e) {
+        LOG.error("failed, dn=" + dn + ", length=" + length);
+        throw e;
+      } finally {
+        tearDown();
+      }
+    }
+  }
+
+  @Test(timeout=240000)
+  public void testBlockTokenExpired() throws Exception {
+    final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
+    HdfsConfiguration conf = new HdfsConfiguration();
+    initConf(conf);
+    initConfWithBlockToken(conf);
+    for (int dn = 0; dn < 9; dn += 2) {
+      try {
+        setup(conf);
+        cluster.startDataNodes(conf, 1, true, null, null);
+        cluster.waitActive();
+        runTest(new Path(dir, "file" + dn), length, length / 2, dn, true);
+      } catch (Exception e) {
+        LOG.error("failed, dn=" + dn + ", length=" + length);
+        throw e;
+      } finally {
+        tearDown();
+      }
+    }
+  }
+
+  private void runTest(final Path p, final int length, final int killPos,
+      final int dnIndex, final boolean tokenExpire) throws Exception {
+    LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos
+        + ", dnIndex=" + dnIndex);
+    Preconditions.checkArgument(killPos < length);
+    Preconditions.checkArgument(killPos > FLUSH_POS);
+    final String fullPath = p.toString();
+
+    final NameNode nn = cluster.getNameNode();
+    final BlockManager bm = nn.getNamesystem().getBlockManager();
+    final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
+
+    if (tokenExpire) {
+      // set a short token lifetime (1 second)
+      SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
+    }
+
+    final AtomicInteger pos = new AtomicInteger();
+    final FSDataOutputStream out = dfs.create(p);
+    final DFSStripedOutputStream stripedOut
+        = (DFSStripedOutputStream)out.getWrappedStream();
+
+    long oldGS = -1;
+    boolean killed = false;
+    for(; pos.get() < length; ) {
+      final int i = pos.getAndIncrement();
+      if (i == killPos) {
+        final long gs = getGenerationStamp(stripedOut);
+        Assert.assertTrue(oldGS != -1);
+        Assert.assertEquals(oldGS, gs);
+
+        if (tokenExpire) {
+          DFSTestUtil.flushInternal(stripedOut);
+          waitTokenExpires(out);
+        }
+
+        StripedFileTestUtil.killDatanode(cluster, stripedOut, dnIndex, pos);
+        killed = true;
+      }
+
+      write(out, i);
+
+      if (i == FLUSH_POS) {
+        oldGS = getGenerationStamp(stripedOut);
+      }
+    }
+    out.close();
+    Assert.assertTrue(killed);
+
+    // check file length
+    final FileStatus status = dfs.getFileStatus(p);
+    Assert.assertEquals(length, status.getLen());
+
+    checkData(dfs, fullPath, length, dnIndex, oldGS);
+  }
+
+  static void write(FSDataOutputStream out, int i) throws IOException {
+    try {
+      out.write(getByte(i));
+    } catch(IOException ioe) {
+      throw new IOException("Failed at i=" + i, ioe);
+    }
+  }
+
+  static long getGenerationStamp(DFSStripedOutputStream out)
+      throws IOException {
+    final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp();
+    LOG.info("getGenerationStamp returns " + gs);
+    return gs;
+
+  }
+
+  static void checkData(DistributedFileSystem dfs, String src, int length,
+      int killedDnIndex, long oldGS) throws IOException {
+    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+    LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L);
+    final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1;
+    Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
+
+    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
+
+      final long gs = firstBlock.getBlock().getGenerationStamp();
+      final String s = "gs=" + gs + ", oldGS=" + oldGS;
+      LOG.info(s);
+      Assert.assertTrue(s, gs > oldGS);
+
+      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+          (LocatedStripedBlock) firstBlock,
+          CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+      blockGroupList.add(Arrays.asList(blocks));
+    }
+
+    // test each block group
+    for (int group = 0; group < blockGroupList.size(); group++) {
+      final boolean isLastGroup = group == blockGroupList.size() - 1;
+      final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
+          : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
+      final int numCellInGroup = (groupSize - 1)/CELL_SIZE + 1;
+      final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
+      final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE;
+
+      //get the data of this block
+      List<LocatedBlock> blockList = blockGroupList.get(group);
+      byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
+      byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
+
+      // for each block, use BlockReader to read data
+      for (int i = 0; i < blockList.size(); i++) {
+        final int j = i >= NUM_DATA_BLOCKS? 0: i;
+        final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
+            + (j <= lastCellIndex? 1: 0);
+        final int blockSize = numCellInBlock*CELL_SIZE
+            + (isLastGroup && i == lastCellIndex? lastCellSize - CELL_SIZE: 0);
+
+        final byte[] blockBytes = new byte[blockSize];
+        if (i < NUM_DATA_BLOCKS) {
+          dataBlockBytes[i] = blockBytes;
+        } else {
+          parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
+        }
+
+        final LocatedBlock lb = blockList.get(i);
+        LOG.info("XXX i=" + i + ", lb=" + lb);
+        if (lb == null) {
+          continue;
+        }
+        final ExtendedBlock block = lb.getBlock();
+        Assert.assertEquals(blockSize, block.getNumBytes());
+
+
+        if (block.getNumBytes() == 0) {
+          continue;
+        }
+
+        if (i != killedDnIndex) {
+          final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
+              dfs, lb, 0, block.getNumBytes());
+          blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
+          blockReader.close();
+        }
+      }
+
+      // check data
+      final int groupPosInFile = group*BLOCK_GROUP_SIZE;
+      for (int i = 0; i < dataBlockBytes.length; i++) {
+        final byte[] actual = dataBlockBytes[i];
+        for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
+          final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
+              CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
+          Assert.assertTrue(posInFile < length);
+          final byte expected = getByte(posInFile);
+
+          if (i == killedDnIndex) {
+            actual[posInBlk] = expected;
+          } else {
+            String s = "expected=" + expected + " but actual=" + actual[posInBlk]
+                + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
+                + ". group=" + group + ", i=" + i;
+            Assert.assertEquals(s, expected, actual[posInBlk]);
+          }
+        }
+      }
+
+      // check parity
+      TestDFSStripedOutputStream.verifyParity(dfs.getConf(),
+          lbs.getLocatedBlocks().get(group).getBlockSize(),
+          CELL_SIZE, dataBlockBytes, parityBlockBytes,
+          killedDnIndex - dataBlockBytes.length);
+    }
+  }
+
+  private void waitTokenExpires(FSDataOutputStream out) throws IOException {
+    Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
+    while (!SecurityTestUtil.isBlockTokenExpired(token)) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ignored) {
+      }
+    }
+  }
+}
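
For reference, the group-sizing arithmetic in checkData above (numCellInGroup, lastCellIndex, lastCellSize) is easiest to follow with concrete numbers. The sketch below walks through it; the 6-data-block, 64 KB-cell, 4-stripes-per-block parameters are illustrative assumptions, not the constants the test uses.

// Worked example of the last-group sizing arithmetic in checkData above.
// All parameters are assumptions for illustration, not read from HdfsConstants.
public class LastGroupSizingSketch {
  public static void main(String[] args) {
    final int NUM_DATA_BLOCKS = 6;    // assumed
    final int CELL_SIZE = 64 * 1024;  // assumed
    final int STRIPES_PER_BLOCK = 4;  // assumed
    final int BLOCK_GROUP_SIZE = STRIPES_PER_BLOCK * CELL_SIZE * NUM_DATA_BLOCKS;

    // A file that fills one group and spills 3 cells plus 123 bytes into a second group.
    final int length = BLOCK_GROUP_SIZE + 3 * CELL_SIZE + 123;

    final int numGroups = (length - 1) / BLOCK_GROUP_SIZE + 1;                 // 2
    final int lastGroupSize = length - (numGroups - 1) * BLOCK_GROUP_SIZE;     // 3*64K + 123
    final int numCellInGroup = (lastGroupSize - 1) / CELL_SIZE + 1;            // 4
    final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;          // 3
    final int lastCellSize = lastGroupSize - (numCellInGroup - 1) * CELL_SIZE; // 123

    System.out.println(numGroups + " " + lastGroupSize + " " + numCellInGroup
        + " " + lastCellIndex + " " + lastCellSize);
  }
}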

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index a821c30..4233a1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -110,7 +110,7 @@ public class TestDFSUtil {
     l2.setCorrupt(true);
 
     List<LocatedBlock> ls = Arrays.asList(l1, l2);
-    LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null);
+    LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null, null, 0);
 
     BlockLocation[] bs = DFSUtilClient.locatedBlocks2Locations(lbs);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java
new file mode 100644
index 0000000..88198c9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestECSchemas {
+  private MiniDFSCluster cluster;
+
+  @Before
+  public void before() throws IOException {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(0)
+        .build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void after() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testGetECSchemas() throws Exception {
+    ECSchema[] ecSchemas = cluster.getFileSystem().getClient().getECSchemas();
+    assertNotNull(ecSchemas);
+    assertTrue("Should have at least one schema", ecSchemas.length > 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
index 567a70a..8724ed5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java
@@ -743,7 +743,7 @@ public class TestEncryptionZones {
             version, new byte[suite.getAlgorithmBlockSize()],
             new byte[suite.getAlgorithmBlockSize()],
             "fakeKey", "fakeVersion"),
-            (byte) 0))
+            (byte) 0, null, 0))
         .when(mcp)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
new file mode 100644
index 0000000..1a10ebf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.junit.Assert.*;
+
+public class TestErasureCodingZones {
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private static final int BLOCK_SIZE = 1024;
+  private FSNamesystem namesystem;
+
+  @Before
+  public void setupCluster() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).
+        numDataNodes(1).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    namesystem = cluster.getNamesystem();
+  }
+
+  @After
+  public void shutdownCluster() throws IOException {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testCreateECZone()
+      throws IOException, InterruptedException {
+    final Path testDir = new Path("/ec");
+    fs.mkdir(testDir, FsPermission.getDirDefault());
+
+    /* Normal creation of an erasure coding zone */
+    fs.getClient().createErasureCodingZone(testDir.toString(), null, 0);
+
+    /* Verify files under the zone are striped */
+    final Path ECFilePath = new Path(testDir, "foo");
+    fs.create(ECFilePath);
+    INode inode = namesystem.getFSDirectory().getINode(ECFilePath.toString());
+    assertTrue(inode.asFile().isStriped());
+
+    /* Verify that EC zone cannot be created on non-empty dir */
+    final Path notEmpty = new Path("/nonEmpty");
+    fs.mkdir(notEmpty, FsPermission.getDirDefault());
+    fs.create(new Path(notEmpty, "foo"));
+    try {
+      fs.getClient().createErasureCodingZone(notEmpty.toString(), null, 0);
+      fail("Erasure coding zone on non-empty dir");
+    } catch (IOException e) {
+      assertExceptionContains("erasure coding zone for a non-empty directory", e);
+    }
+
+    /* Verify that nested EC zones cannot be created */
+    final Path zone1 = new Path("/zone1");
+    final Path zone2 = new Path(zone1, "zone2");
+    fs.mkdir(zone1, FsPermission.getDirDefault());
+    fs.getClient().createErasureCodingZone(zone1.toString(), null, 0);
+    fs.mkdir(zone2, FsPermission.getDirDefault());
+    try {
+      fs.getClient().createErasureCodingZone(zone2.toString(), null, 0);
+      fail("Nested erasure coding zones");
+    } catch (IOException e) {
+      assertExceptionContains("already in an erasure coding zone", e);
+    }
+
+    /* Verify that EC zone cannot be created on a file */
+    final Path fPath = new Path("/file");
+    fs.create(fPath);
+    try {
+      fs.getClient().createErasureCodingZone(fPath.toString(), null, 0);
+      fail("Erasure coding zone on file");
+    } catch (IOException e) {
+      assertExceptionContains("erasure coding zone for a file", e);
+    }
+  }
+
+  @Test
+  public void testMoveValidity() throws IOException, InterruptedException {
+    final Path srcECDir = new Path("/srcEC");
+    final Path dstECDir = new Path("/dstEC");
+    fs.mkdir(srcECDir, FsPermission.getDirDefault());
+    fs.mkdir(dstECDir, FsPermission.getDirDefault());
+    fs.getClient().createErasureCodingZone(srcECDir.toString(), null, 0);
+    fs.getClient().createErasureCodingZone(dstECDir.toString(), null, 0);
+    final Path srcFile = new Path(srcECDir, "foo");
+    fs.create(srcFile);
+
+    // Test move dir
+    // Move EC dir under non-EC dir
+    final Path newDir = new Path("/srcEC_new");
+    fs.rename(srcECDir, newDir);
+    fs.rename(newDir, srcECDir); // move back
+
+    // Move EC dir under another EC dir
+    fs.rename(srcECDir, dstECDir);
+    fs.rename(new Path("/dstEC/srcEC"), srcECDir); // move back
+
+    // Test move file
+    /* Verify that a file can be moved between 2 EC zones */
+    fs.rename(srcFile, dstECDir);
+    fs.rename(new Path(dstECDir, "foo"), srcECDir); // move back
+
+    /* Verify that a file cannot be moved from an EC zone to a non-EC dir */
+    final Path nonECDir = new Path("/nonEC");
+    fs.mkdir(nonECDir, FsPermission.getDirDefault());
+    try {
+      fs.rename(srcFile, nonECDir);
+      fail("A file shouldn't be able to move from an EC zone to a non-EC dir");
+    } catch (IOException e) {
+      assertExceptionContains("can't be moved because the source and " +
+          "destination have different erasure coding policies", e);
+    }
+
+    /* Verify that a file cannot be moved from a non-EC dir to an EC zone */
+    final Path nonECFile = new Path(nonECDir, "nonECFile");
+    fs.create(nonECFile);
+    try {
+      fs.rename(nonECFile, dstECDir);
+    } catch (IOException e) {
+      assertExceptionContains("can't be moved because the source and " +
+          "destination have different erasure coding policies", e);
+    }
+  }
+
+  @Test
+  public void testReplication() throws IOException {
+    final Path testDir = new Path("/ec");
+    fs.mkdir(testDir, FsPermission.getDirDefault());
+    fs.createErasureCodingZone(testDir, null, 0);
+    final Path fooFile = new Path(testDir, "foo");
+    // create ec file with replication=0
+    fs.create(fooFile, FsPermission.getFileDefault(), true,
+        conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short)0, fs.getDefaultBlockSize(fooFile), null);
+
+    try {
+      fs.setReplication(fooFile, (short) 3);
+      fail("Shouldn't allow to set replication to a file with striped blocks");
+    } catch (IOException e) {
+      assertExceptionContains(
+          "Cannot set replication to a file with striped blocks", e);
+    }
+  }
+
+  @Test
+  public void testGetErasureCodingInfoWithSystemDefaultSchema() throws Exception {
+    String src = "/ec";
+    final Path ecDir = new Path(src);
+    fs.mkdir(ecDir, FsPermission.getDirDefault());
+    // dir ECInfo before creating ec zone
+    assertNull(fs.getClient().getFileInfo(src).getECSchema());
+    // dir ECInfo after creating ec zone
+    fs.getClient().createErasureCodingZone(src, null, 0); //Default one will be used.
+    ECSchema sysDefaultSchema = ErasureCodingSchemaManager.getSystemDefaultSchema();
+    verifyErasureCodingInfo(src, sysDefaultSchema);
+    fs.create(new Path(ecDir, "child1")).close();
+    // verify for the files in ec zone
+    verifyErasureCodingInfo(src + "/child1", sysDefaultSchema);
+  }
+
+  @Test
+  public void testGetErasureCodingInfo() throws Exception {
+    ECSchema[] sysSchemas = ErasureCodingSchemaManager.getSystemSchemas();
+    assertTrue("There should be only one system schema for now",
+        sysSchemas.length == 1);
+
+    ECSchema usingSchema = sysSchemas[0];
+    String src = "/ec2";
+    final Path ecDir = new Path(src);
+    fs.mkdir(ecDir, FsPermission.getDirDefault());
+    // dir ECInfo before creating ec zone
+    assertNull(fs.getClient().getFileInfo(src).getECSchema());
+    // dir ECInfo after creating ec zone
+    fs.getClient().createErasureCodingZone(src, usingSchema, 0);
+    verifyErasureCodingInfo(src, usingSchema);
+    fs.create(new Path(ecDir, "child1")).close();
+    // verify for the files in ec zone
+    verifyErasureCodingInfo(src + "/child1", usingSchema);
+  }
+
+  private void verifyErasureCodingInfo(
+      String src, ECSchema usingSchema) throws IOException {
+    HdfsFileStatus hdfsFileStatus = fs.getClient().getFileInfo(src);
+    ECSchema schema = hdfsFileStatus.getECSchema();
+    assertNotNull(schema);
+    assertEquals("Actually used schema should be equal to the target schema",
+        usingSchema, schema);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatusWithECschema.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatusWithECschema.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatusWithECschema.java
new file mode 100644
index 0000000..3c400b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatusWithECschema.java
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFileStatusWithECschema {
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private DFSClient client;
+
+  @Before
+  public void before() throws IOException {
+    cluster =
+        new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+  }
+
+  @After
+  public void after() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFileStatusWithECschema() throws Exception {
+    // test directory not in EC zone
+    final Path dir = new Path("/foo");
+    assertTrue(fs.mkdir(dir, FsPermission.getDirDefault()));
+    assertNull(client.getFileInfo(dir.toString()).getECSchema());
+    // test file not in EC zone
+    final Path file = new Path(dir, "foo");
+    fs.create(file).close();
+    assertNull(client.getFileInfo(file.toString()).getECSchema());
+    fs.delete(file, true);
+
+    final ECSchema schema1 = ErasureCodingSchemaManager.getSystemDefaultSchema();
+    // create EC zone on dir
+    fs.createErasureCodingZone(dir, schema1, 0);
+    final ECSchema schema2 = client.getFileInfo(dir.toUri().getPath()).getECSchema();
+    assertNotNull(schema2);
+    assertTrue(schema1.equals(schema2));
+
+    // test file in EC zone
+    fs.create(file).close();
+    final ECSchema schema3 =
+        fs.getClient().getFileInfo(file.toUri().getPath()).getECSchema();
+    assertNotNull(schema3);
+    assertTrue(schema1.equals(schema3));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
index 985f43e..d0cd335 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java
@@ -354,12 +354,12 @@ public class TestLease {
     Mockito.doReturn(
         new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
             (short) 777), "owner", "group", new byte[0], new byte[0],
-            1010, 0, null, (byte) 0)).when(mcp).getFileInfo(anyString());
+            1010, 0, null, (byte) 0, null, 0)).when(mcp).getFileInfo(anyString());
     Mockito
         .doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0, null, (byte) 0))
+                1010, 0, null, (byte) 0, null, 0))
         .when(mcp)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
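
The TestLease change above reflects that HdfsFileStatus gained two trailing constructor arguments on this branch; the mocks simply pass null and 0 for them. A hedged sketch of a mocked status for a striped file follows, assuming (as the surrounding patch suggests, but does not state) that the new arguments are the file's ECSchema and its striping cell size.

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
import org.apache.hadoop.io.erasurecode.ECSchema;

public class StripedFileStatusSketch {
  // Same argument shape as the mock in TestLease, but with a schema and an
  // assumed 64KB cell size in place of the trailing null / 0.
  static HdfsFileStatus newStripedStatus() {
    ECSchema schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
    return new HdfsFileStatus(0, false, 1, 1024, 0, 0,
        new FsPermission((short) 777), "owner", "group",
        new byte[0], new byte[0], 1010, 0, null, (byte) 0,
        schema, 64 * 1024);
  }
}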

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
new file mode 100644
index 0000000..1719d3f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks;
+
+public class TestReadStripedFileWithDecoding {
+  static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private final int smallFileLength = blockSize * dataBlocks - 123;
+  private final int largeFileLength = blockSize * dataBlocks + 123;
+  private final int[] fileLengths = {smallFileLength, largeFileLength};
+  private final int[] dnFailureNums = {1, 2, 3};
+
+  @Before
+  public void setup() throws IOException {
+    cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
+        .numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Shut down a tolerable number of DataNodes before reading.
+   * Verify that decoding works correctly.
+   */
+  @Test(timeout=300000)
+  public void testReadWithDNFailure() throws IOException {
+    for (int fileLength : fileLengths) {
+      for (int dnFailureNum : dnFailureNums) {
+        try {
+          // setup a new cluster with no dead datanode
+          setup();
+          testReadWithDNFailure(fileLength, dnFailureNum);
+        } catch (IOException ioe) {
+          String fileType = fileLength < (blockSize * dataBlocks) ?
+              "smallFile" : "largeFile";
+          LOG.error("Failed to read file with DN failure:"
+              + " fileType = "+ fileType
+              + ", dnFailureNum = " + dnFailureNum);
+        } finally {
+          // tear down the cluster
+          tearDown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Corrupt a tolerable number of blocks before reading.
+   * Verify that decoding works correctly.
+   */
+  @Test(timeout=300000)
+  public void testReadCorruptedData() throws IOException {
+    for (int fileLength : fileLengths) {
+      for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
+        for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
+          String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
+          testReadWithBlockCorrupted(src, fileLength,
+              dataDelNum, parityDelNum, false);
+        }
+      }
+    }
+  }
+
+  /**
+   * Delete a tolerable number of blocks before reading.
+   * Verify that decoding works correctly.
+   */
+  @Test(timeout=300000)
+  public void testReadCorruptedDataByDeleting() throws IOException {
+    for (int fileLength : fileLengths) {
+      for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
+        for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
+          String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
+          testReadWithBlockCorrupted(src, fileLength,
+              dataDelNum, parityDelNum, true);
+        }
+      }
+    }
+  }
+
+  private int findFirstDataNode(Path file, long length) throws IOException {
+    BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length);
+    String name = (locs[0].getNames())[0];
+    int dnIndex = 0;
+    for (DataNode dn : cluster.getDataNodes()) {
+      int port = dn.getXferPort();
+      if (name.contains(Integer.toString(port))) {
+        return dnIndex;
+      }
+      dnIndex++;
+    }
+    return -1;
+  }
+
+  private void verifyRead(Path testPath, int length, byte[] expected)
+      throws IOException {
+    byte[] buffer = new byte[length + 100];
+    StripedFileTestUtil.verifyLength(fs, testPath, length);
+    StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer);
+    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer);
+    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected,
+        ByteBuffer.allocate(length + 100));
+    StripedFileTestUtil.verifySeek(fs, testPath, length);
+  }
+
+  private void testReadWithDNFailure(int fileLength, int dnFailureNum)
+      throws IOException {
+    String fileType = fileLength < (blockSize * dataBlocks) ?
+        "smallFile" : "largeFile";
+    String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
+    LOG.info("testReadWithDNFailure: file = " + src
+        + ", fileSize = " + fileLength
+        + ", dnFailureNum = " + dnFailureNum);
+
+    Path testPath = new Path(src);
+    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
+    DFSTestUtil.writeFile(fs, testPath, bytes);
+
+    // shut down the DNs that hold internal data blocks
+    BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
+        cellSize);
+    for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
+      String name = (locs[0].getNames())[failedDnIdx];
+      for (DataNode dn : cluster.getDataNodes()) {
+        int port = dn.getXferPort();
+        if (name.contains(Integer.toString(port))) {
+          dn.shutdown();
+        }
+      }
+    }
+
+    // check file length, pread, stateful read and seek
+    verifyRead(testPath, fileLength, bytes);
+  }
+
+  /**
+   * After reading a corrupted block, make sure the client can correctly report
+   * the corruption to the NameNode.
+   */
+  @Test
+  public void testReportBadBlock() throws IOException {
+    // create file
+    final Path file = new Path("/corrupted");
+    final int length = 10; // length of "corruption"
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, file, bytes);
+
+    // corrupt the first data block
+    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    Assert.assertNotEquals(-1, dnIndex);
+    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
+        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        cellSize, dataBlocks, parityBlocks);
+    // find the first block file
+    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
+    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile.exists());
+    // corrupt the block file
+    LOG.info("Deliberately corrupting file " + blkFile.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile)) {
+      out.write("corruption".getBytes());
+    }
+
+    // disable the heartbeats from the DNs so that the corrupted block record is
+    // kept in the NameNode
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+
+    try {
+      // do stateful read
+      StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes,
+          ByteBuffer.allocate(1024));
+
+      // check whether the corruption has been reported to the NameNode
+      final FSNamesystem ns = cluster.getNamesystem();
+      final BlockManager bm = ns.getBlockManager();
+      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
+          .asFile().getBlocks())[0];
+      Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
+    } finally {
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+      }
+    }
+  }
+
+  @Test
+  public void testInvalidateBlock() throws IOException {
+    final Path file = new Path("/invalidate");
+    final int length = 10;
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, file, bytes);
+
+    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    Assert.assertNotEquals(-1, dnIndex);
+    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
+        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        cellSize, dataBlocks, parityBlocks);
+    final Block b = blks[0].getBlock().getLocalBlock();
+
+    DataNode dn = cluster.getDataNodes().get(dnIndex);
+    // disable the heartbeat from the DN so that the invalidated block record is
+    // kept in the NameNode until the heartbeat expires and the NN marks the DN as dead
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+
+    try {
+      // delete the file
+      fs.delete(file, true);
+      // check the block is added to invalidateBlocks
+      final FSNamesystem fsn = cluster.getNamesystem();
+      final BlockManager bm = fsn.getBlockManager();
+      DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      Assert.assertTrue(bm.containsInvalidateBlock(
+          blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
+    } finally {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+    }
+  }
+
+  /**
+   * Test reading a file with some blocks (data blocks, parity blocks, or both)
+   * deleted or corrupted.
+   * @param src file path
+   * @param fileLength file length
+   * @param dataBlkDelNum the number of data blocks to delete or corrupt.
+   * @param parityBlkDelNum the number of parity blocks to delete or corrupt.
+   * @param deleteBlockFile whether to delete or to corrupt the block files:
+   *                        true deletes the block files,
+   *                        false corrupts their content.
+   * @throws IOException
+   */
+  private void testReadWithBlockCorrupted(String src, int fileLength,
+      int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
+      throws IOException {
+    LOG.info("testReadWithBlockCorrupted: file = " + src
+        + ", dataBlkDelNum = " + dataBlkDelNum
+        + ", parityBlkDelNum = " + parityBlkDelNum
+        + ", deleteBlockFile? " + deleteBlockFile);
+    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
+    Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive",
+        dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
+    Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " +
+        "should be between 1 ~ " + parityBlocks, recoverBlkNum <= parityBlocks);
+
+    // write a file of length fileLength
+    Path srcPath = new Path(src);
+    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
+    DFSTestUtil.writeFile(fs, srcPath, bytes);
+
+    // delete or corrupt some blocks
+    corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile);
+
+    // check the file can be read after some blocks were deleted
+    verifyRead(srcPath, fileLength, bytes);
+  }
+
+  private void corruptBlocks(Path srcPath, int dataBlkDelNum,
+      int parityBlkDelNum, boolean deleteBlockFile) throws IOException {
+    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
+
+    LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath);
+    LocatedStripedBlock lastBlock =
+        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+
+    int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
+        dataBlkDelNum);
+    Assert.assertNotNull(delDataBlkIndices);
+    int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks,
+        dataBlocks + parityBlocks, parityBlkDelNum);
+    Assert.assertNotNull(delParityBlkIndices);
+
+    int[] delBlkIndices = new int[recoverBlkNum];
+    System.arraycopy(delDataBlkIndices, 0,
+        delBlkIndices, 0, delDataBlkIndices.length);
+    System.arraycopy(delParityBlkIndices, 0,
+        delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length);
+
+    ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum];
+    for (int i = 0; i < recoverBlkNum; i++) {
+      delBlocks[i] = StripedBlockUtil
+          .constructInternalBlock(lastBlock.getBlock(),
+              cellSize, dataBlocks, delBlkIndices[i]);
+      if (deleteBlockFile) {
+        // delete the block file
+        cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]);
+      } else {
+        // corrupt the block file
+        cluster.corruptBlockOnDataNodes(delBlocks[i]);
+      }
+    }
+  }
+
+  private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException {
+    return fs.getClient().getLocatedBlocks(filePath.toString(),
+        0, Long.MAX_VALUE);
+  }
+}
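
A note on the loop bounds in testReadCorruptedData and testReadCorruptedDataByDeleting: with the usual RS-(6,3) layout behind StripedFileTestUtil (6 data blocks, 3 parity blocks, assumed here), the constraint dataDelNum + parityDelNum < 4 is exactly "at most parityBlocks failures", which is what the decoder can tolerate. The small sketch below simply enumerates the combinations those loops generate, under that assumption.

public class TolerableFailureCombos {
  public static void main(String[] args) {
    // Assumed RS-(6,3) defaults; the tests read the real values from StripedFileTestUtil.
    final int parityBlocks = 3;
    // Mirrors the nested loops in testReadCorruptedData / testReadCorruptedDataByDeleting.
    for (int dataDel = 1; dataDel < 4; dataDel++) {
      for (int parityDel = 0; dataDel + parityDel < 4; parityDel++) {
        // Every generated pair stays within the decoder's tolerance.
        if (dataDel + parityDel > parityBlocks) {
          throw new AssertionError("exceeds tolerance");
        }
        System.out.println("dataDelNum=" + dataDel + ", parityDelNum=" + parityDel);
      }
    }
  }
}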

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java
new file mode 100644
index 0000000..4c2438d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+
+/**
+ * Test reading a striped file when some of its blocks are missing (not included
+ * in the block locations returned by the NameNode).
+ */
+public class TestReadStripedFileWithMissingBlocks {
+  public static final Log LOG = LogFactory
+      .getLog(TestReadStripedFileWithMissingBlocks.class);
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+  private static Configuration conf = new HdfsConfiguration();
+  private final int fileLength = blockSize * dataBlocks + 123;
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks1() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 0);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks2() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 1);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks3() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 2);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks4() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 0);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks5() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 1);
+  }
+
+  @Test
+  public void testReadFileWithMissingBlocks6() throws IOException {
+    readFileWithMissingBlocks(new Path("/foo"), fileLength, 3, 0);
+  }
+
+  private void readFileWithMissingBlocks(Path srcPath, int fileLength,
+      int missingDataNum, int missingParityNum)
+      throws IOException {
+    LOG.info("readFileWithMissingBlocks: (" + missingDataNum + ","
+        + missingParityNum + ")");
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+    DFSTestUtil.writeFile(fs, srcPath, expected);
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
+    int numCells = (fileLength - 1) / cellSize + 1;
+    BlockLocation[] locs = fs.getFileBlockLocations(srcPath, 0, cellSize);
+
+    int[] missingDataNodes = new int[missingDataNum + missingParityNum];
+    for (int i = 0; i < missingDataNum; i++) {
+      missingDataNodes[i] = i;
+    }
+    for (int i = 0; i < missingParityNum; i++) {
+      missingDataNodes[i + missingDataNum] = i +
+          Math.min(StripedFileTestUtil.dataBlocks, numCells);
+    }
+    stopDataNodes(locs, missingDataNodes);
+
+    // make sure there are missing block locations
+    BlockLocation[] newLocs = fs.getFileBlockLocations(srcPath, 0, cellSize);
+    Assert.assertTrue(newLocs[0].getNames().length < locs[0].getNames().length);
+
+    byte[] smallBuf = new byte[1024];
+    byte[] largeBuf = new byte[fileLength + 100];
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        smallBuf);
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+    // delete the file
+    fs.delete(srcPath, true);
+  }
+
+  private void stopDataNodes(BlockLocation[] locs, int[] datanodes)
+      throws IOException {
+    if (locs != null && locs.length > 0) {
+      for (int failedDNIdx : datanodes) {
+        String name = (locs[0].getNames())[failedDNIdx];
+        for (DataNode dn : cluster.getDataNodes()) {
+          int port = dn.getXferPort();
+          if (name.contains(Integer.toString(port))) {
+            dn.shutdown();
+            cluster.setDataNodeDead(dn.getDatanodeId());
+            LOG.info("stop datanode " + failedDNIdx);
+            break;
+          }
+        }
+      }
+    }
+  }
+}
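
The index arithmetic in readFileWithMissingBlocks hinges on (fileLength - 1) / cellSize + 1 being the number of cells the file occupies; capped at the schema's data-block count, it gives the position where parity entries start in the block-location name array, which is where the "missing parity" indices are taken from. Here is a worked sketch of that arithmetic with assumed constants (64KB cells, 6 data blocks), not the real StripedFileTestUtil values.

public class StripeCellMathSketch {
  public static void main(String[] args) {
    // Assumed layout constants; the test reads the real ones from StripedFileTestUtil.
    final int cellSize = 64 * 1024;
    final int dataBlocks = 6;

    int[] fileLengths = {1, cellSize, cellSize + 123, 6 * cellSize + 123};
    for (int len : fileLengths) {
      int numCells = (len - 1) / cellSize + 1;           // ceil(len / cellSize)
      int dataEntries = Math.min(dataBlocks, numCells);  // data blocks that hold data
      // As the test assumes, parity entries follow the data entries in
      // BlockLocation#getNames() for the block group.
      System.out.println("len=" + len + " -> cells=" + numCells
          + ", first parity index=" + dataEntries);
    }
  }
}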

