hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From inigo...@apache.org
Subject [02/50] [abbrv] hadoop git commit: HDFS-10999. Introduce separate stats for Replicated and Erasure Coded Blocks apart from the current Aggregated stats. (Manoj Govindassamy via lei)
Date Wed, 21 Jun 2017 23:23:49 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 038b6ce..5075c05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -110,10 +110,13 @@ import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
+import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
+import org.apache.hadoop.hdfs.protocol.BlocksStats;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -1651,6 +1654,50 @@ public class DFSTestUtil {
   }
 
   /**
+   * Verify the aggregated {@link ClientProtocol#getStats()} block counts equal
+   * the sum of {@link ClientProtocol#getBlocksStats()} and
+   * {@link ClientProtocol#getECBlockGroupsStats()}.
+   * @throws Exception if the proxy cannot be created or a stats call fails
+   */
+  public static  void verifyClientStats(Configuration conf,
+      MiniDFSCluster cluster) throws Exception {
+    ClientProtocol client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(),
+        ClientProtocol.class).getProxy();
+    long[] aggregatedStats = cluster.getNameNode().getRpcServer().getStats();
+    BlocksStats blocksStats =
+        client.getBlocksStats();
+    ECBlockGroupsStats ecBlockGroupsStats = client.getECBlockGroupsStats();
+
+    assertEquals("Under replicated stats not matching!",
+        aggregatedStats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX],
+        aggregatedStats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]);
+    assertEquals("Low redundancy stats not matching!",
+        aggregatedStats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX],
+        blocksStats.getLowRedundancyBlocksStat() +
+            ecBlockGroupsStats.getLowRedundancyBlockGroupsStat());
+    assertEquals("Corrupt blocks stats not matching!",
+        aggregatedStats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX],
+        blocksStats.getCorruptBlocksStat() +
+            ecBlockGroupsStats.getCorruptBlockGroupsStat());
+    assertEquals("Missing blocks stats not matching!",
+        aggregatedStats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX],
+        blocksStats.getMissingReplicaBlocksStat() +
+            ecBlockGroupsStats.getMissingBlockGroupsStat());
+    assertEquals("Missing blocks with replication factor one not matching!",
+        aggregatedStats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX],
+        blocksStats.getMissingReplicationOneBlocksStat());
+    assertEquals("Bytes in future blocks stats not matching!",
+        aggregatedStats[ClientProtocol.GET_STATS_BYTES_IN_FUTURE_BLOCKS_IDX],
+        blocksStats.getBytesInFutureBlocksStat() +
+            ecBlockGroupsStats.getBytesInFutureBlockGroupsStat());
+    assertEquals("Pending deletion blocks stats not matching!",
+        aggregatedStats[ClientProtocol.GET_STATS_PENDING_DELETION_BLOCKS_IDX],
+        blocksStats.getPendingDeletionBlocksStat() +
+            ecBlockGroupsStats.getPendingDeletionBlockGroupsStat());
+  }
+
+  /**
    * Helper function to create a key in the Key Provider. Defaults
    * to the first indexed NameNode's Key Provider.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
index 1d9d402..3e9d812 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
@@ -281,7 +281,7 @@ public class TestFileCorruption {
         @Override public Boolean get() {
           try {
             return cluster.getNamesystem().getBlockManager()
-                .getUnderReplicatedBlocksCount() == 1;
+                .getLowRedundancyBlocksCount() == 1;
           } catch (Exception e) {
             e.printStackTrace();
             return false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
index e0dfb4a..7c536e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
@@ -549,7 +549,7 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
     FileSystem fileSys = getCluster().getFileSystem(0);
     FSNamesystem ns = getCluster().getNamesystem(0);
 
-    writeFile(fileSys, file, replicas, 1);
+    writeFile(fileSys, file, replicas, 25);
 
     DatanodeInfo nodeOutofService = takeNodeOutofService(0,
         getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
index e225141..ca2fe92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
@@ -98,7 +98,7 @@ public class TestMissingBlocksAlert {
         Thread.sleep(100);
       }
       assertTrue(dfs.getMissingBlocksCount() == 1);
-      assertEquals(4, dfs.getUnderReplicatedBlocksCount());
+      assertEquals(4, dfs.getLowRedundancyBlocksCount());
       assertEquals(3, bm.getUnderReplicatedNotMissingBlocks());
 
       MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -117,7 +117,7 @@ public class TestMissingBlocksAlert {
         Thread.sleep(100);
       }
 
-      assertEquals(2, dfs.getUnderReplicatedBlocksCount());
+      assertEquals(2, dfs.getLowRedundancyBlocksCount());
       assertEquals(2, bm.getUnderReplicatedNotMissingBlocks());
 
       Assert.assertEquals(0, (long)(Long) mbs.getAttribute(mxbeanName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
index 61e69f3..2413918 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
@@ -17,18 +17,24 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.Random;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -53,10 +59,19 @@ public class TestComputeInvalidateWork {
   private FSNamesystem namesystem;
   private BlockManager bm;
   private DatanodeDescriptor[] nodes;
+  private ErasureCodingPolicy ecPolicy;
+  private DistributedFileSystem fs;
+  private Path ecFile;
+  private int totalBlockGroups, blockGroupSize, stripesPerBlock, cellSize;
+  private LocatedStripedBlock locatedStripedBlock;
 
   @Before
   public void setup() throws Exception {
+    ecPolicy = SystemErasureCodingPolicies.getByID(
+        SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
     conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+        ecPolicy.getName());
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES)
         .build();
     cluster.waitActive();
@@ -65,6 +80,25 @@ public class TestComputeInvalidateWork {
     nodes = bm.getDatanodeManager().getHeartbeatManager().getDatanodes();
     BlockManagerTestUtil.stopRedundancyThread(bm);
     assertEquals(nodes.length, NUM_OF_DATANODES);
+
+    // Create a striped file
+    Path ecDir = new Path("/ec");
+    fs = cluster.getFileSystem();
+    fs.mkdirs(ecDir);
+    fs.getClient().setErasureCodingPolicy(ecDir.toString(), ecPolicy.getName());
+    ecFile = new Path(ecDir, "ec-file");
+    stripesPerBlock = 2;
+    cellSize = ecPolicy.getCellSize();
+    int blockSize = stripesPerBlock * cellSize;
+    blockGroupSize =  ecPolicy.getNumDataUnits() * blockSize;
+    totalBlockGroups = 4;
+    DFSTestUtil.createStripedFile(cluster, ecFile, ecDir, totalBlockGroups,
+        stripesPerBlock, false, ecPolicy);
+    LocatedBlocks lbs = cluster.getFileSystem().getClient().
+        getNamenode().getBlockLocations(
+        ecFile.toString(), 0, blockGroupSize);
+    assert lbs.get(0) instanceof LocatedStripedBlock;
+    locatedStripedBlock = (LocatedStripedBlock)(lbs.get(0));
   }
 
   @After
@@ -75,12 +109,28 @@ public class TestComputeInvalidateWork {
     }
   }
 
+  private void verifyInvalidationWorkCounts(int blockInvalidateLimit) {
+    assertEquals(blockInvalidateLimit * NUM_OF_DATANODES,
+        bm.computeInvalidateWork(NUM_OF_DATANODES + 1));
+    assertEquals(blockInvalidateLimit * NUM_OF_DATANODES,
+        bm.computeInvalidateWork(NUM_OF_DATANODES));
+    assertEquals(blockInvalidateLimit * (NUM_OF_DATANODES - 1),
+        bm.computeInvalidateWork(NUM_OF_DATANODES - 1));
+    int workCount = bm.computeInvalidateWork(1);
+    if (workCount == 1) {
+      assertEquals(blockInvalidateLimit + 1, bm.computeInvalidateWork(2));
+    } else {
+      assertEquals(workCount, blockInvalidateLimit);
+      assertEquals(2, bm.computeInvalidateWork(2));
+    }
+  }
+
   /**
    * Test if {@link BlockManager#computeInvalidateWork(int)}
-   * can schedule invalidate work correctly 
+   * can schedule invalidate work correctly for the replicas.
    */
   @Test(timeout=120000)
-  public void testCompInvalidate() throws Exception {
+  public void testComputeInvalidateReplicas() throws Exception {
     final int blockInvalidateLimit = bm.getDatanodeManager()
         .getBlockInvalidateLimit();
     namesystem.writeLock();
@@ -92,20 +142,66 @@ public class TestComputeInvalidateWork {
           bm.addToInvalidates(block, nodes[i]);
         }
       }
-      
-      assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
-          bm.computeInvalidateWork(NUM_OF_DATANODES+1));
-      assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
-          bm.computeInvalidateWork(NUM_OF_DATANODES));
-      assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1),
-          bm.computeInvalidateWork(NUM_OF_DATANODES-1));
-      int workCount = bm.computeInvalidateWork(1);
-      if (workCount == 1) {
-        assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
-      } else {
-        assertEquals(workCount, blockInvalidateLimit);
-        assertEquals(2, bm.computeInvalidateWork(2));
+      verifyInvalidationWorkCounts(blockInvalidateLimit);
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  /**
+   * Test if {@link BlockManager#computeInvalidateWork(int)}
+   * can schedule invalidate work correctly for the striped block groups.
+   */
+  @Test(timeout=120000)
+  public void testComputeInvalidateStripedBlockGroups() throws Exception {
+    final int blockInvalidateLimit =
+        bm.getDatanodeManager().getBlockInvalidateLimit();
+    namesystem.writeLock();
+    try {
+      int nodeCount = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
+      for (int i = 0; i < nodeCount; i++) {
+        for(int j = 0; j < 3 * blockInvalidateLimit + 1; j++) {
+          Block blk = new Block(locatedStripedBlock.getBlock().getBlockId() +
+              (i * 10 + j), stripesPerBlock * cellSize,
+              locatedStripedBlock.getBlock().getGenerationStamp());
+          bm.addToInvalidates(blk, nodes[i]);
+        }
+      }
+      verifyInvalidationWorkCounts(blockInvalidateLimit);
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  /**
+   * Test if {@link BlockManager#computeInvalidateWork(int)}
+   * can schedule invalidate work correctly for both replicas and striped
+   * block groups, combined.
+   */
+  @Test(timeout=120000)
+  public void testComputeInvalidate() throws Exception {
+    final int blockInvalidateLimit =
+        bm.getDatanodeManager().getBlockInvalidateLimit();
+    final Random random = new Random(System.currentTimeMillis());
+    namesystem.writeLock();
+    try {
+      int nodeCount = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
+      for (int i = 0; i < nodeCount; i++) {
+        for(int j = 0; j < 3 * blockInvalidateLimit + 1; j++) {
+          if (random.nextBoolean()) {
+            Block stripedBlock = new Block(
+                locatedStripedBlock.getBlock().getBlockId() + (i * 10 + j),
+                stripesPerBlock * cellSize,
+                locatedStripedBlock.getBlock().getGenerationStamp());
+            bm.addToInvalidates(stripedBlock, nodes[i]);
+          } else {
+            Block replica = new Block(i * (blockInvalidateLimit + 1) + j, 0,
+                GenerationStamp.LAST_RESERVED_STAMP);
+            bm.addToInvalidates(replica, nodes[i]);
+          }
+        }
       }
+      verifyInvalidationWorkCounts(blockInvalidateLimit);
     } finally {
       namesystem.writeUnlock();
     }
@@ -129,6 +225,11 @@ public class TestComputeInvalidateWork {
 
       Block block = new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP);
       bm.addToInvalidates(block, nodes[0]);
+      Block stripedBlock = new Block(
+          locatedStripedBlock.getBlock().getBlockId() + 100,
+          stripesPerBlock * cellSize,
+          locatedStripedBlock.getBlock().getGenerationStamp());
+      bm.addToInvalidates(stripedBlock, nodes[0]);
       bm.getDatanodeManager().registerDatanode(dnr);
 
       // Since UUID has changed, the invalidation work should be skipped
@@ -145,26 +246,37 @@ public class TestComputeInvalidateWork {
     final DistributedFileSystem dfs = cluster.getFileSystem();
     final Path path = new Path("/testRR");
     // Create a file and shutdown the DNs, which populates InvalidateBlocks
+    short totalReplicas = NUM_OF_DATANODES;
     DFSTestUtil.createFile(dfs, path, dfs.getDefaultBlockSize(),
-        (short) NUM_OF_DATANODES, 0xED0ED0);
+        totalReplicas, 0xED0ED0);
     DFSTestUtil.waitForReplication(dfs, path, (short) NUM_OF_DATANODES, 12000);
     for (DataNode dn : cluster.getDataNodes()) {
       dn.shutdown();
     }
     dfs.delete(path, false);
+    dfs.delete(ecFile, false);
     namesystem.writeLock();
     InvalidateBlocks invalidateBlocks;
-    int expected = NUM_OF_DATANODES;
+    int totalStripedDataBlocks = totalBlockGroups * (ecPolicy.getNumDataUnits()
+        + ecPolicy.getNumParityUnits());
+    int expected = totalReplicas + totalStripedDataBlocks;
     try {
       invalidateBlocks = (InvalidateBlocks) Whitebox
           .getInternalState(cluster.getNamesystem().getBlockManager(),
               "invalidateBlocks");
-      assertEquals("Expected invalidate blocks to be the number of DNs",
+      assertEquals("Invalidate blocks should include both Replicas and " +
+          "Striped BlockGroups!",
           (long) expected, invalidateBlocks.numBlocks());
+      assertEquals("Unexpected invalidate count for replicas!",
+          totalReplicas, invalidateBlocks.getBlocksStat());
+      assertEquals("Unexpected invalidate count for striped block groups!",
+          totalStripedDataBlocks, invalidateBlocks.getECBlockGroupsStat());
     } finally {
       namesystem.writeUnlock();
     }
     // Re-register each DN and see that it wipes the invalidation work
+    int totalBlockGroupsPerDataNode = totalBlockGroups;
+    int totalReplicasPerDataNode = totalReplicas / NUM_OF_DATANODES;
     for (DataNode dn : cluster.getDataNodes()) {
       DatanodeID did = dn.getDatanodeId();
       DatanodeRegistration reg = new DatanodeRegistration(
@@ -175,7 +287,7 @@ public class TestComputeInvalidateWork {
       namesystem.writeLock();
       try {
         bm.getDatanodeManager().registerDatanode(reg);
-        expected--;
+        expected -= (totalReplicasPerDataNode + totalBlockGroupsPerDataNode);
         assertEquals("Expected number of invalidate blocks to decrease",
             (long) expected, invalidateBlocks.numBlocks());
       } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
index 4bdaaac..3f8a5cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
@@ -25,14 +25,13 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.junit.Test;
 
@@ -45,88 +44,130 @@ import org.junit.Test;
  */
 public class TestCorruptReplicaInfo {
   
-  private static final Log LOG = 
-                           LogFactory.getLog(TestCorruptReplicaInfo.class);
-  
-  private final Map<Long, Block> block_map =
-    new HashMap<Long, Block>();  
-    
-  // Allow easy block creation by block id
-  // Return existing block if one with same block id already exists
-  private Block getBlock(Long block_id) {
-    if (!block_map.containsKey(block_id)) {
-      block_map.put(block_id, new Block(block_id,0,0));
+  private static final Log LOG = LogFactory.getLog(
+      TestCorruptReplicaInfo.class);
+  private final Map<Long, Block> replicaMap = new HashMap<>();
+  private final Map<Long, Block> stripedBlocksMap = new HashMap<>();
+
+  // Allow easy block creation by block id. Return the existing
+  // replica block if one with the same block id already exists.
+  private Block getReplica(Long blockId) {
+    if (!replicaMap.containsKey(blockId)) {
+      replicaMap.put(blockId, new Block(blockId, 0, 0));
     }
-    
-    return block_map.get(block_id);
+    return replicaMap.get(blockId);
   }
-  
-  private Block getBlock(int block_id) {
-    return getBlock((long)block_id);
+
+  private Block getReplica(int blkId) {
+    return getReplica(Long.valueOf(blkId));
+  }
+
+  private Block getStripedBlock(int blkId) {
+    Long stripedBlockId = (1L << 63) + blkId;
+    assertTrue(BlockIdManager.isStripedBlockID(stripedBlockId));
+    if (!stripedBlocksMap.containsKey(stripedBlockId)) {
+      stripedBlocksMap.put(stripedBlockId, new Block(stripedBlockId, 1024, 0));
+    }
+    return stripedBlocksMap.get(stripedBlockId);
+  }
+
+  private void verifyCorruptBlocksCount(CorruptReplicasMap corruptReplicasMap,
+      long expectedReplicaCount, long expectedStripedBlockCount) {
+    long totalExpectedCorruptBlocks = expectedReplicaCount +
+        expectedStripedBlockCount;
+    assertEquals("Unexpected total corrupt blocks count!",
+        totalExpectedCorruptBlocks, corruptReplicasMap.size());
+    assertEquals("Unexpected replica blocks count!",
+        expectedReplicaCount, corruptReplicasMap.getCorruptBlocksStat());
+    assertEquals("Unexpected striped blocks count!",
+        expectedStripedBlockCount,
+        corruptReplicasMap.getCorruptECBlockGroupsStat());
   }
   
   @Test
-  public void testCorruptReplicaInfo() throws IOException, 
-                                       InterruptedException {
-    
-      CorruptReplicasMap crm = new CorruptReplicasMap();
-      
-      // Make sure initial values are returned correctly
-      assertEquals("Number of corrupt blocks must initially be 0", 0, crm.size());
-      assertNull("Param n cannot be less than 0", crm.getCorruptReplicaBlockIdsForTesting(-1, null));
-      assertNull("Param n cannot be greater than 100", crm.getCorruptReplicaBlockIdsForTesting(101, null));
-      long[] l = crm.getCorruptReplicaBlockIdsForTesting(0, null);
-      assertNotNull("n = 0 must return non-null", l);
-      assertEquals("n = 0 must return an empty list", 0, l.length);
-
-      // create a list of block_ids. A list is used to allow easy validation of the
-      // output of getCorruptReplicaBlockIds
-      int NUM_BLOCK_IDS = 140;
-      List<Long> block_ids = new LinkedList<Long>();
-      for (int i=0;i<NUM_BLOCK_IDS;i++) {
-        block_ids.add((long)i);
-      }
-      
-      DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor();
-      DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor();
-      
-      addToCorruptReplicasMap(crm, getBlock(0), dn1);
-      assertEquals("Number of corrupt blocks not returning correctly",
-                   1, crm.size());
-      addToCorruptReplicasMap(crm, getBlock(1), dn1);
-      assertEquals("Number of corrupt blocks not returning correctly",
-                   2, crm.size());
-      
-      addToCorruptReplicasMap(crm, getBlock(1), dn2);
-      assertEquals("Number of corrupt blocks not returning correctly",
-                   2, crm.size());
-      
-      crm.removeFromCorruptReplicasMap(getBlock(1));
-      assertEquals("Number of corrupt blocks not returning correctly",
-                   1, crm.size());
-      
-      crm.removeFromCorruptReplicasMap(getBlock(0));
-      assertEquals("Number of corrupt blocks not returning correctly",
-                   0, crm.size());
-      
-      for (Long block_id: block_ids) {
-        addToCorruptReplicasMap(crm, getBlock(block_id), dn1);
-      }
-            
-      assertEquals("Number of corrupt blocks not returning correctly",
-                   NUM_BLOCK_IDS, crm.size());
-      
-      assertTrue("First five block ids not returned correctly ",
-                Arrays.equals(new long[]{0,1,2,3,4},
-                              crm.getCorruptReplicaBlockIdsForTesting(5, null)));
-                              
-      LOG.info(crm.getCorruptReplicaBlockIdsForTesting(10, 7L));
-      LOG.info(block_ids.subList(7, 18));
-
-      assertTrue("10 blocks after 7 not returned correctly ",
-                Arrays.equals(new long[]{8,9,10,11,12,13,14,15,16,17},
-                              crm.getCorruptReplicaBlockIdsForTesting(10, 7L)));
-      
+  public void testCorruptReplicaInfo()
+      throws IOException, InterruptedException {
+    CorruptReplicasMap crm = new CorruptReplicasMap();
+
+    // Make sure initial values are returned correctly
+    assertEquals("Total number of corrupt blocks must initially be 0!",
+        0, crm.size());
+    assertEquals("Number of corrupt replicas must initially be 0!",
+        0, crm.getCorruptBlocksStat());
+    assertEquals("Number of corrupt striped block groups must initially be 0!",
+        0, crm.getCorruptECBlockGroupsStat());
+    assertNull("Param n cannot be less than 0",
+        crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, -1, null));
+    assertNull("Param n cannot be greater than 100",
+        crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 101, null));
+    long[] l = crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 0, null);
+    assertNotNull("n = 0 must return non-null", l);
+    assertEquals("n = 0 must return an empty list", 0, l.length);
+
+    // Create arrays of block ids. Arrays are used to allow easy validation
+    // of the output of getCorruptBlockIdsForTesting.
+    final int blockCount = 140;
+    long[] replicaIds = new long[blockCount];
+    long[] stripedIds = new long[blockCount];
+    for (int i = 0; i < blockCount; i++) {
+      replicaIds[i] = getReplica(i).getBlockId();
+      stripedIds[i] = getStripedBlock(i).getBlockId();
+    }
+
+    DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor();
+    DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor();
+
+    // Add to corrupt blocks map.
+    // Replicas
+    addToCorruptReplicasMap(crm, getReplica(0), dn1);
+    verifyCorruptBlocksCount(crm, 1, 0);
+    addToCorruptReplicasMap(crm, getReplica(1), dn1);
+    verifyCorruptBlocksCount(crm, 2, 0);
+    addToCorruptReplicasMap(crm, getReplica(1), dn2);
+    verifyCorruptBlocksCount(crm, 2, 0);
+
+    // Striped blocks
+    addToCorruptReplicasMap(crm, getStripedBlock(0), dn1);
+    verifyCorruptBlocksCount(crm, 2, 1);
+    addToCorruptReplicasMap(crm, getStripedBlock(1), dn1);
+    verifyCorruptBlocksCount(crm, 2, 2);
+    addToCorruptReplicasMap(crm, getStripedBlock(1), dn2);
+    verifyCorruptBlocksCount(crm, 2, 2);
+
+    // Remove from corrupt blocks map.
+    // Replicas
+    crm.removeFromCorruptReplicasMap(getReplica(1));
+    verifyCorruptBlocksCount(crm, 1, 2);
+    crm.removeFromCorruptReplicasMap(getReplica(0));
+    verifyCorruptBlocksCount(crm, 0, 2);
+
+    // Striped blocks
+    crm.removeFromCorruptReplicasMap(getStripedBlock(1));
+    verifyCorruptBlocksCount(crm, 0, 1);
+    crm.removeFromCorruptReplicasMap(getStripedBlock(0));
+    verifyCorruptBlocksCount(crm, 0, 0);
+
+    for (int blockId = 0; blockId  < blockCount; blockId++) {
+      addToCorruptReplicasMap(crm, getReplica(blockId), dn1);
+      addToCorruptReplicasMap(crm, getStripedBlock(blockId), dn1);
+    }
+
+    assertEquals("Number of corrupt blocks not returning correctly",
+        2 * blockCount, crm.size());
+    assertTrue("First five corrupt replica blocks ids are not right!",
+        Arrays.equals(Arrays.copyOfRange(replicaIds, 0, 5),
+            crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 5, null)));
+    assertTrue("First five corrupt striped blocks ids are not right!",
+        Arrays.equals(Arrays.copyOfRange(stripedIds, 0, 5),
+            crm.getCorruptBlockIdsForTesting(BlockType.STRIPED, 5, null)));
+
+    assertTrue("10 replica blocks after 7 not returned correctly!",
+        Arrays.equals(Arrays.copyOfRange(replicaIds, 7, 17),
+            crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 10, 7L)));
+    assertTrue("10 striped blocks after 7 not returned correctly!",
+        Arrays.equals(Arrays.copyOfRange(stripedIds, 7, 17),
+            crm.getCorruptBlockIdsForTesting(BlockType.STRIPED,
+                10, getStripedBlock(7).getBlockId())));
   }
   
   private static void addToCorruptReplicasMap(CorruptReplicasMap crm,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
index d853762..c65fc64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
@@ -45,9 +45,32 @@ public class TestLowRedundancyBlockQueues {
     return sblk;
   }
 
+  private void verifyBlockStats(LowRedundancyBlocks queues,
+      int lowRedundancyReplicaCount, int corruptReplicaCount,
+      int corruptReplicationOneCount, int lowRedundancyStripedCount,
+      int corruptStripedCount) {
+    assertEquals("Low redundancy replica count incorrect!",
+        lowRedundancyReplicaCount, queues.getLowRedundancyBlocksStat());
+    assertEquals("Corrupt replica count incorrect!",
+        corruptReplicaCount, queues.getCorruptBlocksStat());
+    assertEquals("Corrupt replica one count incorrect!",
+        corruptReplicationOneCount,
+        queues.getCorruptReplicationOneBlocksStat());
+    assertEquals("Low redundancy striped blocks count incorrect!",
+        lowRedundancyStripedCount, queues.getLowRedundancyECBlockGroupsStat());
+    assertEquals("Corrupt striped blocks count incorrect!",
+        corruptStripedCount, queues.getCorruptECBlockGroupsStat());
+    assertEquals("Low Redundancy count incorrect!",
+        lowRedundancyReplicaCount + lowRedundancyStripedCount,
+        queues.getLowRedundancyBlockCount());
+    assertEquals("LowRedundancyBlocks queue size incorrect!",
+        (lowRedundancyReplicaCount + corruptReplicaCount +
+        lowRedundancyStripedCount + corruptStripedCount), queues.size());
+  }
+
   /**
    * Test that adding blocks with different replication counts puts them
-   * into different queues
+   * into different queues.
    * @throws Throwable if something goes wrong
    */
   @Test
@@ -59,43 +82,45 @@ public class TestLowRedundancyBlockQueues {
     BlockInfo block_corrupt = genBlockInfo(4);
     BlockInfo block_corrupt_repl_one = genBlockInfo(5);
 
-    //add a block with a single entry
+    // Add a block with a single entry
     assertAdded(queues, block1, 1, 0, 3);
-
-    assertEquals(1, queues.getLowRedundancyBlockCount());
-    assertEquals(1, queues.size());
     assertInLevel(queues, block1, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
-    //repeated additions fail
+    verifyBlockStats(queues, 1, 0, 0, 0, 0);
+
+    // Repeated additions fail
     assertFalse(queues.add(block1, 1, 0, 0, 3));
+    verifyBlockStats(queues, 1, 0, 0, 0, 0);
 
-    //add a second block with two replicas
+    // Add a second block with two replicas
     assertAdded(queues, block2, 2, 0, 3);
-    assertEquals(2, queues.getLowRedundancyBlockCount());
-    assertEquals(2, queues.size());
     assertInLevel(queues, block2, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
-    //now try to add a block that is corrupt
+    verifyBlockStats(queues, 2, 0, 0, 0, 0);
+
+    // Now try to add a block that is corrupt
     assertAdded(queues, block_corrupt, 0, 0, 3);
-    assertEquals(3, queues.size());
-    assertEquals(2, queues.getLowRedundancyBlockCount());
-    assertEquals(1, queues.getCorruptBlockSize());
     assertInLevel(queues, block_corrupt,
                   LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+    verifyBlockStats(queues, 2, 1, 0, 0, 0);
 
-    //insert a very insufficiently redundancy block
+    // Insert a block with very low redundancy
     assertAdded(queues, block_very_low_redundancy, 4, 0, 25);
     assertInLevel(queues, block_very_low_redundancy,
                   LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
+    verifyBlockStats(queues, 3, 1, 0, 0, 0);
 
-    //insert a corrupt block with replication factor 1
+    // Insert a corrupt block with replication factor 1
     assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
-    assertEquals(2, queues.getCorruptBlockSize());
-    assertEquals(1, queues.getCorruptReplOneBlockSize());
+    verifyBlockStats(queues, 3, 2, 1, 0, 0);
+
+    // Bump up the expected count for corrupt replica one block from 1 to 3
     queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
-    assertEquals(0, queues.getCorruptReplOneBlockSize());
+    verifyBlockStats(queues, 3, 2, 0, 0, 0);
+
+    // Reduce the expected replicas to 1
     queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
-    assertEquals(1, queues.getCorruptReplOneBlockSize());
+    verifyBlockStats(queues, 3, 2, 1, 0, 0);
     queues.update(block_very_low_redundancy, 0, 0, 0, 1, -4, -24);
-    assertEquals(2, queues.getCorruptReplOneBlockSize());
+    verifyBlockStats(queues, 2, 3, 2, 0, 0);
   }
 
   @Test
@@ -131,16 +156,18 @@ public class TestLowRedundancyBlockQueues {
         assertInLevel(queues, block,
             LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
       }
+      verifyBlockStats(queues, 0, 0, 0, numUR, 0);
     }
 
     // add a corrupted block
     BlockInfo block_corrupt = genStripedBlockInfo(-10, numBytes);
     assertEquals(numCorrupt, queues.getCorruptBlockSize());
+    verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt);
+
     assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
     numCorrupt++;
-    assertEquals(numUR + numCorrupt, queues.size());
-    assertEquals(numUR, queues.getLowRedundancyBlockCount());
-    assertEquals(numCorrupt, queues.getCorruptBlockSize());
+    verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt);
+
     assertInLevel(queues, block_corrupt,
         LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
index c0b54b0..e21d44e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
@@ -37,38 +37,51 @@ import java.util.Iterator;
 
 
 public class TestUnderReplicatedBlocks {
-  @Test(timeout=60000) // 1 min timeout
-  public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
+  @Test(timeout=120000) // 2 min timeout
+  public void testSetRepIncWithUnderReplicatedBlocks() throws Exception {
     Configuration conf = new HdfsConfiguration();
     final short REPLICATION_FACTOR = 2;
     final String FILE_NAME = "/testFile";
     final Path FILE_PATH = new Path(FILE_NAME);
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR + 1).build();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
+        numDataNodes(REPLICATION_FACTOR + 1).build();
     try {
       // create a file with one block with a replication factor of 2
       final FileSystem fs = cluster.getFileSystem();
+      final BlockManager bm = cluster.getNamesystem().getBlockManager();
       DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
       DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
-      
+      BlockManagerTestUtil.updateState(bm);
+      DFSTestUtil.verifyClientStats(conf, cluster);
+
       // remove one replica from the blocksMap so block becomes under-replicated
       // but the block does not get put into the under-replicated blocks queue
-      final BlockManager bm = cluster.getNamesystem().getBlockManager();
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
       DatanodeDescriptor dn = bm.blocksMap.getStorages(b.getLocalBlock())
           .iterator().next().getDatanodeDescriptor();
       bm.addToInvalidates(b.getLocalBlock(), dn);
+
+
       // Compute the invalidate work in NN, and trigger the heartbeat from DN
       BlockManagerTestUtil.computeAllPendingWork(bm);
       DataNodeTestUtils.triggerHeartbeat(cluster.getDataNode(dn.getIpcPort()));
       // Wait to make sure the DataNode receives the deletion request 
       Thread.sleep(5000);
+      BlockManagerTestUtil.updateState(bm);
+      DFSTestUtil.verifyClientStats(conf, cluster);
+
       // Remove the record from blocksMap
       bm.blocksMap.removeNode(b.getLocalBlock(), dn);
-      
+      BlockManagerTestUtil.updateState(bm);
+      DFSTestUtil.verifyClientStats(conf, cluster);
+
       // increment this file's replication factor
       FsShell shell = new FsShell(conf);
-      assertEquals(0, shell.run(new String[]{
-          "-setrep", "-w", Integer.toString(1+REPLICATION_FACTOR), FILE_NAME}));
+      assertEquals(0, shell.run(new String[] {
+          "-setrep", "-w", Integer.toString(1 + REPLICATION_FACTOR),
+          FILE_NAME }));
+      BlockManagerTestUtil.updateState(bm);
+      DFSTestUtil.verifyClientStats(conf, cluster);
     } finally {
       cluster.shutdown();
     }
@@ -126,25 +139,30 @@ public class TestUnderReplicatedBlocks {
       final BlockManager bm = cluster.getNamesystem().getBlockManager();
       ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
       Iterator<DatanodeStorageInfo> storageInfos =
-          bm.blocksMap.getStorages(b.getLocalBlock())
-          .iterator();
+          bm.blocksMap.getStorages(b.getLocalBlock()).iterator();
       DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
       DatanodeDescriptor secondDn = storageInfos.next().getDatanodeDescriptor();
 
-      bm.getDatanodeManager().removeDatanode(firstDn);
+      BlockManagerTestUtil.updateState(bm);
+      DFSTestUtil.verifyClientStats(conf, cluster);
 
+      bm.getDatanodeManager().removeDatanode(firstDn);
+      BlockManagerTestUtil.updateState(bm);
       assertEquals(NUM_OF_BLOCKS, bm.getUnderReplicatedNotMissingBlocks());
-      bm.computeDatanodeWork();
+      DFSTestUtil.verifyClientStats(conf, cluster);
 
+      bm.computeDatanodeWork();
       assertTrue("The number of replication work pending before targets are " +
               "determined should be non-negative.",
           (Integer)Whitebox.getInternalState(secondDn,
               "pendingReplicationWithoutTargets") >= 0);
 
+      BlockManagerTestUtil.updateState(bm);
       assertTrue("The number of blocks to be replicated should be less than "
           + "or equal to " + bm.replicationStreamsHardLimit,
           secondDn.getNumberOfBlocksToBeReplicated()
           <= bm.replicationStreamsHardLimit);
+      DFSTestUtil.verifyClientStats(conf, cluster);
     } finally {
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java
index 90eb7d1..22cba6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java
@@ -199,7 +199,7 @@ public class TestReadOnlySharedStorage {
     assertThat(numberReplicas.replicasOnStaleNodes(), is(0));
     
     BlockManagerTestUtil.updateState(blockManager);
-    assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L));
+    assertThat(blockManager.getLowRedundancyBlocksCount(), is(0L));
     assertThat(blockManager.getExcessBlocksCount(), is(0L));
   }
   
@@ -238,7 +238,7 @@ public class TestReadOnlySharedStorage {
     
     // The block should be reported as under-replicated
     BlockManagerTestUtil.updateState(blockManager);
-    assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L));
+    assertThat(blockManager.getLowRedundancyBlocksCount(), is(1L));
     
     // The BlockManager should be able to heal the replication count back to 1
     // by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index d268d01..71a9f6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -118,7 +118,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     assertThat(cluster.getNameNode()
                       .getNamesystem()
                       .getBlockManager()
-                      .getUnderReplicatedBlocksCount(),
+                      .getLowRedundancyBlocksCount(),
                is(0L));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 555c2fa..c556699 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -424,7 +424,9 @@ public class TestAddStripedBlocks {
         cluster.getDataNodes().get(3).getDatanodeId(), reports[0]);
     BlockManagerTestUtil.updateState(ns.getBlockManager());
     // the total number of corrupted block info is still 1
+    Assert.assertEquals(1, ns.getCorruptECBlockGroupsStat());
     Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
+    Assert.assertEquals(0, ns.getCorruptBlocksStat());
     // 2 internal blocks corrupted
     Assert.assertEquals(2, bm.getCorruptReplicas(stored).size());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index 3cf025c..11d7431 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -410,7 +410,7 @@ public class TestDecommissioningStatus {
 
     // All nodes are dead and decommed. Blocks should be missing.
     long  missingBlocks = bm.getMissingBlocksCount();
-    long underreplicated = bm.getUnderReplicatedBlocksCount();
+    long underreplicated = bm.getLowRedundancyBlocksCount();
     assertTrue(missingBlocks > 0);
     assertTrue(underreplicated > 0);
 
@@ -440,7 +440,7 @@ public class TestDecommissioningStatus {
 
     // Blocks should be still be under-replicated
     Thread.sleep(2000);  // Let replication monitor run
-    assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount());
+    assertEquals(underreplicated, bm.getLowRedundancyBlocksCount());
 
     // Start up a node.
     LOG.info("Starting two more nodes");
@@ -448,13 +448,13 @@ public class TestDecommissioningStatus {
     cluster.waitActive();
     // Replication should fix it.
     int count = 0;
-    while((bm.getUnderReplicatedBlocksCount() > 0 ||
+    while((bm.getLowRedundancyBlocksCount() > 0 ||
         bm.getPendingReconstructionBlocksCount() > 0) &&
         count++ < 10) {
       Thread.sleep(1000);
     }
 
-    assertEquals(0, bm.getUnderReplicatedBlocksCount());
+    assertEquals(0, bm.getLowRedundancyBlocksCount());
     assertEquals(0, bm.getPendingReconstructionBlocksCount());
     assertEquals(0, bm.getMissingBlocksCount());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
index ed9ed3a..32c2a49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -774,17 +775,24 @@ public class TestNameNodeMXBean {
       }
 
       MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-      ObjectName mxbeanName = new ObjectName(
+      ObjectName replStateMBeanName = new ObjectName(
+          "Hadoop:service=NameNode,name=ReplicatedBlocksState");
+      ObjectName ecBlkGrpStateMBeanName = new ObjectName(
+          "Hadoop:service=NameNode,name=ECBlockGroupsState");
+      ObjectName namenodeMXBeanName = new ObjectName(
           "Hadoop:service=NameNode,name=NameNodeInfo");
 
       // Wait for the metrics to discover the unrecoverable block group
+      long expectedMissingBlockCount = 1L;
+      long expectedCorruptBlockCount = 1L;
       GenericTestUtils.waitFor(new Supplier<Boolean>() {
         @Override
         public Boolean get() {
           try {
             Long numMissingBlocks =
-                (Long) mbs.getAttribute(mxbeanName, "NumberOfMissingBlocks");
-            if (numMissingBlocks == 1L) {
+                (Long) mbs.getAttribute(namenodeMXBeanName,
+                    "NumberOfMissingBlocks");
+            if (numMissingBlocks == expectedMissingBlockCount) {
               return true;
             }
           } catch (Exception e) {
@@ -794,7 +802,43 @@ public class TestNameNodeMXBean {
         }
       }, 1000, 60000);
 
-      String corruptFiles = (String) (mbs.getAttribute(mxbeanName,
+      BlockManagerTestUtil.updateState(
+          cluster.getNamesystem().getBlockManager());
+
+      // Verification of missing blocks
+      long totalMissingBlocks = cluster.getNamesystem().getMissingBlocksCount();
+      Long replicaMissingBlocks =
+          (Long) mbs.getAttribute(replStateMBeanName,
+              "MissingBlocksStat");
+      Long ecMissingBlocks =
+          (Long) mbs.getAttribute(ecBlkGrpStateMBeanName,
+              "MissingECBlockGroupsStat");
+      assertEquals("Unexpected total missing blocks!",
+          expectedMissingBlockCount, totalMissingBlocks);
+      assertEquals("Unexpected total missing blocks!",
+          totalMissingBlocks,
+          (replicaMissingBlocks + ecMissingBlocks));
+      assertEquals("Unexpected total ec missing blocks!",
+          expectedMissingBlockCount, ecMissingBlocks.longValue());
+
+      // Verification of corrupt blocks
+      long totalCorruptBlocks =
+          cluster.getNamesystem().getCorruptReplicaBlocks();
+      Long replicaCorruptBlocks =
+          (Long) mbs.getAttribute(replStateMBeanName,
+              "CorruptBlocksStat");
+      Long ecCorruptBlocks =
+          (Long) mbs.getAttribute(ecBlkGrpStateMBeanName,
+              "CorruptECBlockGroupsStat");
+      assertEquals("Unexpected total corrupt blocks!",
+          expectedCorruptBlockCount, totalCorruptBlocks);
+      assertEquals("Unexpected total corrupt blocks!",
+          totalCorruptBlocks,
+          (replicaCorruptBlocks + ecCorruptBlocks));
+      assertEquals("Unexpected total ec corrupt blocks!",
+          expectedCorruptBlockCount, ecCorruptBlocks.longValue());
+
+      String corruptFiles = (String) (mbs.getAttribute(namenodeMXBeanName,
           "CorruptFiles"));
       int numCorruptFiles = ((Object[]) JSON.parse(corruptFiles)).length;
       assertEquals(1, numCorruptFiles);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
index 34fec5b..540ae63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -50,6 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.BitSet;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -157,6 +159,8 @@ public class TestReconstructStripedBlocks {
         assertEquals(numBlocks, missedNode.numBlocks());
         bm.getDatanodeManager().removeDatanode(missedNode);
       }
+      BlockManagerTestUtil.updateState(bm);
+      DFSTestUtil.verifyClientStats(conf, cluster);
 
       BlockManagerTestUtil.getComputedDatanodeWork(bm);
 
@@ -185,6 +189,8 @@ public class TestReconstructStripedBlocks {
               info.getSourceDnInfos().length);
         }
       }
+      BlockManagerTestUtil.updateState(bm);
+      DFSTestUtil.verifyClientStats(conf, cluster);
     } finally {
       cluster.shutdown();
     }
@@ -212,6 +218,8 @@ public class TestReconstructStripedBlocks {
       final byte[] data = new byte[fileLen];
       DFSTestUtil.writeFile(fs, p, data);
       DFSTestUtil.waitForReplication(fs, p, groupSize, 5000);
+      BlockManagerTestUtil.updateState(bm);
+      DFSTestUtil.verifyClientStats(conf, cluster);
 
       LocatedStripedBlock lb = (LocatedStripedBlock)fs.getClient()
           .getLocatedBlocks(p.toString(), 0).get(0);
@@ -219,16 +227,20 @@ public class TestReconstructStripedBlocks {
           cellSize, dataBlocks, parityBlocks);
 
       BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      BlockManagerTestUtil.updateState(bm);
       assertEquals(0, getNumberOfBlocksToBeErasureCoded(cluster));
       assertEquals(0, bm.getPendingReconstructionBlocksCount());
+      DFSTestUtil.verifyClientStats(conf, cluster);
 
       // missing 1 block, so 1 task should be scheduled
       DatanodeInfo dn0 = lbs[0].getLocations()[0];
       cluster.stopDataNode(dn0.getName());
       cluster.setDataNodeDead(dn0);
       BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      BlockManagerTestUtil.updateState(bm);
       assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
       assertEquals(1, bm.getPendingReconstructionBlocksCount());
+      DFSTestUtil.verifyClientStats(conf, cluster);
 
       // missing another block, but no new task should be scheduled because
       // previous task isn't finished.
@@ -236,8 +248,10 @@ public class TestReconstructStripedBlocks {
       cluster.stopDataNode(dn1.getName());
       cluster.setDataNodeDead(dn1);
       BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      BlockManagerTestUtil.updateState(bm);
       assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
       assertEquals(1, bm.getPendingReconstructionBlocksCount());
+      DFSTestUtil.verifyClientStats(conf, cluster);
     } finally {
       cluster.shutdown();
     }
@@ -294,6 +308,7 @@ public class TestReconstructStripedBlocks {
       // bring the dn back: 10 internal blocks now
       cluster.restartDataNode(dnProp);
       cluster.waitActive();
+      DFSTestUtil.verifyClientStats(conf, cluster);
 
       // stop another dn: 9 internal blocks, but only cover 8 real one
       dnToStop = block.getLocations()[1];
@@ -352,4 +367,72 @@ public class TestReconstructStripedBlocks {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=120000) // 2 min timeout
+  public void testReconstructionWork() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        1000);
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+        5);
+
+    ErasureCodingPolicy policy =  SystemErasureCodingPolicies.getByID(
+        SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
+    conf.setStrings(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+        policy.getName());
+    Path ecDir = new Path("/ec");
+    Path ecFilePath = new Path(ecDir, "ec-file");
+    int blockGroups = 2;
+    int totalDataNodes = policy.getNumDataUnits() +
+        policy.getNumParityUnits() + 1;
+
+    MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        totalDataNodes).build();
+    try {
+      // create an EC file with 2 block groups
+      final DistributedFileSystem fs = dfsCluster.getFileSystem();
+      fs.mkdirs(ecDir);
+      fs.setErasureCodingPolicy(ecDir, policy.getName());
+      DFSTestUtil.createStripedFile(dfsCluster, ecFilePath, ecDir,
+          blockGroups, 2, false, policy);
+
+      final BlockManager bm = dfsCluster.getNamesystem().getBlockManager();
+      LocatedBlocks lbs = fs.getClient().getNamenode().getBlockLocations(
+          ecFilePath.toString(), 0, blockGroups);
+      assert lbs.get(0) instanceof LocatedStripedBlock;
+      LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+
+      Iterator<DatanodeStorageInfo> storageInfos =
+          bm.getStorages(bg.getBlock().getLocalBlock()).iterator();
+      DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
+
+      BlockManagerTestUtil.updateState(bm);
+      DFSTestUtil.verifyClientStats(conf, dfsCluster);
+
+      // Remove one of the DataUnit nodes
+      bm.getDatanodeManager().removeDatanode(firstDn);
+
+      // Verify low redundancy count matching EC block groups count
+      BlockManagerTestUtil.updateState(bm);
+      assertEquals(blockGroups, bm.getLowRedundancyECBlockGroupsStat());
+      DFSTestUtil.verifyClientStats(conf, dfsCluster);
+
+
+      // Trigger block group reconstruction
+      BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      BlockManagerTestUtil.updateState(bm);
+
+      // Verify pending reconstruction count
+      assertEquals(blockGroups, getNumberOfBlocksToBeErasureCoded(dfsCluster));
+      assertEquals(0, bm.getLowRedundancyECBlockGroupsStat());
+      DFSTestUtil.verifyClientStats(conf, dfsCluster);
+    } finally {
+      dfsCluster.shutdown();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index 4ad742e..c84f8e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystemTestWrapper;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 
@@ -57,8 +58,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -90,17 +95,23 @@ public class TestNameNodeMetrics {
     new Path("/testNameNodeMetrics");
   private static final String NN_METRICS = "NameNodeActivity";
   private static final String NS_METRICS = "FSNamesystem";
+  private static final int BLOCK_SIZE = 1024 * 1024;
+  private static final ErasureCodingPolicy EC_POLICY =
+      SystemErasureCodingPolicies.getByID(
+          SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
+
   public static final Log LOG = LogFactory.getLog(TestNameNodeMetrics.class);
   
   // Number of datanodes in the cluster
-  private static final int DATANODE_COUNT = 3; 
+  private static final int DATANODE_COUNT = EC_POLICY.getNumDataUnits() +
+      EC_POLICY.getNumParityUnits() + 1;
   private static final int WAIT_GAUGE_VALUE_RETRIES = 20;
   
   // Rollover interval of percentile metrics (in seconds)
   private static final int PERCENTILES_INTERVAL = 1;
 
   static {
-    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
+    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
     CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_REDUNDANCY_INTERVAL);
@@ -109,7 +120,11 @@ public class TestNameNodeMetrics {
     CONF.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, 
         "" + PERCENTILES_INTERVAL);
     // Enable stale DataNodes checking
-    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    CONF.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    // Enable erasure coding
+    CONF.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+        EC_POLICY.getName());
     GenericTestUtils.setLogLevel(LogFactory.getLog(MetricsAsserts.class),
         Level.DEBUG);
   }
@@ -119,18 +134,23 @@ public class TestNameNodeMetrics {
   private final Random rand = new Random();
   private FSNamesystem namesystem;
   private BlockManager bm;
+  private Path ecDir;
 
   private static Path getTestPath(String fileName) {
     return new Path(TEST_ROOT_DIR_PATH, fileName);
   }
-  
+
   @Before
   public void setUp() throws Exception {
-    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
+    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT)
+        .build();
     cluster.waitActive();
     namesystem = cluster.getNamesystem();
     bm = namesystem.getBlockManager();
     fs = cluster.getFileSystem();
+    ecDir = getTestPath("/ec");
+    fs.mkdirs(ecDir);
+    fs.setErasureCodingPolicy(ecDir, EC_POLICY.getName());
   }
   
   @After
@@ -219,49 +239,125 @@ public class TestNameNodeMetrics {
   /** Test metrics associated with addition of a file */
   @Test
   public void testFileAdd() throws Exception {
-    // Add files with 100 blocks
-    final Path file = getTestPath("testFileAdd");
-    createFile(file, 3200, (short)3);
+    // File creations
     final long blockCount = 32;
+    final Path normalFile = getTestPath("testFileAdd");
+    createFile(normalFile, blockCount * BLOCK_SIZE, (short)3);
+    final Path ecFile = new Path(ecDir, "ecFile.log");
+    DFSTestUtil.createStripedFile(cluster, ecFile, null, (int) blockCount, 1,
+        false, EC_POLICY);
+
     int blockCapacity = namesystem.getBlockCapacity();
     assertGauge("BlockCapacity", blockCapacity, getMetrics(NS_METRICS));
 
     MetricsRecordBuilder rb = getMetrics(NN_METRICS);
-    // File create operations is 1
-    // Number of files created is depth of <code>file</code> path
-    assertCounter("CreateFileOps", 1L, rb);
-    assertCounter("FilesCreated", (long)file.depth(), rb);
-
-    long filesTotal = file.depth() + 1; // Add 1 for root
+    // File create operations are 2
+    assertCounter("CreateFileOps", 2L, rb);
+    // Number of files created is depth of normalFile and ecFile, after
+    // removing the duplicate accounting for root test dir.
+    assertCounter("FilesCreated",
+        (long)(normalFile.depth() + ecFile.depth()), rb);
+
+    long filesTotal = normalFile.depth() + ecFile.depth() + 1 /* ecDir */;
     rb = getMetrics(NS_METRICS);
     assertGauge("FilesTotal", filesTotal, rb);
-    assertGauge("BlocksTotal", blockCount, rb);
-    fs.delete(file, true);
+    assertGauge("BlocksTotal", blockCount * 2, rb);
+    fs.delete(normalFile, true);
     filesTotal--; // reduce the filecount for deleted file
 
     rb = waitForDnMetricValue(NS_METRICS, "FilesTotal", filesTotal);
+    assertGauge("BlocksTotal", blockCount, rb);
+    assertGauge("PendingDeletionBlocks", 0L, rb);
+
+    fs.delete(ecFile, true);
+    filesTotal--;
+    rb = waitForDnMetricValue(NS_METRICS, "FilesTotal", filesTotal);
     assertGauge("BlocksTotal", 0L, rb);
     assertGauge("PendingDeletionBlocks", 0L, rb);
 
     rb = getMetrics(NN_METRICS);
-    // Delete file operations and number of files deleted must be 1
+    // Delete file operations and number of files deleted must be 2
-    assertCounter("DeleteFileOps", 1L, rb);
-    assertCounter("FilesDeleted", 1L, rb);
+    assertCounter("DeleteFileOps", 2L, rb);
+    assertCounter("FilesDeleted", 2L, rb);
   }
-  
+
+  /**
+   * Verify low redundancy, pending and corrupt blocks metrics are zero.
+   * @throws Exception
+   */
+  private void verifyZeroMetrics() throws Exception {
+    BlockManagerTestUtil.updateState(bm);
+    MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS,
+        "CorruptBlocks", 0L, 500);
+
+    // Verify aggregated blocks metrics
+    assertGauge("UnderReplicatedBlocks", 0L, rb); // Deprecated metric
+    assertGauge("LowRedundancyBlocks", 0L, rb);
+    assertGauge("PendingReplicationBlocks", 0L, rb); // Deprecated metric
+    assertGauge("PendingReconstructionBlocks", 0L, rb);
+
+    // Verify replica metrics
+    assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
+    assertGauge("CorruptReplicatedBlocks", 0L, rb);
+
+    // Verify striped block groups metrics
+    assertGauge("LowRedundancyECBlockGroups", 0L, rb);
+    assertGauge("CorruptECBlockGroups", 0L, rb);
+  }
+
+  /**
+   * Verify aggregated metrics equal the sum of replicated blocks metrics
+   * and erasure coded blocks metrics.
+   * @throws Exception
+   */
+  private void verifyAggregatedMetricsTally() throws Exception {
+    BlockManagerTestUtil.updateState(bm);
+    assertEquals("Under replicated metrics not matching!",
+        namesystem.getLowRedundancyBlocks(),
+        namesystem.getUnderReplicatedBlocks());
+    assertEquals("Low redundancy metrics not matching!",
+        namesystem.getLowRedundancyBlocks(),
+        namesystem.getLowRedundancyBlocksStat() +
+            namesystem.getLowRedundancyECBlockGroupsStat());
+    assertEquals("Corrupt blocks metrics not matching!",
+        namesystem.getCorruptReplicaBlocks(),
+        namesystem.getCorruptBlocksStat() +
+            namesystem.getCorruptECBlockGroupsStat());
+    assertEquals("Missing blocks metrics not matching!",
+        namesystem.getMissingBlocksCount(),
+        namesystem.getMissingBlocksStat() +
+            namesystem.getMissingECBlockGroupsStat());
+    assertEquals("Missing blocks with replication factor one not matching!",
+        namesystem.getMissingReplOneBlocksCount(),
+        namesystem.getMissingReplicationOneBlocksStat());
+    assertEquals("Bytes in future blocks metrics not matching!",
+        namesystem.getBytesInFuture(),
+        namesystem.getBlocksBytesInFutureStat() +
+            namesystem.getECBlocksBytesInFutureStat());
+    assertEquals("Pending deletion blocks metrics not matching!",
+        namesystem.getPendingDeletionBlocks(),
+        namesystem.getPendingDeletionBlocksStat() +
+            namesystem.getPendingDeletionECBlockGroupsStat());
+  }
+
   /** Corrupt a block and ensure metrics reflects it */
   @Test
   public void testCorruptBlock() throws Exception {
     // Create a file with single block with two replicas
     final Path file = getTestPath("testCorruptBlock");
-    createFile(file, 100, (short)2);
-    
+    final short replicaCount = 2;
+    createFile(file, 100, replicaCount);
+    DFSTestUtil.waitForReplication(fs, file, replicaCount, 15000);
+
     // Disable the heartbeats, so that no corrupted replica
     // can be fixed
     for (DataNode dn : cluster.getDataNodes()) {
       DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
     }
-    
+
+    verifyZeroMetrics();
+    verifyAggregatedMetricsTally();
+
     // Corrupt first replica of the block
     LocatedBlock block = NameNodeAdapter.getBlockLocations(
         cluster.getNameNode(), file.toString(), 0, 1).get(0);
@@ -272,12 +368,50 @@ public class TestNameNodeMetrics {
     } finally {
       cluster.getNamesystem().writeUnlock();
     }
+
+    BlockManagerTestUtil.updateState(bm);
+    MetricsRecordBuilder  rb = waitForDnMetricValue(NS_METRICS,
+        "CorruptBlocks", 1L, 500);
+    // Verify aggregated blocks metrics
+    assertGauge("LowRedundancyBlocks", 1L, rb);
+    assertGauge("PendingReplicationBlocks", 0L, rb);
+    assertGauge("PendingReconstructionBlocks", 0L, rb);
+    // Verify replicated blocks metrics
+    assertGauge("LowRedundancyReplicatedBlocks", 1L, rb);
+    assertGauge("CorruptReplicatedBlocks", 1L, rb);
+    // Verify striped blocks metrics
+    assertGauge("LowRedundancyECBlockGroups", 0L, rb);
+    assertGauge("CorruptECBlockGroups", 0L, rb);
+
+    verifyAggregatedMetricsTally();
+
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+    }
+
+    // Start block reconstruction work
     BlockManagerTestUtil.getComputedDatanodeWork(bm);
-    MetricsRecordBuilder rb = getMetrics(NS_METRICS);
-    assertGauge("CorruptBlocks", 1L, rb);
-    assertGauge("PendingReplicationBlocks", 1L, rb);
-    
+
+    BlockManagerTestUtil.updateState(bm);
+    DFSTestUtil.waitForReplication(fs, file, replicaCount, 30000);
+    rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
+
+    // Verify aggregated blocks metrics
+    assertGauge("LowRedundancyBlocks", 0L, rb);
+    assertGauge("CorruptBlocks", 0L, rb);
+    assertGauge("PendingReplicationBlocks", 0L, rb);
+    assertGauge("PendingReconstructionBlocks", 0L, rb);
+    // Verify replicated blocks metrics
+    assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
+    assertGauge("CorruptReplicatedBlocks", 0L, rb);
+    // Verify striped blocks metrics
+    assertGauge("LowRedundancyECBlockGroups", 0L, rb);
+    assertGauge("CorruptECBlockGroups", 0L, rb);
+
+    verifyAggregatedMetricsTally();
+
     fs.delete(file, true);
+    BlockManagerTestUtil.getComputedDatanodeWork(bm);
     // During the file deletion, both BlockManager#corruptReplicas and
     // BlockManager#pendingReplications will be updated, i.e., the records
     // for the blocks of the deleted file will be removed from both
@@ -287,11 +421,97 @@ public class TestNameNodeMetrics {
     // BlockManager#updateState is called. And in
     // BlockManager#computeDatanodeWork the metric ScheduledReplicationBlocks
     // will also be updated.
-    rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L);
+    BlockManagerTestUtil.updateState(bm);
+    waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
+    verifyZeroMetrics();
+    verifyAggregatedMetricsTally();
+  }
+
+  @Test (timeout = 90000L)
+  public void testStripedFileCorruptBlocks() throws Exception {
+    final long fileLen = BLOCK_SIZE * 4;
+    final Path ecFile = new Path(ecDir, "ecFile.log");
+    DFSTestUtil.createFile(fs, ecFile, fileLen, (short) 1, 0L);
+    StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
+
+    // Disable the heartbeats, so that no corrupted replica
+    // can be fixed
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+
+    verifyZeroMetrics();
+    verifyAggregatedMetricsTally();
+
+    // Mark the first location of the first block group as corrupt
+    LocatedBlocks lbs = fs.getClient().getNamenode().getBlockLocations(
+        ecFile.toString(), 0, fileLen);
+    assert lbs.get(0) instanceof LocatedStripedBlock;
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+
+    cluster.getNamesystem().writeLock();
+    try {
+      bm.findAndMarkBlockAsCorrupt(bg.getBlock(), bg.getLocations()[0],
+          "STORAGE_ID", "TEST");
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+
+    BlockManagerTestUtil.updateState(bm);
+    MetricsRecordBuilder  rb = waitForDnMetricValue(NS_METRICS,
+        "CorruptBlocks", 1L, 500);
+    // Verify aggregated blocks metrics
+    assertGauge("LowRedundancyBlocks", 1L, rb);
     assertGauge("PendingReplicationBlocks", 0L, rb);
-    assertGauge("ScheduledReplicationBlocks", 0L, rb);
+    assertGauge("PendingReconstructionBlocks", 0L, rb);
+    // Verify replica metrics
+    assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
+    assertGauge("CorruptReplicatedBlocks", 0L, rb);
+    // Verify striped block groups metrics
+    assertGauge("LowRedundancyECBlockGroups", 1L, rb);
+    assertGauge("CorruptECBlockGroups", 1L, rb);
+
+    verifyAggregatedMetricsTally();
+
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+    }
+
+    // Start block reconstruction work
+    BlockManagerTestUtil.getComputedDatanodeWork(bm);
+    BlockManagerTestUtil.updateState(bm);
+    StripedFileTestUtil.waitForReconstructionFinished(ecFile, fs, 3);
+
+    rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
+    assertGauge("CorruptBlocks", 0L, rb);
+    assertGauge("PendingReplicationBlocks", 0L, rb);
+    assertGauge("PendingReconstructionBlocks", 0L, rb);
+    // Verify replicated blocks metrics
+    assertGauge("LowRedundancyReplicatedBlocks", 0L, rb);
+    assertGauge("CorruptReplicatedBlocks", 0L, rb);
+    // Verify striped blocks metrics
+    assertGauge("LowRedundancyECBlockGroups", 0L, rb);
+    assertGauge("CorruptECBlockGroups", 0L, rb);
+
+    verifyAggregatedMetricsTally();
+
+    fs.delete(ecFile, true);
+    BlockManagerTestUtil.getComputedDatanodeWork(bm);
+    // During the file deletion, both BlockManager#corruptReplicas and
+    // BlockManager#pendingReplications will be updated, i.e., the records
+    // for the blocks of the deleted file will be removed from both
+    // corruptReplicas and pendingReplications. The corresponding
+    // metrics (CorruptBlocks and PendingReplicationBlocks) will only be updated
+    // when BlockManager#computeDatanodeWork is run where the
+    // BlockManager#updateState is called. And in
+    // BlockManager#computeDatanodeWork the metric ScheduledReplicationBlocks
+    // will also be updated.
+    BlockManagerTestUtil.updateState(bm);
+    waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500);
+    verifyZeroMetrics();
+    verifyAggregatedMetricsTally();
   }
-  
+
   /** Create excess blocks by reducing the replication factor for
    * for a file and ensure metrics reflects it
    */
@@ -340,7 +560,7 @@ public class TestNameNodeMetrics {
   private void waitForDeletion() throws InterruptedException {
     // Wait for more than DATANODE_COUNT replication intervals to ensure all
     // the blocks pending deletion are sent for deletion to the datanodes.
-    Thread.sleep(DFS_REDUNDANCY_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+    Thread.sleep(DFS_REDUNDANCY_INTERVAL * DATANODE_COUNT * 1000);
   }
 
   /**
@@ -358,20 +578,25 @@ public class TestNameNodeMetrics {
    * @throws Exception if something went wrong.
    */
   private MetricsRecordBuilder waitForDnMetricValue(String source,
-                                                    String name,
-                                                    long expected)
-      throws Exception {
+      String name, long expected) throws Exception {
+    // initial wait
+    waitForDeletion();
+    return waitForDnMetricValue(source, name, expected,
+        DFS_REDUNDANCY_INTERVAL * 500);
+  }
+
+  private MetricsRecordBuilder waitForDnMetricValue(String source,
+      String name, long expected, long sleepInterval) throws Exception {
     MetricsRecordBuilder rb;
     long gauge;
-    //initial wait.
-    waitForDeletion();
-    //lots of retries are allowed for slow systems; fast ones will still
-    //exit early
-    int retries = (DATANODE_COUNT + 1) * WAIT_GAUGE_VALUE_RETRIES;
+    // Lots of retries are allowed for slow systems.
+    // Fast ones will still exit early.
+    int retries = DATANODE_COUNT * WAIT_GAUGE_VALUE_RETRIES;
     rb = getMetrics(source);
     gauge = MetricsAsserts.getLongGauge(name, rb);
     while (gauge != expected && (--retries > 0)) {
-      Thread.sleep(DFS_REDUNDANCY_INTERVAL * 500);
+      Thread.sleep(sleepInterval);
+      BlockManagerTestUtil.updateState(bm);
       rb = getMetrics(source);
       gauge = MetricsAsserts.getLongGauge(name, rb);
     }
@@ -516,22 +741,22 @@ public class TestNameNodeMetrics {
         getMetrics(NS_METRICS));
     
     assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
-    assertGauge("LastWrittenTransactionId", 1L, getMetrics(NS_METRICS));
-    assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
-    assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
+    assertGauge("LastWrittenTransactionId", 3L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastCheckpoint", 3L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastLogRoll", 3L, getMetrics(NS_METRICS));
     
     fs.mkdirs(new Path(TEST_ROOT_DIR_PATH, "/tmp"));
     
     assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
-    assertGauge("LastWrittenTransactionId", 2L, getMetrics(NS_METRICS));
-    assertGauge("TransactionsSinceLastCheckpoint", 2L, getMetrics(NS_METRICS));
-    assertGauge("TransactionsSinceLastLogRoll", 2L, getMetrics(NS_METRICS));
+    assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastLogRoll", 4L, getMetrics(NS_METRICS));
     
     cluster.getNameNodeRpc().rollEditLog();
     
     assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS));
-    assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS));
-    assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
+    assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS));
+    assertGauge("TransactionsSinceLastCheckpoint", 6L, getMetrics(NS_METRICS));
     assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
     
     cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
@@ -541,7 +766,7 @@ public class TestNameNodeMetrics {
     long newLastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
         getMetrics(NS_METRICS));
     assertTrue(lastCkptTime < newLastCkptTime);
-    assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS));
+    assertGauge("LastWrittenTransactionId", 8L, getMetrics(NS_METRICS));
     assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
     assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
   }
@@ -554,10 +779,10 @@ public class TestNameNodeMetrics {
   public void testSyncAndBlockReportMetric() throws Exception {
     MetricsRecordBuilder rb = getMetrics(NN_METRICS);
-    // We have one sync when the cluster starts up, just opening the journal
+    // Journal open at startup, plus presumably the two EC ops in setUp()
-    assertCounter("SyncsNumOps", 1L, rb);
+    assertCounter("SyncsNumOps", 3L, rb);
     // Each datanode reports in when the cluster comes up
     assertCounter("BlockReportNumOps",
-                  (long)DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb);
+                  (long) DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb);
     
     // Sleep for an interval+slop to let the percentiles rollover
     Thread.sleep((PERCENTILES_INTERVAL+1)*1000);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message