hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject [05/50] hadoop git commit: HDFS-8120. Erasure coding: created util class to analyze striped block groups. Contributed by Zhe Zhang and Li Bo.
Date Mon, 11 May 2015 19:23:05 GMT
HDFS-8120. Erasure coding: created util class to analyze striped block groups. Contributed by Zhe Zhang and Li Bo.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ed68791c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ed68791c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ed68791c

Branch: refs/heads/HDFS-7285
Commit: ed68791cba9898300b16543ac13f3efcdc45d3eb
Parents: 43df7be
Author: Jing Zhao <jing9@apache.org>
Authored: Wed Apr 15 12:59:27 2015 -0700
Committer: Zhe Zhang <zhz@apache.org>
Committed: Mon May 11 11:36:14 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |   4 +-
 .../hadoop/hdfs/DFSStripedInputStream.java      |  77 +++--------
 .../hadoop/hdfs/DFSStripedOutputStream.java     |  34 +++--
 .../apache/hadoop/hdfs/StripedDataStreamer.java |  58 ++------
 .../server/blockmanagement/BlockManager.java    |  26 +++-
 .../hadoop/hdfs/util/StripedBlockUtil.java      | 138 +++++++++++++++++++
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  91 +++++++-----
 .../hadoop/hdfs/TestDFSStripedOutputStream.java |  83 +++++------
 .../apache/hadoop/hdfs/TestReadStripedFile.java |  92 +++----------
 .../server/namenode/TestAddStripedBlocks.java   | 107 ++++++++++++++
 .../namenode/TestRecoverStripedBlocks.java      |   3 +-
 .../hadoop/hdfs/util/TestStripedBlockUtil.java  | 125 +++++++++++++++++
 12 files changed, 562 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 9104f84..16250dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1148,9 +1148,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         for (int i = 0; i < offsets.length; i++) {
           int nread = reader.readAll(buf, offsets[i], lengths[i]);
           updateReadStatistics(readStatistics, nread, reader);
-          if (nread != len) {
+          if (nread != lengths[i]) {
             throw new IOException("truncated return from reader.read(): " +
-                "excpected " + len + ", got " + nread);
+                "excpected " + lengths[i] + ", got " + nread);
           }
         }
         DFSClientFaultInjector.get().readFromDatanodeDelay();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 8a431b1..d597407 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -50,7 +51,7 @@ import java.util.concurrent.Future;
  *
  * | <- Striped Block Group -> |
  *  blk_0      blk_1       blk_2   <- A striped block group has
- *    |          |           |          {@link #groupSize} blocks
+ *    |          |           |          {@link #dataBlkNum} blocks
  *    v          v           v
  * +------+   +------+   +------+
  * |cell_0|   |cell_1|   |cell_2|  <- The logical read order should be
@@ -72,7 +73,7 @@ import java.util.concurrent.Future;
 public class DFSStripedInputStream extends DFSInputStream {
   /**
    * This method plans the read portion from each block in the stripe
-   * @param groupSize The size / width of the striping group
+   * @param dataBlkNum The number of data blocks in the striping group
    * @param cellSize The size of each striping cell
    * @param startInBlk Starting offset in the striped block
    * @param len Length of the read request
@@ -81,29 +82,29 @@ public class DFSStripedInputStream extends DFSInputStream {
    *         for an individual block in the group
    */
   @VisibleForTesting
-  static ReadPortion[] planReadPortions(final int groupSize,
+  static ReadPortion[] planReadPortions(final int dataBlkNum,
       final int cellSize, final long startInBlk, final int len, int bufOffset) {
-    ReadPortion[] results = new ReadPortion[groupSize];
-    for (int i = 0; i < groupSize; i++) {
+    ReadPortion[] results = new ReadPortion[dataBlkNum];
+    for (int i = 0; i < dataBlkNum; i++) {
       results[i] = new ReadPortion();
     }
 
     // cellIdxInBlk is the index of the cell in the block
     // E.g., cell_3 is the 2nd cell in blk_0
-    int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));
+    int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));
 
     // blkIdxInGroup is the index of the block in the striped block group
     // E.g., blk_2 is the 3rd block in the group
-    final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);
+    final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
     results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
         startInBlk % cellSize;
     boolean crossStripe = false;
-    for (int i = 1; i < groupSize; i++) {
-      if (blkIdxInGroup + i >= groupSize && !crossStripe) {
+    for (int i = 1; i < dataBlkNum; i++) {
+      if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
         cellIdxInBlk++;
         crossStripe = true;
       }
-      results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock =
+      results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
           cellSize * cellIdxInBlk;
     }
 
@@ -112,57 +113,21 @@ public class DFSStripedInputStream extends DFSInputStream {
     results[blkIdxInGroup].lengths.add(firstCellLen);
     results[blkIdxInGroup].readLength += firstCellLen;
 
-    int i = (blkIdxInGroup + 1) % groupSize;
+    int i = (blkIdxInGroup + 1) % dataBlkNum;
     for (int done = firstCellLen; done < len; done += cellSize) {
       ReadPortion rp = results[i];
       rp.offsetsInBuf.add(done + bufOffset);
       final int readLen = Math.min(len - done, cellSize);
       rp.lengths.add(readLen);
       rp.readLength += readLen;
-      i = (i + 1) % groupSize;
+      i = (i + 1) % dataBlkNum;
     }
     return results;
   }
 
-  /**
-   * This method parses a striped block group into individual blocks.
-   *
-   * @param bg The striped block group
-   * @param dataBlkNum the number of data blocks
-   * @return An array containing the blocks in the group
-   */
-  @VisibleForTesting
-  static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
-      int dataBlkNum, int cellSize) {
-    int locatedBGSize = bg.getBlockIndices().length;
-    // TODO not considering missing blocks for now, only identify data blocks
-    LocatedBlock[] lbs = new LocatedBlock[dataBlkNum];
-    for (short i = 0; i < locatedBGSize; i++) {
-      final int idx = bg.getBlockIndices()[i];
-      if (idx < dataBlkNum && lbs[idx] == null) {
-        lbs[idx] = constructInternalBlock(bg, i, cellSize, idx);
-      }
-    }
-    return lbs;
-  }
-
-  private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
-      int idxInReturnedLocs, int cellSize, int idxInBlockGroup) {
-    final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
-    blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
-    // TODO: fix the numBytes computation
-
-    return new LocatedBlock(blk,
-        new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
-        new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
-        new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
-        bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
-        null);
-  }
-
-
   private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
 
   DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
       throws IOException {
@@ -199,7 +164,7 @@ public class DFSStripedInputStream extends DFSInputStream {
         "LocatedStripedBlock for a striped file";
 
     int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
-        % groupSize);
+        % dataBlkNum);
     // If indexing information is returned, iterate through the index array
     // to find the entry for position idx in the group
     LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
@@ -213,7 +178,8 @@ public class DFSStripedInputStream extends DFSInputStream {
       DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
           + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
     }
-    return constructInternalBlock(lsb, i, cellSize, idx);
+    return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize,
+        dataBlkNum, idx);
   }
 
   private LocatedBlock getBlockGroupAt(long offset) throws IOException {
@@ -240,13 +206,14 @@ public class DFSStripedInputStream extends DFSInputStream {
     LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
 
     // Planning the portion of I/O for each shard
-    ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start,
+    ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
         len, offset);
 
     // Parse group to get chosen DN location
-    LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize);
+    LocatedBlock[] blks = StripedBlockUtil.
+        parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);
 
-    for (short i = 0; i < groupSize; i++) {
+    for (short i = 0; i < dataBlkNum; i++) {
       ReadPortion rp = readPortions[i];
       if (rp.readLength <= 0) {
         continue;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 1d0e1be..f11a657 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.util.DataChecksum;
@@ -309,10 +310,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         streamer.closeSocket();
         if (streamer.isLeadingStreamer()) {
           leadingStreamer = streamer;
-        } else {
-          streamer.countTailingBlockGroupBytes();
         }
-
       } catch (InterruptedException e) {
         throw new IOException("Failed to shutdown streamer");
       } finally {
@@ -320,6 +318,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         setClosed();
       }
     }
+    assert leadingStreamer != null : "One streamer should be leader";
     leadingStreamer.countTailingBlockGroupBytes();
   }
 
@@ -337,23 +336,28 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   }
 
   private void writeParityCellsForLastStripe() throws IOException{
-    if(currentBlockGroupBytes == 0 ||
-        currentBlockGroupBytes % stripeDataSize() == 0)
+    long parityBlkSize = StripedBlockUtil.getInternalBlockLength(
+        currentBlockGroupBytes, cellSize, blockGroupDataBlocks,
+        blockGroupDataBlocks + 1);
+    if (parityBlkSize == 0 || currentBlockGroupBytes % stripeDataSize() == 0) {
       return;
-    int lastStripeLen =(int)(currentBlockGroupBytes % stripeDataSize());
-    // Size of parity cells should equal the size of the first cell, if it
-    // is not full.
-    int parityCellSize = cellSize;
-    int index = lastStripeLen / cellSize;
-    if (lastStripeLen < cellSize) {
-      parityCellSize = lastStripeLen;
-      index++;
     }
+    int parityCellSize = parityBlkSize % cellSize == 0 ? cellSize :
+                        (int) (parityBlkSize % cellSize);
+
     for (int i = 0; i < blockGroupBlocks; i++) {
-      if (i >= index) {
+      long internalBlkLen = StripedBlockUtil.getInternalBlockLength(
+          currentBlockGroupBytes, cellSize, blockGroupDataBlocks, i);
+      // Pad zero bytes to make all cells exactly the size of parityCellSize
+      // If internal block is smaller than parity block, pad zero bytes.
+      // Also pad zero bytes to all parity cells
+      if (internalBlkLen < parityBlkSize || i >= blockGroupDataBlocks) {
         int position = cellBuffers[i].position();
+        assert position <= parityCellSize : "If an internal block is smaller" +
+            " than parity block, then its last cell should be small than last" +
+            " parity cell";
         for (int j = 0; j < parityCellSize - position; j++) {
-          cellBuffers[i].put((byte)0);
+          cellBuffers[i].put((byte) 0);
         }
       }
       cellBuffers[i].flip();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 710d92d..5614852 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -19,16 +19,16 @@
 package org.apache.hadoop.hdfs;
 
 import java.util.List;
-import org.apache.hadoop.fs.StorageType;
+
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
-import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
@@ -134,19 +134,7 @@ public class StripedDataStreamer extends DataStreamer {
               "putting a block to stripeBlocks, ie = " + ie);
         }
       }
-    } else if (!isParityStreamer()) {
-      if (block == null || block.getNumBytes() == 0) {
-        LocatedBlock finishedBlock = new LocatedBlock(null, null);
-        try {
-          boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
-              TimeUnit.SECONDS);
-        } catch (InterruptedException ie) {
-          //TODO: Handle InterruptedException (HDFS-7786)
-          ie.printStackTrace();
-        }
-      }
     }
-
   }
 
   @Override
@@ -155,8 +143,10 @@ public class StripedDataStreamer extends DataStreamer {
     LocatedBlock lb = null;
     if (isLeadingStreamer()) {
       if(hasCommittedBlock) {
-        //when committing a block group, leading streamer has to adjust
-        // {@link block} including the size of block group
+        /**
+         * when committing a block group, leading streamer has to adjust
+         * {@link block} to include the size of block group
+         */
         for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
           try {
             LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
@@ -179,7 +169,13 @@ public class StripedDataStreamer extends DataStreamer {
 
       lb = super.locateFollowingBlock(excludedNodes);
       hasCommittedBlock = true;
-      LocatedBlock[] blocks = unwrapBlockGroup(lb);
+      assert lb instanceof LocatedStripedBlock;
+      DFSClient.LOG.debug("Leading streamer obtained bg " + lb);
+      LocatedBlock[] blocks = StripedBlockUtil.
+          parseStripedBlockGroup((LocatedStripedBlock) lb,
+              HdfsConstants.BLOCK_STRIPED_CELL_SIZE, HdfsConstants.NUM_DATA_BLOCKS,
+              HdfsConstants.NUM_PARITY_BLOCKS
+          );
       assert blocks.length == blockGroupSize :
           "Fail to get block group from namenode: blockGroupSize: " +
               blockGroupSize + ", blocks.length: " + blocks.length;
@@ -212,30 +208,4 @@ public class StripedDataStreamer extends DataStreamer {
     }
     return lb;
   }
-
-  /**
-   * Generate other blocks in a block group according to the first one.
-   *
-   * @param firstBlockInGroup the first block in a block group
-   * @return  other blocks in this group
-   */
-  public static LocatedBlock[] unwrapBlockGroup(
-      final LocatedBlock firstBlockInGroup) {
-    ExtendedBlock eb = firstBlockInGroup.getBlock();
-    DatanodeInfo[] locs = firstBlockInGroup.getLocations();
-    String[] storageIDs = firstBlockInGroup.getStorageIDs();
-    StorageType[] storageTypes = firstBlockInGroup.getStorageTypes();
-    Token<BlockTokenIdentifier> blockToken = firstBlockInGroup.getBlockToken();
-    LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length];
-    for (int i = 0; i < blocksInGroup.length; i++) {
-      //each block in a group has the same number of bytes and timestamp
-      ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(),
-          eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp());
-      blocksInGroup[i] = new LocatedBlock(extendedBlock,
-          new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]},
-          new StorageType[] {storageTypes[i]});
-      blocksInGroup[i].setBlockToken(blockToken);
-    }
-    return blocksInGroup;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index b6faacb..07b49c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
@@ -1964,8 +1965,8 @@ public class BlockManager {
       metrics.addBlockReport((int) (endTime - startTime));
     }
     blockLog.info("BLOCK* processReport: from storage {} node {}, " +
-        "blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage
-        .getStorageID(), nodeID, newReport.getNumberOfBlocks(),
+            "blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage
+            .getStorageID(), nodeID, newReport.getNumberOfBlocks(),
         node.hasStaleStorages(), (endTime - startTime));
     return !node.hasStaleStorages();
   }
@@ -1992,8 +1993,8 @@ public class BlockManager {
     assert(zombie.numBlocks() == 0);
     LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
             "which no longer exists on the DataNode.",
-            Long.toHexString(context.getReportId()), prevBlocks,
-            zombie.getStorageID());
+        Long.toHexString(context.getReportId()), prevBlocks,
+        zombie.getStorageID());
   }
 
   /**
@@ -2472,7 +2473,22 @@ public class BlockManager {
               "block is " + ucState + " and reported genstamp " + reportedGS
               + " does not match genstamp in block map "
               + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
-        } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
+        }
+        boolean wrongSize;
+        if (storedBlock.isStriped()) {
+          assert BlockIdManager.isStripedBlockID(reported.getBlockId());
+          assert storedBlock.getBlockId() ==
+              BlockIdManager.convertToStripedID(reported.getBlockId());
+          BlockInfoStriped stripedBlock = (BlockInfoStriped) storedBlock;
+          int reportedBlkIdx = BlockIdManager.getBlockIndex(reported);
+          wrongSize = reported.getNumBytes() !=
+              getInternalBlockLength(stripedBlock.getNumBytes(),
+                  HdfsConstants.BLOCK_STRIPED_CELL_SIZE,
+                  stripedBlock.getDataBlockNum(), reportedBlkIdx);
+        } else {
+          wrongSize = storedBlock.getNumBytes() != reported.getNumBytes();
+        }
+        if (wrongSize) {
           return new BlockToMarkCorrupt(new Block(reported), storedBlock,
               "block is " + ucState + " and reported length " +
               reported.getNumBytes() + " does not match " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
new file mode 100644
index 0000000..2368021
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+
+/**
+ * Utility class for analyzing striped block groups
+ */
+@InterfaceAudience.Private
+public class StripedBlockUtil {
+
+  /**
+   * This method parses a striped block group into individual blocks.
+   *
+   * @param bg The striped block group
+   * @param cellSize The size of a striping cell
+   * @param dataBlkNum The number of data blocks
+   * @return An array containing the blocks in the group
+   */
+  public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
+      int cellSize, int dataBlkNum, int parityBlkNum) {
+    int locatedBGSize = bg.getBlockIndices().length;
+    // TODO: missing blocks are not handled yet; builds data and parity blocks
+    LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum];
+    for (short i = 0; i < locatedBGSize; i++) {
+      final int idx = bg.getBlockIndices()[i];
+      if (idx < (dataBlkNum + parityBlkNum) && lbs[idx] == null) {
+        lbs[idx] = constructInternalBlock(bg, i, cellSize,
+            dataBlkNum, idx);
+      }
+    }
+    return lbs;
+  }
+
+  /**
+   * This method creates an internal block at the given index of a block group
+   *
+   * @param idxInReturnedLocs The index in the stored locations in the
+   *                          {@link LocatedStripedBlock} object
+   * @param idxInBlockGroup The logical index in the striped block group
+   * @return The constructed internal block
+   */
+  public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
+      int idxInReturnedLocs, int cellSize, int dataBlkNum,
+      int idxInBlockGroup) {
+    final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
+    blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
+    blk.setNumBytes(getInternalBlockLength(bg.getBlockSize(),
+        cellSize, dataBlkNum, idxInBlockGroup));
+
+    return new LocatedBlock(blk,
+        new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
+        new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
+        new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
+        bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
+        null);
+  }
+
+  /**
+   * Get the size of an internal block at the given index of a block group
+   *
+   * @param numBytesInGroup Size of the block group only counting data blocks
+   * @param cellSize The size of a striping cell
+   * @param dataBlkNum The number of data blocks
+   * @param idxInGroup The logical index in the striped block group
+   * @return The size of the internal block at the specified index
+   */
+  public static long getInternalBlockLength(long numBytesInGroup,
+      int cellSize, int dataBlkNum, int idxInGroup) {
+    // Size of each stripe (only counting data blocks)
+    final long numBytesPerStripe = cellSize * dataBlkNum;
+    assert numBytesPerStripe  > 0:
+        "getInternalBlockLength should only be called on valid striped blocks";
+    // If block group ends at stripe boundary, each internal block has an equal
+    // share of the group
+    if (numBytesInGroup % numBytesPerStripe == 0) {
+      return numBytesInGroup / dataBlkNum;
+    }
+
+    int numStripes = (int) ((numBytesInGroup - 1) / numBytesPerStripe + 1);
+    assert numStripes >= 1 : "There should be at least 1 stripe";
+
+    // All stripes but the last one are full stripes. The block should at least
+    // contain (numStripes - 1) full cells.
+    long blkSize = (numStripes - 1) * cellSize;
+
+    long lastStripeLen = numBytesInGroup % numBytesPerStripe;
+    // Size of parity cells should equal the size of the first cell, if it
+    // is not full.
+    long lastParityCellLen = Math.min(cellSize, lastStripeLen);
+
+    if (idxInGroup >= dataBlkNum) {
+      // for parity blocks
+      blkSize += lastParityCellLen;
+    } else {
+      // for data blocks
+      blkSize +=  Math.min(cellSize,
+          Math.max(0, lastStripeLen - cellSize * idxInGroup));
+    }
+
+    return blkSize;
+  }
+
+  /**
+   * Given a byte's offset in an internal block, calculate the offset in
+   * the block group
+   */
+  public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
+      long offsetInBlk, int idxInBlockGroup) {
+    int cellIdxInBlk = (int) (offsetInBlk / cellSize);
+    return cellIdxInBlk * cellSize * dataBlkNum // n full stripes before offset
+        + idxInBlockGroup * cellSize // m full cells before offset
+        + offsetInBlk % cellSize; // partial cell
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index ed508fc..0c88842 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -108,7 +108,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -1851,11 +1850,30 @@ public class DFSTestUtil {
     return reports;
   }
 
-  public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
-      int numBlocks, int numStripesPerBlk) throws Exception {
+  /**
+   * Creates the metadata of a file in striped layout. This method only
+   * manipulates the NameNode state without injecting data to DataNode.
+   * @param file Path of the file to create
+   * @param dir Parent path of the file
+   * @param numBlocks Number of striped block groups to add to the file
+   * @param numStripesPerBlk Number of striped cells in each block
+   * @param toMkdir Whether to create dir and make it an erasure coding zone
+   */
+  public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir,
+      int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception {
     DistributedFileSystem dfs = cluster.getFileSystem();
-    dfs.mkdirs(dir);
-    dfs.getClient().createErasureCodingZone(dir.toString(), null);
+    // If the outer test already created the EC zone, pass toMkdir as false
+    if (toMkdir) {
+      assert dir != null;
+      dfs.mkdirs(dir);
+      try {
+        dfs.getClient().createErasureCodingZone(dir.toString(), null);
+      } catch (IOException e) {
+        if (!e.getMessage().contains("non-empty directory")) {
+          throw e;
+        }
+      }
+    }
 
     FSDataOutputStream out = null;
     try {
@@ -1867,7 +1885,7 @@ public class DFSTestUtil {
 
       ExtendedBlock previous = null;
       for (int i = 0; i < numBlocks; i++) {
-        Block newBlock = createBlock(cluster.getDataNodes(), dfs, ns,
+        Block newBlock = addStripedBlockToFile(cluster.getDataNodes(), dfs, ns,
             file.toString(), fileNode, dfs.getClient().getClientName(),
             previous, numStripesPerBlk);
         previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
@@ -1880,43 +1898,50 @@ public class DFSTestUtil {
     }
   }
 
-  static Block createBlock(List<DataNode> dataNodes, DistributedFileSystem fs,
-      FSNamesystem ns, String file, INodeFile fileNode, String clientName,
-      ExtendedBlock previous, int numStripes) throws Exception {
+  /**
+   * Adds a striped block group to a file. This method only manipulates NameNode
+   * states of the file and the block without injecting data to DataNode.
+   * It does mimic block reports.
+   * @param dataNodes List of DataNodes to host the striped block group
+   * @param previous Previous block in the file
+   * @param numStripes Number of stripes in each block group
+   * @return The added block group
+   */
+  public static Block addStripedBlockToFile(List<DataNode> dataNodes,
+      DistributedFileSystem fs, FSNamesystem ns, String file, INodeFile fileNode,
+      String clientName, ExtendedBlock previous, int numStripes)
+      throws Exception {
     fs.getClient().namenode.addBlock(file, clientName, previous, null,
         fileNode.getId(), null);
 
     final BlockInfo lastBlock = fileNode.getLastBlock();
     final int groupSize = fileNode.getBlockReplication();
+    assert dataNodes.size() >= groupSize;
     // 1. RECEIVING_BLOCK IBR
-    int i = 0;
-    for (DataNode dn : dataNodes) {
-      if (i < groupSize) {
-        final Block block = new Block(lastBlock.getBlockId() + i++, 0,
-            lastBlock.getGenerationStamp());
-        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
-        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
-            .makeReportForReceivedBlock(block,
-                ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
-        for (StorageReceivedDeletedBlocks report : reports) {
-          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
-        }
+    for (int i = 0; i < groupSize; i++) {
+      DataNode dn = dataNodes.get(i);
+      final Block block = new Block(lastBlock.getBlockId() + i, 0,
+          lastBlock.getGenerationStamp());
+      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+          .makeReportForReceivedBlock(block,
+              ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+      for (StorageReceivedDeletedBlocks report : reports) {
+        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
       }
     }
 
     // 2. RECEIVED_BLOCK IBR
-    i = 0;
-    for (DataNode dn : dataNodes) {
-      if (i < groupSize) {
-        final Block block = new Block(lastBlock.getBlockId() + i++,
-            numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
-        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
-        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
-            .makeReportForReceivedBlock(block,
-                ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
-        for (StorageReceivedDeletedBlocks report : reports) {
-          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
-        }
+    for (int i = 0; i < groupSize; i++) {
+      DataNode dn = dataNodes.get(i);
+      final Block block = new Block(lastBlock.getBlockId() + i,
+          numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
+      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+          .makeReportForReceivedBlock(block,
+              ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+      for (StorageReceivedDeletedBlocks report : reports) {
+        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index c78922e..4a09bda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -2,7 +2,6 @@ package org.apache.hadoop.hdfs;
 
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
@@ -14,10 +13,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -39,16 +40,16 @@ public class TestDFSStripedOutputStream {
   private MiniDFSCluster cluster;
   private Configuration conf = new Configuration();
   private DistributedFileSystem fs;
-  int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  int blockSize = 8 * 1024 * 1024;
-  int cellsInBlock = blockSize / cellSize;
+  private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int stripesPerBlock = 4;
+  int blockSize = cellSize * stripesPerBlock;
   private int mod = 29;
 
   @Before
   public void setup() throws IOException {
     int numDNs = dataBlocks + parityBlocks + 2;
     Configuration conf = new Configuration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
     fs = cluster.getFileSystem();
@@ -103,8 +104,7 @@ public class TestDFSStripedOutputStream {
 
   @Test
   public void TestFileMoreThanOneStripe2() throws IOException {
-    testOneFile("/MoreThanOneStripe2",
-        cellSize * dataBlocks * (cellsInBlock >= 2 ? cellsInBlock / 2 : 1)
+    testOneFile("/MoreThanOneStripe2", cellSize * dataBlocks
             + cellSize * dataBlocks + 123);
   }
 
@@ -113,18 +113,22 @@ public class TestDFSStripedOutputStream {
     testOneFile("/FullBlockGroup", blockSize * dataBlocks);
   }
 
-  //TODO: The following tests will pass after HDFS-8121 fixed
-//  @Test
+  @Test
   public void TestFileMoreThanABlockGroup1() throws IOException {
     testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
   }
 
-  //  @Test
+  @Test
   public void TestFileMoreThanABlockGroup2() throws IOException {
-    testOneFile("/MoreThanABlockGroup2",
-        blockSize * dataBlocks * 3
-            + (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) * cellSize * dataBlocks
-            + 123);
+    testOneFile("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123);
+  }
+
+
+  @Test
+  public void TestFileMoreThanABlockGroup3() throws IOException {
+    testOneFile("/MoreThanABlockGroup3",
+        blockSize * dataBlocks * 3 + cellSize * dataBlocks
+        + cellSize + 123);
   }
 
   private int stripeDataSize() {
@@ -193,7 +197,10 @@ public class TestDFSStripedOutputStream {
     LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
 
     for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
-      LocatedBlock[] blocks = StripedDataStreamer.unwrapBlockGroup(firstBlock);
+      assert firstBlock instanceof LocatedStripedBlock;
+      LocatedBlock[] blocks = StripedBlockUtil.
+          parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
+              cellSize, dataBlocks, parityBlocks);
       List<LocatedBlock> oneGroup = Arrays.asList(blocks);
       blockGroupList.add(oneGroup);
     }
@@ -205,12 +212,6 @@ public class TestDFSStripedOutputStream {
       byte[][] dataBlockBytes = new byte[dataBlocks][];
       byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
 
-      //calculate the size of this block group
-      int lenOfBlockGroup = group < blockGroupList.size() - 1 ?
-          blockSize * dataBlocks :
-          writeBytes - blockSize * (blockGroupList.size() - 1) * dataBlocks;
-      int intactStripes = lenOfBlockGroup / stripeDataSize();
-      int lastStripeLen = lenOfBlockGroup % stripeDataSize();
 
       //for each block, use BlockReader to read data
       for (int i = 0; i < blockList.size(); i++) {
@@ -223,25 +224,17 @@ public class TestDFSStripedOutputStream {
         InetSocketAddress targetAddr = NetUtils.createSocketAddr(
             nodes[0].getXferAddr());
 
-        int lenOfCell = cellSize;
-        if (i == lastStripeLen / cellSize) {
-          lenOfCell = lastStripeLen % cellSize;
-        } else if (i > lastStripeLen / cellSize) {
-          lenOfCell = 0;
-        }
-        int lenOfBlock = cellSize * intactStripes + lenOfCell;
-        byte[] blockBytes = new byte[lenOfBlock];
+        byte[] blockBytes = new byte[(int)block.getNumBytes()];
         if (i < dataBlocks) {
           dataBlockBytes[i] = blockBytes;
         } else {
           parityBlockBytes[i - dataBlocks] = blockBytes;
         }
 
-        if (lenOfBlock == 0) {
+        if (block.getNumBytes() == 0) {
           continue;
         }
 
-        block.setNumBytes(lenOfBlock);
         BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
             setFileName(src).
             setBlock(block).
@@ -276,33 +269,33 @@ public class TestDFSStripedOutputStream {
               }
             }).build();
 
-        blockReader.readAll(blockBytes, 0, lenOfBlock);
+        blockReader.readAll(blockBytes, 0, (int)block.getNumBytes());
         blockReader.close();
       }
 
       //check if we write the data correctly
-      for (int i = 0; i < dataBlockBytes.length; i++) {
-        byte[] cells = dataBlockBytes[i];
-        if (cells == null) {
+      for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; blkIdxInGroup++) {
+        byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
+        if (actualBlkBytes == null) {
           continue;
         }
-        for (int j = 0; j < cells.length; j++) {
+        for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
           byte expected;
           //calculate the postion of this byte in the file
-          long pos = group * dataBlocks * blockSize
-              + (i * cellSize + j / cellSize * cellSize * dataBlocks)
-              + j % cellSize;
-          if (pos >= writeBytes) {
+          long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
+              dataBlocks, posInBlk, blkIdxInGroup) +
+              group * blockSize * dataBlocks;
+          if (posInFile >= writeBytes) {
             expected = 0;
           } else {
-            expected = getByte(pos);
+            expected = getByte(posInFile);
           }
 
-          if (expected != cells[j]) {
-            Assert.fail("Unexpected byte " + cells[j] + ", expect " + expected
+          if (expected != actualBlkBytes[posInBlk]) {
+            Assert.fail("Unexpected byte " + actualBlkBytes[posInBlk] + ", expect " + expected
                 + ". Block group index is " + group +
-                ", stripe index is " + j / cellSize +
-                ", cell index is " + i + ", byte index is " + j % cellSize);
+                ", stripe index is " + posInBlk / cellSize +
+                ", cell index is " + blkIdxInGroup + ", byte index is " + posInBlk % cellSize);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index 849e12e..90488c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -21,10 +21,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -34,10 +31,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -54,17 +50,18 @@ public class TestReadStripedFile {
   private DistributedFileSystem fs;
   private final Path dirPath = new Path("/striped");
   private Path filePath = new Path(dirPath, "file");
-  private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS;
-  private final short TOTAL_SIZE = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+  private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
   private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private final int NUM_STRIPE_PER_BLOCK = 2;
-  private final int BLOCKSIZE = 2 * GROUP_SIZE * CELLSIZE;
+  private final int BLOCKSIZE = NUM_STRIPE_PER_BLOCK * DATA_BLK_NUM * CELLSIZE;
 
   @Before
   public void setup() throws IOException {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
     SimulatedFSDataset.setFactory(conf);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(TOTAL_SIZE)
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(BLK_GROUP_SIZE)
         .build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
@@ -77,72 +74,14 @@ public class TestReadStripedFile {
     }
   }
 
-  private LocatedStripedBlock createDummyLocatedBlock() {
-    final long blockGroupID = -1048576;
-    DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE];
-    String[] storageIDs = new String[TOTAL_SIZE];
-    StorageType[] storageTypes = new StorageType[TOTAL_SIZE];
-    int[] indices = new int[TOTAL_SIZE];
-    for (int i = 0; i < TOTAL_SIZE; i++) {
-      locs[i] = new DatanodeInfo(cluster.getDataNodes().get(i).getDatanodeId());
-      storageIDs[i] = cluster.getDataNodes().get(i).getDatanodeUuid();
-      storageTypes[i] = StorageType.DISK;
-      indices[i] = (i + 2) % GROUP_SIZE;
-    }
-    return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
-        locs, storageIDs, storageTypes, indices, 0, false, null);
-  }
-
-  @Test
-  public void testParseDummyStripedBlock() {
-    LocatedStripedBlock lsb = createDummyLocatedBlock();
-    LocatedBlock[] blocks = DFSStripedInputStream.parseStripedBlockGroup(
-        lsb, GROUP_SIZE, CELLSIZE);
-    assertEquals(GROUP_SIZE, blocks.length);
-    for (int j = 0; j < GROUP_SIZE; j++) {
-      assertFalse(blocks[j].isStriped());
-      assertEquals(j,
-          BlockIdManager.getBlockIndex(blocks[j].getBlock().getLocalBlock()));
-      assertEquals(j * CELLSIZE, blocks[j].getStartOffset());
-    }
-  }
-
-  @Test
-  public void testParseStripedBlock() throws Exception {
-    final int numBlocks = 4;
-    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
-        NUM_STRIPE_PER_BLOCK);
-    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
-        filePath.toString(), 0, BLOCKSIZE * numBlocks);
-
-    assertEquals(4, lbs.locatedBlockCount());
-    List<LocatedBlock> lbList = lbs.getLocatedBlocks();
-    for (LocatedBlock lb : lbList) {
-      assertTrue(lb.isStriped());
-    }
-
-    for (int i = 0; i < numBlocks; i++) {
-      LocatedStripedBlock lsb = (LocatedStripedBlock) (lbs.get(i));
-      LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
-          GROUP_SIZE, CELLSIZE);
-      assertEquals(GROUP_SIZE, blks.length);
-      for (int j = 0; j < GROUP_SIZE; j++) {
-        assertFalse(blks[j].isStriped());
-        assertEquals(j,
-            BlockIdManager.getBlockIndex(blks[j].getBlock().getLocalBlock()));
-        assertEquals(i * BLOCKSIZE + j * CELLSIZE, blks[j].getStartOffset());
-      }
-    }
-  }
-
   /**
    * Test {@link DFSStripedInputStream#getBlockAt(long)}
    */
   @Test
   public void testGetBlock() throws Exception {
     final int numBlocks = 4;
-    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
-        NUM_STRIPE_PER_BLOCK);
+    DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
+        NUM_STRIPE_PER_BLOCK, true);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
         filePath.toString(), 0, BLOCKSIZE * numBlocks);
     final DFSStripedInputStream in =
@@ -151,9 +90,9 @@ public class TestReadStripedFile {
     List<LocatedBlock> lbList = lbs.getLocatedBlocks();
     for (LocatedBlock aLbList : lbList) {
       LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
-      LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
-          GROUP_SIZE, CELLSIZE);
-      for (int j = 0; j < GROUP_SIZE; j++) {
+      LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
+          CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
         LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
         assertEquals(blks[j].getBlock(), refreshed.getBlock());
         assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
@@ -165,15 +104,16 @@ public class TestReadStripedFile {
   @Test
   public void testPread() throws Exception {
     final int numBlocks = 4;
-    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
-        NUM_STRIPE_PER_BLOCK);
+    DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
+        NUM_STRIPE_PER_BLOCK, true);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
         filePath.toString(), 0, BLOCKSIZE);
 
     assert lbs.get(0) instanceof LocatedStripedBlock;
     LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
-    for (int i = 0; i < GROUP_SIZE; i++) {
-      Block blk = new Block(bg.getBlock().getBlockId() + i, BLOCKSIZE,
+    for (int i = 0; i < DATA_BLK_NUM; i++) {
+      Block blk = new Block(bg.getBlock().getBlockId() + i,
+          NUM_STRIPE_PER_BLOCK * CELLSIZE,
           bg.getBlock().getGenerationStamp());
       blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
       cluster.injectBlocks(i, Arrays.asList(blk),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 27df1cd..6bb1162 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -34,11 +34,13 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@@ -53,6 +55,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
 import static org.junit.Assert.assertEquals;
 
 public class TestAddStripedBlocks {
@@ -284,4 +288,107 @@ public class TestAddStripedBlocks {
       Assert.assertEquals(GROUP_SIZE - i - 1, indices[i]);
     }
   }
+
+  @Test
+  public void testCheckStripedReplicaCorrupt() throws Exception {
+    final int numBlocks = 4;
+    final int numStripes = 4;
+    final Path filePath = new Path("/corrupt");
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+    DFSTestUtil.createStripedFile(cluster, filePath, null,
+        numBlocks, numStripes, false);
+
+    INodeFile fileNode = ns.getFSDirectory().getINode(filePath.toString()).
+        asFile();
+    Assert.assertTrue(fileNode.isStriped());
+    BlockInfoStriped stored = fileNode.getStripedBlocksFeature().getBlocks()[0];
+    BlockManagerTestUtil.updateState(ns.getBlockManager());
+    Assert.assertEquals(0, ns.getCorruptReplicaBlocks());
+
+    // Now send a block report with correct size
+    DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+    final Block reported = new Block(stored);
+    reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE);
+    StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+        .makeReportForReceivedBlock(reported,
+            ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+    ns.processIncrementalBlockReport(
+        cluster.getDataNodes().get(0).getDatanodeId(), reports[0]);
+    BlockManagerTestUtil.updateState(ns.getBlockManager());
+    Assert.assertEquals(0, ns.getCorruptReplicaBlocks());
+
+    // Now send a block report with wrong size
+    reported.setBlockId(stored.getBlockId() + 1);
+    reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE - 1);
+    reports = DFSTestUtil.makeReportForReceivedBlock(reported,
+            ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+    ns.processIncrementalBlockReport(
+        cluster.getDataNodes().get(1).getDatanodeId(), reports[0]);
+    BlockManagerTestUtil.updateState(ns.getBlockManager());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+
+    // Now send a parity block report with correct size
+    reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS);
+    reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE);
+    reports = DFSTestUtil.makeReportForReceivedBlock(reported,
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+    ns.processIncrementalBlockReport(
+        cluster.getDataNodes().get(2).getDatanodeId(), reports[0]);
+    BlockManagerTestUtil.updateState(ns.getBlockManager());
+    Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+
+    // Now send a parity block report with wrong size
+    reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS);
+    reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE + 1);
+    reports = DFSTestUtil.makeReportForReceivedBlock(reported,
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+    ns.processIncrementalBlockReport(
+        cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
+    BlockManagerTestUtil.updateState(ns.getBlockManager());
+    Assert.assertEquals(2, ns.getCorruptReplicaBlocks());
+
+    // Now change the size of stored block, and test verifying the last
+    // block size
+    stored.setNumBytes(stored.getNumBytes() + 10);
+    reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS + 2);
+    reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE);
+    reports = DFSTestUtil.makeReportForReceivedBlock(reported,
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+    ns.processIncrementalBlockReport(
+        cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
+    BlockManagerTestUtil.updateState(ns.getBlockManager());
+    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+
+    // Now send a parity block report with correct size based on adjusted
+    // size of stored block
+    // Now stored block has numStripes full stripes + a cell + 10
+    stored.setNumBytes(stored.getNumBytes() + BLOCK_STRIPED_CELL_SIZE);
+    reported.setBlockId(stored.getBlockId());
+    reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE);
+    reports = DFSTestUtil.makeReportForReceivedBlock(reported,
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+    ns.processIncrementalBlockReport(
+        cluster.getDataNodes().get(0).getDatanodeId(), reports[0]);
+    BlockManagerTestUtil.updateState(ns.getBlockManager());
+    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+
+    reported.setBlockId(stored.getBlockId() + 1);
+    reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE + 10);
+    reports = DFSTestUtil.makeReportForReceivedBlock(reported,
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+    ns.processIncrementalBlockReport(
+        cluster.getDataNodes().get(1).getDatanodeId(), reports[0]);
+    BlockManagerTestUtil.updateState(ns.getBlockManager());
+    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+
+    reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS);
+    reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE);
+    reports = DFSTestUtil.makeReportForReceivedBlock(reported,
+        ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+    ns.processIncrementalBlockReport(
+        cluster.getDataNodes().get(2).getDatanodeId(), reports[0]);
+    BlockManagerTestUtil.updateState(ns.getBlockManager());
+    Assert.assertEquals(3, ns.getCorruptReplicaBlocks());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index 4292f9a..ea18c3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -78,7 +78,8 @@ public class TestRecoverStripedBlocks {
   @Test
   public void testMissingStripedBlock() throws Exception {
     final int numBlocks = 4;
-    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, 1);
+    DFSTestUtil.createStripedFile(cluster, filePath,
+        dirPath, numBlocks, 1, true);
 
     // make sure the file is complete in NN
     final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed68791c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
new file mode 100644
index 0000000..ec0b1bb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.util;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Unit tests for the {@link StripedBlockUtil} helpers exercised below:
+ * {@code parseStripedBlockGroup} and {@code getInternalBlockLength}.
+ */
+public class TestStripedBlockUtil {
+  // Block group layout shared by every test, taken from HdfsConstants.
+  private static final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  private static final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  private static final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
+  private static final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
+  /**
+   * Builds a {@link LocatedStripedBlock} with {@code BLK_GROUP_SIZE}
+   * locations. Entry {@code i} is given block index
+   * {@code (i + 2) % DATA_BLK_NUM}, so the locations are not listed in
+   * logical block order.
+   *
+   * @return the dummy located block group
+   */
+  private LocatedStripedBlock createDummyLocatedBlock() {
+    final long blockGroupID = -1048576;
+    DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_SIZE];
+    String[] storageIDs = new String[BLK_GROUP_SIZE];
+    StorageType[] storageTypes = new StorageType[BLK_GROUP_SIZE];
+    int[] indices = new int[BLK_GROUP_SIZE];
+    for (int i = 0; i < BLK_GROUP_SIZE; i++) {
+      indices[i] = (i + 2) % DATA_BLK_NUM;
+      // Location port always equal to logical index of a block,
+      // for easier verification
+      locs[i] = DFSTestUtil.getLocalDatanodeInfo(indices[i]);
+      storageIDs[i] = locs[i].getDatanodeUuid();
+      storageTypes[i] = StorageType.DISK;
+    }
+    return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
+        locs, storageIDs, storageTypes, indices, 0, false, null);
+  }
+
+  /**
+   * Parses the dummy block group and checks, for every data block index
+   * {@code i}, that the parsed block is not striped, carries block index
+   * {@code i}, starts at offset {@code i * CELLSIZE} and has exactly one
+   * location whose IPC and transfer ports both equal {@code i}.
+   */
+  @Test
+  public void testParseDummyStripedBlock() {
+    LocatedStripedBlock lsb = createDummyLocatedBlock();
+    LocatedBlock[] blocks = parseStripedBlockGroup(
+        lsb, CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
+    assertEquals(DATA_BLK_NUM + PARITY_BLK_NUM, blocks.length);
+    for (int i = 0; i < DATA_BLK_NUM; i++) {
+      assertFalse(blocks[i].isStriped());
+      assertEquals(i,
+          BlockIdManager.getBlockIndex(blocks[i].getBlock().getLocalBlock()));
+      assertEquals(i * CELLSIZE, blocks[i].getStartOffset());
+      assertEquals(1, blocks[i].getLocations().length);
+      assertEquals(i, blocks[i].getLocations()[0].getIpcPort());
+      assertEquals(i, blocks[i].getLocations()[0].getXferPort());
+    }
+  }
+
+  /**
+   * Asserts that {@code getInternalBlockLength} returns the expected length
+   * for every index in the block group.
+   *
+   * @param numBytesInGroup total number of bytes passed as the group size
+   * @param expected expected length for each index in the group; must hold
+   *                 exactly {@code BLK_GROUP_SIZE} entries
+   */
+  private void verifyInternalBlocks(long numBytesInGroup, long[] expected) {
+    assertEquals("one expected length is needed per internal block",
+        BLK_GROUP_SIZE, expected.length);
+    // Start at index 0 so that the first internal block is verified as well.
+    for (int i = 0; i < BLK_GROUP_SIZE; i++) {
+      assertEquals("length of internal block " + i, expected[i],
+          getInternalBlockLength(numBytesInGroup, CELLSIZE, DATA_BLK_NUM, i));
+    }
+  }
+
+  /**
+   * Checks {@code getInternalBlockLength} for block groups that end inside
+   * the first cell, exactly on a cell boundary, exactly on a stripe boundary
+   * and part-way through a later stripe.
+   */
+  @Test
+  public void testGetInternalBlockLength() {
+    // A small delta that is smaller than a cell
+    final int delta = 10;
+    assert delta < CELLSIZE;
+
+    // Block group is smaller than a cell
+    verifyInternalBlocks(CELLSIZE - delta,
+        new long[] {CELLSIZE - delta, 0, 0, 0, 0, 0,
+            CELLSIZE - delta, CELLSIZE - delta, CELLSIZE - delta});
+
+    // Block group is exactly as large as a cell
+    verifyInternalBlocks(CELLSIZE,
+        new long[] {CELLSIZE, 0, 0, 0, 0, 0,
+            CELLSIZE, CELLSIZE, CELLSIZE});
+
+    // Block group is a little larger than a cell
+    verifyInternalBlocks(CELLSIZE + delta,
+        new long[] {CELLSIZE, delta, 0, 0, 0, 0,
+            CELLSIZE, CELLSIZE, CELLSIZE});
+
+    // Block group contains multiple stripes and ends at stripe boundary
+    verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE,
+        new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
+            2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
+            2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE});
+
+    // Block group contains multiple stripes and ends at cell boundary
+    // (not ending at stripe boundary)
+    verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE + CELLSIZE,
+        new long[] {3 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
+            2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
+            3 * CELLSIZE, 3 * CELLSIZE, 3 * CELLSIZE});
+
+    // Block group contains multiple stripes and doesn't end at cell boundary
+    verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE - delta,
+        new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
+            2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE - delta,
+            2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE});
+  }
+
+}


Mime
View raw message