hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [28/52] [abbrv] hadoop git commit: HDFS-9040. Erasure coding: coordinate data streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su.
Date Wed, 30 Sep 2015 18:23:02 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 8d4a0cf..12453fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -20,23 +20,35 @@ package org.apache.hadoop.hdfs;
 import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.junit.Assert;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.assertEquals;
+
 public class StripedFileTestUtil {
   public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
   /*
@@ -50,8 +62,8 @@ public class StripedFileTestUtil {
   static final int stripesPerBlock = 4;
   static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
   static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
+  static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
 
-  static final Random random = new Random();
 
   static byte[] generateBytes(int cnt) {
     byte[] bytes = new byte[cnt];
@@ -61,6 +73,11 @@ public class StripedFileTestUtil {
     return bytes;
   }
 
+  static byte getByte(long pos) {
+    final int mod = 29;
+    return (byte) (pos % mod + 1);
+  }
+
   static int readAll(FSDataInputStream in, byte[] buf) throws IOException {
     int readLen = 0;
     int ret;
@@ -71,15 +88,10 @@ public class StripedFileTestUtil {
     return readLen;
   }
 
-  static byte getByte(long pos) {
-    final int mod = 29;
-    return (byte) (pos % mod + 1);
-  }
-
   static void verifyLength(FileSystem fs, Path srcPath, int fileLength)
       throws IOException {
     FileStatus status = fs.getFileStatus(srcPath);
-    Assert.assertEquals("File length should be the same", fileLength, status.getLen());
+    assertEquals("File length should be the same", fileLength, status.getLen());
   }
 
   static void verifyPread(FileSystem fs, Path srcPath,  int fileLength,
@@ -101,9 +113,7 @@ public class StripedFileTestUtil {
           offset += target;
         }
         for (int i = 0; i < fileLength - startOffset; i++) {
-          Assert.assertEquals("Byte at " + (startOffset + i) + " is different, "
-                  + "the startOffset is " + startOffset,
-              expected[startOffset + i], result[i]);
+          assertEquals("Byte at " + (startOffset + i) + " is different, " + "the startOffset
is " + startOffset, expected[startOffset + i], result[i]);
         }
       }
     }
@@ -119,8 +129,7 @@ public class StripedFileTestUtil {
         System.arraycopy(buf, 0, result, readLen, ret);
         readLen += ret;
       }
-      Assert.assertEquals("The length of file should be the same to write size",
-          fileLength, readLen);
+      assertEquals("The length of file should be the same to write size", fileLength, readLen);
       Assert.assertArrayEquals(expected, result);
     }
   }
@@ -137,8 +146,7 @@ public class StripedFileTestUtil {
         result.put(buf);
         buf.clear();
       }
-      Assert.assertEquals("The length of file should be the same to write size",
-          fileLength, readLen);
+      assertEquals("The length of file should be the same to write size", fileLength, readLen);
       Assert.assertArrayEquals(expected, result.array());
     }
   }
@@ -199,10 +207,9 @@ public class StripedFileTestUtil {
     fsdis.seek(pos);
     byte[] buf = new byte[writeBytes];
     int readLen = StripedFileTestUtil.readAll(fsdis, buf);
-    Assert.assertEquals(readLen, writeBytes - pos);
+    assertEquals(readLen, writeBytes - pos);
     for (int i = 0; i < readLen; i++) {
-      Assert.assertEquals("Byte at " + i + " should be the same",
-          StripedFileTestUtil.getByte(pos + i), buf[i]);
+      assertEquals("Byte at " + i + " should be the same", StripedFileTestUtil.getByte(pos
+ i), buf[i]);
     }
   }
 
@@ -210,6 +217,7 @@ public class StripedFileTestUtil {
       final int dnIndex, final AtomicInteger pos) {
     final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
     final DatanodeInfo datanode = getDatanodes(s);
+    assert datanode != null;
     LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
     cluster.stopDataNode(datanode.getXferAddr());
   }
@@ -218,7 +226,7 @@ public class StripedFileTestUtil {
     for(;;) {
       final DatanodeInfo[] datanodes = streamer.getNodes();
       if (datanodes != null) {
-        Assert.assertEquals(1, datanodes.length);
+        assertEquals(1, datanodes.length);
         Assert.assertNotNull(datanodes[0]);
         return datanodes[0];
       }
@@ -287,7 +295,6 @@ public class StripedFileTestUtil {
    * @param min minimum of the range
    * @param max maximum of the range
    * @param n number to be generated
-   * @return
    */
   public static int[] randomArray(int min, int max, int n){
     if (n > (max - min + 1) || max < min || min < 0 || max < 0) {
@@ -315,4 +322,170 @@ public class StripedFileTestUtil {
     }
     return result;
   }
+
+  /**
+   * Verify that blocks in striped block group are on different nodes, and every
+   * internal blocks exists.
+   */
+  public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, int groupSize) {
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
+      HashSet<DatanodeInfo> locs = new HashSet<>();
+      Collections.addAll(locs, lb.getLocations());
+      assertEquals(groupSize, lb.getLocations().length);
+      assertEquals(groupSize, locs.size());
+
+      // verify that every internal blocks exists
+      int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
+      assertEquals(groupSize, blockIndices.length);
+      HashSet<Integer> found = new HashSet<>();
+      for (int index : blockIndices) {
+        assert index >=0;
+        found.add(index);
+      }
+      assertEquals(groupSize, found.size());
+    }
+  }
+
+  static void checkData(DistributedFileSystem dfs, Path srcPath, int length,
+      int[] killedDnIndex, long oldGS) throws IOException {
+
+    StripedFileTestUtil.verifyLength(dfs, srcPath, length);
+    Arrays.sort(killedDnIndex);
+    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+    LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(srcPath.toString(), 0L,
+        Long.MAX_VALUE);
+    int expectedNumGroup = 0;
+    if (length > 0) {
+      expectedNumGroup = (length - 1) / BLOCK_GROUP_SIZE + 1;
+    }
+    assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
+
+    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
+
+      final long gs = firstBlock.getBlock().getGenerationStamp();
+      final String s = "gs=" + gs + ", oldGS=" + oldGS;
+      LOG.info(s);
+      Assert.assertTrue(s, gs >= oldGS);
+
+      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+          (LocatedStripedBlock) firstBlock, BLOCK_STRIPED_CELL_SIZE,
+          NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+      blockGroupList.add(Arrays.asList(blocks));
+    }
+
+    // test each block group
+    for (int group = 0; group < blockGroupList.size(); group++) {
+      final boolean isLastGroup = group == blockGroupList.size() - 1;
+      final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
+          : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
+      final int numCellInGroup = (groupSize - 1)/BLOCK_STRIPED_CELL_SIZE + 1;
+      final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
+      final int lastCellSize = groupSize - (numCellInGroup - 1)*BLOCK_STRIPED_CELL_SIZE;
+
+      //get the data of this block
+      List<LocatedBlock> blockList = blockGroupList.get(group);
+      byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
+      byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
+
+      // for each block, use BlockReader to read data
+      for (int i = 0; i < blockList.size(); i++) {
+        final int j = i >= NUM_DATA_BLOCKS? 0: i;
+        final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
+            + (j <= lastCellIndex? 1: 0);
+        final int blockSize = numCellInBlock*BLOCK_STRIPED_CELL_SIZE
+            + (isLastGroup && j == lastCellIndex? lastCellSize - BLOCK_STRIPED_CELL_SIZE:
0);
+
+        final byte[] blockBytes = new byte[blockSize];
+        if (i < NUM_DATA_BLOCKS) {
+          dataBlockBytes[i] = blockBytes;
+        } else {
+          parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
+        }
+
+        final LocatedBlock lb = blockList.get(i);
+        LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
+            + ", blockSize=" + blockSize + ", lb=" + lb);
+        if (lb == null) {
+          continue;
+        }
+        final ExtendedBlock block = lb.getBlock();
+        assertEquals(blockSize, block.getNumBytes());
+
+        if (block.getNumBytes() == 0) {
+          continue;
+        }
+
+        if (Arrays.binarySearch(killedDnIndex, i) < 0) {
+          final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
+              dfs, lb, 0, block.getNumBytes());
+          blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
+          blockReader.close();
+        }
+      }
+
+      // check data
+      final int groupPosInFile = group*BLOCK_GROUP_SIZE;
+      for (int i = 0; i < dataBlockBytes.length; i++) {
+        boolean killed = false;
+        if (Arrays.binarySearch(killedDnIndex, i) >= 0){
+          killed = true;
+        }
+        final byte[] actual = dataBlockBytes[i];
+        for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
+          final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
+              BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
+          Assert.assertTrue(posInFile < length);
+          final byte expected = getByte(posInFile);
+
+          if (killed) {
+            actual[posInBlk] = expected;
+          } else {
+            if(expected != actual[posInBlk]){
+              String s = "expected=" + expected + " but actual=" + actual[posInBlk]
+                  + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
+                  + ". group=" + group + ", i=" + i;
+              Assert.fail(s);
+            }
+          }
+        }
+      }
+
+      // check parity
+      verifyParityBlocks(dfs.getConf(), lbs.getLocatedBlocks().get(group)
+              .getBlockSize(),
+          BLOCK_STRIPED_CELL_SIZE, dataBlockBytes, parityBlockBytes, killedDnIndex);
+    }
+  }
+
+  static void verifyParityBlocks(Configuration conf, final long size, final int cellSize,
+      byte[][] dataBytes, byte[][] parityBytes, int[] killedDnIndex) {
+    Arrays.sort(killedDnIndex);
+    // verify the parity blocks
+    int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
+        size, cellSize, dataBytes.length, dataBytes.length);
+    final byte[][] expectedParityBytes = new byte[parityBytes.length][];
+    for (int i = 0; i < parityBytes.length; i++) {
+      expectedParityBytes[i] = new byte[parityBlkSize];
+    }
+    for (int i = 0; i < dataBytes.length; i++) {
+      if (dataBytes[i] == null) {
+        dataBytes[i] = new byte[dataBytes[0].length];
+      } else if (dataBytes[i].length < dataBytes[0].length) {
+        final byte[] tmp = dataBytes[i];
+        dataBytes[i] = new byte[dataBytes[0].length];
+        System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
+      }
+    }
+    final RawErasureEncoder encoder =
+        CodecUtil.createRSRawEncoder(conf, dataBytes.length, parityBytes.length);
+    encoder.encode(dataBytes, expectedParityBytes);
+    for (int i = 0; i < parityBytes.length; i++) {
+      if (Arrays.binarySearch(killedDnIndex, dataBytes.length + i) < 0){
+        Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + Arrays.toString(killedDnIndex),
+            expectedParityBytes[i], parityBytes[i]);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index 0641e8e..d78e88b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -18,26 +18,14 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.io.erasurecode.CodecUtil;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -154,141 +142,15 @@ public class TestDFSStripedOutputStream {
         + cellSize + 123);
   }
 
-  private byte[] generateBytes(int cnt) {
-    byte[] bytes = new byte[cnt];
-    for (int i = 0; i < cnt; i++) {
-      bytes[i] = getByte(i);
-    }
-    return bytes;
-  }
-
-  private byte getByte(long pos) {
-    int mod = 29;
-    return (byte) (pos % mod + 1);
-  }
-
   private void testOneFile(String src, int writeBytes) throws Exception {
     src += "_" + writeBytes;
     Path testPath = new Path(src);
 
-    byte[] bytes = generateBytes(writeBytes);
+    byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
     DFSTestUtil.writeFile(fs, testPath, new String(bytes));
     StripedFileTestUtil.waitBlockGroupsReported(fs, src);
 
-    // check file length
-    FileStatus status = fs.getFileStatus(testPath);
-    Assert.assertEquals(writeBytes, status.getLen());
-
-    checkData(src, writeBytes);
-  }
-
-  void checkData(String src, int writeBytes) throws IOException {
-    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
-    LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
-
-    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
-      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
-      LocatedBlock[] blocks = StripedBlockUtil.
-          parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
-              cellSize, dataBlocks, parityBlocks);
-      List<LocatedBlock> oneGroup = Arrays.asList(blocks);
-      blockGroupList.add(oneGroup);
-    }
-
-    // test each block group
-    for (int group = 0; group < blockGroupList.size(); group++) {
-      //get the data of this block
-      List<LocatedBlock> blockList = blockGroupList.get(group);
-      byte[][] dataBlockBytes = new byte[dataBlocks][];
-      byte[][] parityBlockBytes = new byte[parityBlocks][];
-
-      // for each block, use BlockReader to read data
-      for (int i = 0; i < blockList.size(); i++) {
-        LocatedBlock lblock = blockList.get(i);
-        if (lblock == null) {
-          continue;
-        }
-        ExtendedBlock block = lblock.getBlock();
-        byte[] blockBytes = new byte[(int)block.getNumBytes()];
-        if (i < dataBlocks) {
-          dataBlockBytes[i] = blockBytes;
-        } else {
-          parityBlockBytes[i - dataBlocks] = blockBytes;
-        }
-
-        if (block.getNumBytes() == 0) {
-          continue;
-        }
-
-        final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
-            fs, lblock, 0, block.getNumBytes());
-        blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
-        blockReader.close();
-      }
-
-      // check if we write the data correctly
-      for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length;
-           blkIdxInGroup++) {
-        final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
-        if (actualBlkBytes == null) {
-          continue;
-        }
-        for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
-          // calculate the position of this byte in the file
-          long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
-              dataBlocks, posInBlk, blkIdxInGroup) +
-              group * blockSize * dataBlocks;
-          Assert.assertTrue(posInFile < writeBytes);
-          final byte expected = getByte(posInFile);
-
-          String s = "Unexpected byte " + actualBlkBytes[posInBlk]
-              + ", expect " + expected
-              + ". Block group index is " + group
-              + ", stripe index is " + posInBlk / cellSize
-              + ", cell index is " + blkIdxInGroup
-              + ", byte index is " + posInBlk % cellSize;
-          Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]);
-        }
-      }
-
-      verifyParity(lbs.getLocatedBlocks().get(group).getBlockSize(),
-          cellSize, dataBlockBytes, parityBlockBytes);
-    }
-  }
-
-  void verifyParity(final long size, final int cellSize,
-      byte[][] dataBytes, byte[][] parityBytes) {
-    verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1);
-  }
-
-  static void verifyParity(Configuration conf, final long size,
-                           final int cellSize, byte[][] dataBytes,
-                           byte[][] parityBytes, int killedDnIndex) {
-    // verify the parity blocks
-    int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
-        size, cellSize, dataBytes.length, dataBytes.length);
-    final byte[][] expectedParityBytes = new byte[parityBytes.length][];
-    for (int i = 0; i < parityBytes.length; i++) {
-      expectedParityBytes[i] = new byte[parityBlkSize];
-    }
-    for (int i = 0; i < dataBytes.length; i++) {
-      if (dataBytes[i] == null) {
-        dataBytes[i] = new byte[dataBytes[0].length];
-      } else if (dataBytes[i].length < dataBytes[0].length) {
-        final byte[] tmp = dataBytes[i];
-        dataBytes[i] = new byte[dataBytes[0].length];
-        System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
-      }
-    }
-    final RawErasureEncoder encoder =
-            CodecUtil.createRSRawEncoder(conf,
-                dataBytes.length, parityBytes.length);
-    encoder.encode(dataBytes, expectedParityBytes);
-    for (int i = 0; i < parityBytes.length; i++) {
-      if (i != killedDnIndex) {
-        Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex,
-            expectedParityBytes[i], parityBytes[i]);
-      }
-    }
+    StripedFileTestUtil.checkData(fs, testPath, writeBytes,
+        new int[]{}, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 44a29e6..f6c2566 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,23 +31,18 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -74,6 +70,7 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   private static final int FLUSH_POS
       = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
+
   static {
     System.out.println("NUM_DATA_BLOCKS  = " + NUM_DATA_BLOCKS);
     System.out.println("NUM_PARITY_BLOCKS= " + NUM_PARITY_BLOCKS);
@@ -101,6 +98,32 @@ public class TestDFSStripedOutputStreamWithFailure {
     return lengths;
   }
 
+  private static final int[][] dnIndexSuite = {
+      {0, 1},
+      {0, 5},
+      {0, 6},
+      {0, 8},
+      {1, 5},
+      {1, 6},
+      {6, 8},
+      {0, 1, 2},
+      {3, 4, 5},
+      {0, 1, 6},
+      {0, 5, 6},
+      {0, 5, 8},
+      {0, 6, 7},
+      {5, 6, 7},
+      {6, 7, 8},
+  };
+
+  private int[] getKillPositions(int fileLen, int num) {
+    int[] positions = new int[num];
+    for (int i = 0; i < num; i++) {
+      positions[i] = fileLen * (i + 1) / (num + 1);
+    }
+    return positions;
+  }
+
   private static final List<Integer> LENGTHS = newLengths();
 
   static int getLength(int i) {
@@ -127,42 +150,26 @@ public class TestDFSStripedOutputStreamWithFailure {
     }
   }
 
-  private static byte getByte(long pos) {
-    return (byte)pos;
-  }
-
   private HdfsConfiguration newHdfsConfiguration() {
     final HdfsConfiguration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     return conf;
   }
 
-  void runTest(final int length) {
-    final HdfsConfiguration conf = newHdfsConfiguration();
-    for (int dn = 0; dn < 9; dn++) {
-      try {
-        setup(conf);
-        runTest(length, dn, false, conf);
-      } catch (Exception e) {
-        final String err = "failed, dn=" + dn + ", length=" + length
-            + StringUtils.stringifyException(e);
-        LOG.error(err);
-        Assert.fail(err);
-      } finally {
-        tearDown();
-      }
-    }
-  }
-
   @Test(timeout=240000)
   public void testDatanodeFailure56() throws Exception {
     runTest(getLength(56));
   }
 
   @Test(timeout=240000)
+  public void testMultipleDatanodeFailure56() throws Exception {
+    runTestWithMultipleFailure(getLength(56));
+  }
+
+  @Test(timeout=240000)
   public void testBlockTokenExpired() throws Exception {
     final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
     final HdfsConfiguration conf = newHdfsConfiguration();
@@ -174,7 +181,7 @@ public class TestDFSStripedOutputStreamWithFailure {
     for (int dn = 0; dn < 9; dn += 2) {
       try {
         setup(conf);
-        runTest(length, dn, true, conf);
+        runTest(length, new int[]{length/2}, new int[]{dn}, true);
       } catch (Exception e) {
         LOG.error("failed, dn=" + dn + ", length=" + length);
         throw e;
@@ -214,22 +221,8 @@ public class TestDFSStripedOutputStreamWithFailure {
         Assert.fail("Failed to validate available dns against blkGroupSize");
       } catch (IOException ioe) {
         // expected
-        GenericTestUtils.assertExceptionContains("Failed: the number of "
-            + "remaining blocks = 5 < the number of data blocks = 6", ioe);
-        DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out
-            .getWrappedStream();
-
-        // get leading streamer and verify the last exception
-        StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0);
-        try {
-          datastreamer.getLastException().check(true);
-          Assert.fail("Failed to validate available dns against blkGroupSize");
-        } catch (IOException le) {
-          GenericTestUtils.assertExceptionContains(
-              "Failed to get datablocks number of nodes from"
-                  + " namenode: blockGroupSize= 9, blocks.length= "
-                  + numDatanodes, le);
-        }
+        GenericTestUtils.assertExceptionContains("Failed to get 6 nodes from" +
+            " namenode: blockGroupSize= 9, blocks.length= 5", ioe);
       }
     } finally {
       tearDown();
@@ -258,42 +251,73 @@ public class TestDFSStripedOutputStreamWithFailure {
       int fileLength = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE - 1000;
       final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
       DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
+      LOG.info("writing finished. Seek and read the file to verify.");
       StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
     } finally {
       tearDown();
     }
   }
 
-  private void runTest(final int length, final int dnIndex,
-      final boolean tokenExpire, final HdfsConfiguration conf) {
-    try {
-      runTest(length, length/2, dnIndex, tokenExpire, conf);
-    } catch(Exception e) {
-      LOG.info("FAILED", e);
-      Assert.fail(StringUtils.stringifyException(e));
+  void runTest(final int length) {
+    final HdfsConfiguration conf = newHdfsConfiguration();
+    for (int dn = 0; dn < 9; dn++) {
+      try {
+        setup(conf);
+        runTest(length, new int[]{length/2}, new int[]{dn}, false);
+      } catch (Throwable e) {
+        final String err = "failed, dn=" + dn + ", length=" + length
+            + StringUtils.stringifyException(e);
+        LOG.error(err);
+        Assert.fail(err);
+      } finally {
+        tearDown();
+      }
     }
   }
 
-  private void runTest(final int length, final int killPos,
-      final int dnIndex, final boolean tokenExpire,
-      final HdfsConfiguration conf) throws Exception {
-    if (killPos <= FLUSH_POS) {
-      LOG.warn("killPos=" + killPos + " <= FLUSH_POS=" + FLUSH_POS
-          + ", length=" + length + ", dnIndex=" + dnIndex);
-      return; //skip test
+  void runTestWithMultipleFailure(final int length) throws Exception {
+    final HdfsConfiguration conf = newHdfsConfiguration();
+    for(int i=0;i<dnIndexSuite.length;i++){
+      int[] dnIndex = dnIndexSuite[i];
+      int[] killPos = getKillPositions(length, dnIndex.length);
+      try {
+        setup(conf);
+        runTest(length, killPos, dnIndex, false);
+      } catch (Throwable e) {
+        final String err = "failed, killPos=" + Arrays.toString(killPos)
+            + ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
+        LOG.error(err);
+        throw e;
+      } finally {
+        tearDown();
+      }
     }
-    Preconditions.checkArgument(length > killPos,
-        "length=%s <= killPos=%s", length, killPos);
+  }
 
-    // start a datanode now, will kill one later
-    cluster.startDataNodes(conf, 1, true, null, null);
-    cluster.waitActive();
+  /**
+   * runTest implementation
+   * @param length file length
+   * @param killPos killing positions in ascending order
+   * @param dnIndex DN index to kill when meets killing positions
+   * @param tokenExpire wait token to expire when kill a DN
+   * @throws Exception
+   */
+  private void runTest(final int length, final int[] killPos,
+      final int[] dnIndex, final boolean tokenExpire) throws Exception {
+    if (killPos[0] <= FLUSH_POS) {
+      LOG.warn("killPos=" + Arrays.toString(killPos) + " <= FLUSH_POS=" + FLUSH_POS
+          + ", length=" + length + ", dnIndex=" + Arrays.toString(dnIndex));
+      return; //skip test
+    }
+    Preconditions.checkArgument(length > killPos[0], "length=%s <= killPos=%s",
+        length, killPos);
+    Preconditions.checkArgument(killPos.length == dnIndex.length);
 
-    final Path p = new Path(dir, "dn" + dnIndex + "len" + length + "kill" +  killPos);
+    final Path p = new Path(dir, "dn" + Arrays.toString(dnIndex)
+        + "len" + length + "kill" +  Arrays.toString(killPos));
     final String fullPath = p.toString();
     LOG.info("fullPath=" + fullPath);
 
-
     if (tokenExpire) {
       final NameNode nn = cluster.getNameNode();
       final BlockManager bm = nn.getNamesystem().getBlockManager();
@@ -308,50 +332,56 @@ public class TestDFSStripedOutputStreamWithFailure {
     final DFSStripedOutputStream stripedOut
         = (DFSStripedOutputStream)out.getWrappedStream();
 
-    long oldGS = -1;
-    boolean killed = false;
+    long firstGS = -1;  // first GS of this block group which never proceeds blockRecovery
+    long oldGS = -1; // the old GS before bumping
+    int numKilled=0;
     for(; pos.get() < length; ) {
       final int i = pos.getAndIncrement();
-      if (i == killPos) {
+      if (numKilled < killPos.length &&  i == killPos[numKilled]) {
+        assertTrue(firstGS != -1);
         final long gs = getGenerationStamp(stripedOut);
-        Assert.assertTrue(oldGS != -1);
-        Assert.assertEquals(oldGS, gs);
+        if (numKilled == 0) {
+          assertEquals(firstGS, gs);
+        } else {
+          //TODO: implement hflush/hsync and verify gs strict greater than oldGS
+          assertTrue(gs >= oldGS);
+        }
+        oldGS = gs;
 
         if (tokenExpire) {
           DFSTestUtil.flushInternal(stripedOut);
           waitTokenExpires(out);
         }
 
-        killDatanode(cluster, stripedOut, dnIndex, pos);
-        killed = true;
+        killDatanode(cluster, stripedOut, dnIndex[numKilled], pos);
+        numKilled++;
       }
 
       write(out, i);
 
-      if (i == FLUSH_POS) {
-        oldGS = getGenerationStamp(stripedOut);
+      if (i % BLOCK_GROUP_SIZE == FLUSH_POS) {
+        firstGS = getGenerationStamp(stripedOut);
+        oldGS = firstGS;
       }
     }
     out.close();
+    assertEquals(dnIndex.length, numKilled);
 
     short expectedReported = StripedFileTestUtil.getRealTotalBlockNum(length);
-    if (length > dnIndex * CELL_SIZE || dnIndex >= NUM_DATA_BLOCKS) {
-      expectedReported--;
+    for(int idx :dnIndex) {
+      if (length > idx * CELL_SIZE || idx >= NUM_DATA_BLOCKS) {
+        expectedReported--;
+      }
     }
     DFSTestUtil.waitReplication(dfs, p, expectedReported);
 
-    Assert.assertTrue(killed);
-
-    // check file length
-    final FileStatus status = dfs.getFileStatus(p);
-    Assert.assertEquals(length, status.getLen());
-
-    checkData(dfs, fullPath, length, dnIndex, oldGS);
+    cluster.triggerBlockReports();
+    StripedFileTestUtil.checkData(dfs, p, length, dnIndex, oldGS);
   }
 
   static void write(FSDataOutputStream out, int i) throws IOException {
     try {
-      out.write(getByte(i));
+      out.write(StripedFileTestUtil.getByte(i));
     } catch(IOException ioe) {
       throw new IOException("Failed at i=" + i, ioe);
     }
@@ -359,10 +389,10 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   static long getGenerationStamp(DFSStripedOutputStream out)
       throws IOException {
+    DFSTestUtil.flushBuffer(out);
     final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp();
     LOG.info("getGenerationStamp returns " + gs);
     return gs;
-
   }
 
   static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
@@ -399,106 +429,6 @@ public class TestDFSStripedOutputStreamWithFailure {
     cluster.stopDataNode(datanode.getXferAddr());
   }
 
-  static void checkData(DistributedFileSystem dfs, String src, int length,
-      int killedDnIndex, long oldGS) throws IOException {
-    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
-    LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L);
-    final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1;
-    Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
-
-    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
-      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
-
-      final long gs = firstBlock.getBlock().getGenerationStamp();
-      final String s = "gs=" + gs + ", oldGS=" + oldGS;
-      LOG.info(s);
-      Assert.assertTrue(s, gs >= oldGS);
-
-      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
-          (LocatedStripedBlock) firstBlock,
-          CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
-      blockGroupList.add(Arrays.asList(blocks));
-    }
-
-    // test each block group
-    for (int group = 0; group < blockGroupList.size(); group++) {
-      final boolean isLastGroup = group == blockGroupList.size() - 1;
-      final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
-          : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
-      final int numCellInGroup = (groupSize - 1)/CELL_SIZE + 1;
-      final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
-      final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE;
-
-      //get the data of this block
-      List<LocatedBlock> blockList = blockGroupList.get(group);
-      byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
-      byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
-
-      // for each block, use BlockReader to read data
-      for (int i = 0; i < blockList.size(); i++) {
-        final int j = i >= NUM_DATA_BLOCKS? 0: i;
-        final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
-            + (j <= lastCellIndex? 1: 0);
-        final int blockSize = numCellInBlock*CELL_SIZE
-            + (isLastGroup && j == lastCellIndex? lastCellSize - CELL_SIZE: 0);
-
-        final byte[] blockBytes = new byte[blockSize];
-        if (i < NUM_DATA_BLOCKS) {
-          dataBlockBytes[i] = blockBytes;
-        } else {
-          parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
-        }
-
-        final LocatedBlock lb = blockList.get(i);
-        LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
-            + ", blockSize=" + blockSize + ", lb=" + lb);
-        if (lb == null) {
-          continue;
-        }
-        final ExtendedBlock block = lb.getBlock();
-        Assert.assertEquals(blockSize, block.getNumBytes());
-
-
-        if (block.getNumBytes() == 0) {
-          continue;
-        }
-
-        if (i != killedDnIndex) {
-          final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
-              dfs, lb, 0, block.getNumBytes());
-          blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
-          blockReader.close();
-        }
-      }
-
-      // check data
-      final int groupPosInFile = group*BLOCK_GROUP_SIZE;
-      for (int i = 0; i < dataBlockBytes.length; i++) {
-        final byte[] actual = dataBlockBytes[i];
-        for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
-          final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
-              CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
-          Assert.assertTrue(posInFile < length);
-          final byte expected = getByte(posInFile);
-
-          if (i == killedDnIndex) {
-            actual[posInBlk] = expected;
-          } else {
-            String s = "expected=" + expected + " but actual=" + actual[posInBlk]
-                + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
-                + ". group=" + group + ", i=" + i;
-            Assert.assertEquals(s, expected, actual[posInBlk]);
-          }
-        }
-      }
-
-      // check parity
-      TestDFSStripedOutputStream.verifyParity(dfs.getConf(),
-          lbs.getLocatedBlocks().get(group).getBlockSize(),
-          CELL_SIZE, dataBlockBytes, parityBlockBytes,
-          killedDnIndex - dataBlockBytes.length);
-    }
-  }
 
   private void waitTokenExpires(FSDataOutputStream out) throws IOException {
     Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
index c0dca4e..764527d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -39,6 +41,12 @@ public class TestWriteStripedFileWithFailure {
   private static MiniDFSCluster cluster;
   private static FileSystem fs;
   private static Configuration conf = new HdfsConfiguration();
+
+  static {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+  }
+
   private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
   private final short parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS;
   private final int smallFileLength = blockSize * dataBlocks - 123;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 124bf80..ef31527 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -1745,7 +1745,7 @@ public class TestBalancer {
 
       // verify locations of striped blocks
       LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
-      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
 
       // add one datanode
       String newRack = "/rack" + (++numOfRacks);
@@ -1761,7 +1761,7 @@ public class TestBalancer {
 
       // verify locations of striped blocks
       locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
-      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
     } finally {
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 3a9748f..7cf5656 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -488,7 +488,7 @@ public class TestMover {
           Assert.assertEquals(StorageType.DISK, type);
         }
       }
-      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
           dataBlocks + parityBlocks);
 
       // start 5 more datanodes
@@ -523,7 +523,7 @@ public class TestMover {
           Assert.assertEquals(StorageType.ARCHIVE, type);
         }
       }
-      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
           dataBlocks + parityBlocks);
 
     }finally{

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
index 64d33a4..abcdbc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -42,7 +41,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class TestAddOverReplicatedStripedBlocks {
 
@@ -64,6 +62,7 @@ public class TestAddOverReplicatedStripedBlocks {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     // disable block recovery
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     SimulatedFSDataset.setFactory(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
@@ -118,7 +117,7 @@ public class TestAddOverReplicatedStripedBlocks {
     // verify that all internal blocks exists
     lbs = cluster.getNameNodeRpc().getBlockLocations(
         filePath.toString(), 0, fileLen);
-    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
   }
 
   @Test
@@ -162,7 +161,7 @@ public class TestAddOverReplicatedStripedBlocks {
     // verify that all internal blocks exists
     lbs = cluster.getNameNodeRpc().getBlockLocations(
         filePath.toString(), 0, fileLen);
-    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
   }
 
   @Test
@@ -216,7 +215,7 @@ public class TestAddOverReplicatedStripedBlocks {
     // verify that all internal blocks exists
     lbs = cluster.getNameNodeRpc().getBlockLocations(
         filePath.toString(), 0, fileLen);
-    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
   }
 
   @Test
@@ -248,6 +247,7 @@ public class TestAddOverReplicatedStripedBlocks {
 
     // update blocksMap
     cluster.triggerBlockReports();
+    Thread.sleep(2000);
     // add to invalidates
     cluster.triggerHeartbeats();
     // datanode delete block
@@ -259,7 +259,7 @@ public class TestAddOverReplicatedStripedBlocks {
     // we are left GROUP_SIZE - 1 blocks.
     lbs = cluster.getNameNodeRpc().getBlockLocations(
         filePath.toString(), 0, fileLen);
-    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index c27ead5..735f84d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@@ -736,7 +737,13 @@ public class TestRetryCacheWithHA {
       DatanodeInfo[] newNodes = new DatanodeInfo[2];
       newNodes[0] = nodes[0];
       newNodes[1] = nodes[1];
-      String[] storageIDs = {"s0", "s1"};
+      final DatanodeManager dm = cluster.getNamesystem(0).getBlockManager()
+          .getDatanodeManager();
+      final String storageID1 = dm.getDatanode(newNodes[0]).getStorageInfos()[0]
+          .getStorageID();
+      final String storageID2 = dm.getDatanode(newNodes[1]).getStorageInfos()[0]
+          .getStorageID();
+      String[] storageIDs = {storageID1, storageID2};
       
       client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
           newBlock, newNodes, storageIDs);


Mime
View raw message