hadoop-common-commits mailing list archives

From: z..@apache.org
Subject: [24/50] [abbrv] hadoop git commit: HDFS-8005. Erasure Coding: simplify striped block recovery work computation and add tests. Contributed by Jing Zhao.
Date: Mon, 13 Apr 2015 20:11:00 GMT
HDFS-8005. Erasure Coding: simplify striped block recovery work computation and add tests.
Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e05166c6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e05166c6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e05166c6

Branch: refs/heads/HDFS-7285
Commit: e05166c6be6cd89b0260fb33f9680f64ff00e0ac
Parents: f10ef25
Author: Jing Zhao <jing9@apache.org>
Authored: Mon Mar 30 13:35:36 2015 -0700
Committer: Zhe Zhang <zhz@apache.org>
Committed: Mon Apr 13 13:09:54 2015 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    | 134 +++++-------
 .../blockmanagement/DatanodeDescriptor.java     |  14 +-
 .../hadoop/hdfs/server/namenode/INodeFile.java  |   1 +
 .../blockmanagement/TestBlockManager.java       |  33 +--
 .../TestRecoverStripedBlocks.java               | 107 ----------
 .../server/namenode/TestAddStripedBlocks.java   |   2 +-
 .../namenode/TestRecoverStripedBlocks.java      | 210 +++++++++++++++++++
 7 files changed, 290 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
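
In outline, the patch collapses the old two-step computation (choose up to numSourceNodes source nodes, then separately work out which indices of a striped block group are missing) into a single pass that records the indices that are still alive; the missing ones can be derived downstream. A minimal sketch of the new selection loop, using stand-in types rather than the real HDFS internals:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    class SourceSelectionSketch {
      interface Storage { Node node(); short blockIndex(); }
      interface Node { boolean isUsable(); }

      static List<Node> chooseSources(Iterable<Storage> storages,
          boolean isStriped, List<Short> liveBlockIndices, Random rand) {
        List<Node> srcNodes = new ArrayList<>();
        liveBlockIndices.clear();
        for (Storage storage : storages) {
          Node node = storage.node();
          if (!node.isUsable()) { // corrupt, excess, decommissioned, busy
            continue;
          }
          if (isStriped || srcNodes.isEmpty()) {
            // a striped block keeps every live source and its index
            srcNodes.add(node);
            if (isStriped) {
              liveBlockIndices.add(storage.blockIndex());
            }
            continue;
          }
          // a replicated block keeps one source, swapped at random so a
          // node that failed to replicate before is not re-picked forever
          if (rand.nextBoolean()) {
            srcNodes.set(0, node);
          }
        }
        return srcNodes;
      }
    }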


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e05166c6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 47865ec..78ada82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -539,7 +539,7 @@ public class BlockManager {
     // source node returned is not used
     chooseSourceDatanodes(getStoredBlock(block), containingNodes,
         containingLiveReplicasNodes, numReplicas,
-        new LinkedList<Short>(), 1, UnderReplicatedBlocks.LEVEL);
+        new LinkedList<Short>(), UnderReplicatedBlocks.LEVEL);
     
     // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are 
     // not included in the numReplicas.liveReplicas() count
@@ -1376,7 +1376,7 @@ public class BlockManager {
   int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
-    BlockCollection bc = null;
+    BlockCollection bc;
     int additionalReplRequired;
 
     int scheduledWork = 0;
@@ -1404,13 +1404,10 @@ public class BlockManager {
             containingNodes = new ArrayList<>();
             List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
             NumberReplicas numReplicas = new NumberReplicas();
-            List<Short> missingBlockIndices = new LinkedList<>();
-            DatanodeDescriptor[] srcNodes;
-            int numSourceNodes = bc.isStriped() ?
-                HdfsConstants.NUM_DATA_BLOCKS : 1;
-            srcNodes = chooseSourceDatanodes(
-                block, containingNodes, liveReplicaNodes, numReplicas,
-                missingBlockIndices, numSourceNodes, priority);
+            List<Short> liveBlockIndices = new ArrayList<>();
+            final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
+                containingNodes, liveReplicaNodes, numReplicas,
+                liveBlockIndices, priority);
             if(srcNodes == null || srcNodes.length == 0) {
               // block can not be replicated from any node
               LOG.debug("Block " + block + " cannot be recovered " +
@@ -1442,15 +1439,14 @@ public class BlockManager {
             } else {
               additionalReplRequired = 1; // Needed on a new rack
             }
-            if (bc.isStriped()) {
+            if (block.isStriped()) {
+              short[] indices = new short[liveBlockIndices.size()];
+              for (int i = 0; i < liveBlockIndices.size(); i++) {
+                indices[i] = liveBlockIndices.get(i);
+              }
               ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes,
                   containingNodes, liveReplicaNodes, additionalReplRequired,
-                  priority);
-              short[] missingBlockArray = new short[missingBlockIndices.size()];
-              for (int i = 0 ; i < missingBlockIndices.size(); i++) {
-                missingBlockArray[i] = missingBlockIndices.get(i);
-              }
-              ecw.setMissingBlockIndices(missingBlockArray);
+                  priority, indices);
               recovWork.add(ecw);
             } else {
               recovWork.add(new ReplicationWork(block, bc, srcNodes,
@@ -1530,15 +1526,14 @@ public class BlockManager {
           }
 
           // Add block to the to be replicated list
-          if (bc.isStriped()) {
+          if (block.isStriped()) {
             assert rw instanceof ErasureCodingWork;
             assert rw.targets.length > 0;
             rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
                 new ExtendedBlock(namesystem.getBlockPoolId(), block),
                 rw.srcNodes, rw.targets,
-                ((ErasureCodingWork)rw).getMissingBlockIndicies());
-          }
-          else {
+                ((ErasureCodingWork) rw).liveBlockIndicies);
+          } else {
             rw.srcNodes[0].addBlockToBeReplicated(block, targets);
           }
           scheduledWork++;
@@ -1568,9 +1563,9 @@ public class BlockManager {
         DatanodeStorageInfo[] targets = rw.targets;
         if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
-          for (int k = 0; k < targets.length; k++) {
+          for (DatanodeStorageInfo target : targets) {
             targetList.append(' ');
-            targetList.append(targets[k].getDatanodeDescriptor());
+            targetList.append(target.getDatanodeDescriptor());
           }
           blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
               rw.block, targetList);
@@ -1681,11 +1676,8 @@ public class BlockManager {
    * @param numReplicas NumberReplicas instance to be initialized with the
    *                    counts of live, corrupt, excess, and decommissioned
    *                    replicas of the given block.
-   * @param missingBlockIndices List to be populated with indices of missing
-   *                            blocks in a striped block group or missing
-   *                            replicas of a replicated block
-   * @param numSourceNodes integer specifying the number of source nodes to
-   *                       choose
+   * @param liveBlockIndices List to be populated with indices of healthy
+   *                         blocks in a striped block group
    * @param priority integer representing replication priority of the given
    *                 block
    * @return the array of DatanodeDescriptor of the chosen nodes from which to
@@ -1696,24 +1688,20 @@ public class BlockManager {
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeStorageInfo> nodesContainingLiveReplicas,
       NumberReplicas numReplicas,
-      List<Short> missingBlockIndices, int numSourceNodes, int priority) {
+      List<Short> liveBlockIndices, int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
-    LinkedList<DatanodeDescriptor> srcNodes = new LinkedList<>();
+    List<DatanodeDescriptor> srcNodes = new ArrayList<>();
     int live = 0;
     int decommissioned = 0;
     int decommissioning = 0;
     int corrupt = 0;
     int excess = 0;
-    missingBlockIndices.clear();
-    Set<Short> healthyIndices = new HashSet<>();
+    liveBlockIndices.clear();
+    final boolean isStriped = block.isStriped();
 
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
-      if (block.isStriped()) {
-        healthyIndices.add((short) ((BlockInfoStriped) block).
-            getStorageBlockIndex(storage));
-      }
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       LightWeightLinkedSet<BlockInfo> excessBlocks =
         excessReplicateMap.get(node.getDatanodeUuid());
@@ -1752,27 +1740,19 @@ public class BlockManager {
       if(node.isDecommissioned())
         continue;
 
-      // We got this far, current node is a reasonable choice
-      if(srcNodes.size() < numSourceNodes) {
+      if(isStriped || srcNodes.isEmpty()) {
         srcNodes.add(node);
+        if (isStriped) {
+          liveBlockIndices.add((short) ((BlockInfoStriped) block).
+              getStorageBlockIndex(storage));
+        }
         continue;
       }
-      // switch to a different node randomly
+      // for replicated block, switch to a different node randomly
       // this to prevent from deterministically selecting the same node even
       // if the node failed to replicate the block on previous iterations
-      if(DFSUtil.getRandom().nextBoolean()) {
-        int pos = DFSUtil.getRandom().nextInt(numSourceNodes);
-        if(!srcNodes.get(pos).isDecommissionInProgress()) {
-          srcNodes.set(pos, node);
-        }
-      }
-    }
-    if (block.isStriped()) {
-      for (short i = 0; i < HdfsConstants.NUM_DATA_BLOCKS +
-          HdfsConstants.NUM_PARITY_BLOCKS; i++) {
-        if (!healthyIndices.contains(i)) {
-          missingBlockIndices.add(i);
-        }
+      if (!isStriped && DFSUtil.getRandom().nextBoolean()) {
+        srcNodes.set(0, node);
       }
     }
     if(numReplicas != null)
@@ -3833,25 +3813,25 @@ public class BlockManager {
    * to represent a task to recover a block through replication or erasure
    * coding. Recovery is done by transferring data from srcNodes to targets
    */
-  private static class BlockRecoveryWork {
-    protected final BlockInfo block;
-    protected final BlockCollection bc;
+  private abstract static class BlockRecoveryWork {
+    final BlockInfo block;
+    final BlockCollection bc;
 
     /**
      * An erasure coding recovery task has multiple source nodes.
      * A replication task only has 1 source node, stored on top of the array
      */
-    protected final DatanodeDescriptor[] srcNodes;
+    final DatanodeDescriptor[] srcNodes;
     /** Nodes containing the block; avoid them in choosing new targets */
-    protected final List<DatanodeDescriptor> containingNodes;
+    final List<DatanodeDescriptor> containingNodes;
     /** Required by {@link BlockPlacementPolicy#chooseTarget} */
-    protected final List<DatanodeStorageInfo> liveReplicaStorages;
-    protected final int additionalReplRequired;
+    final List<DatanodeStorageInfo> liveReplicaStorages;
+    final int additionalReplRequired;
 
-    protected DatanodeStorageInfo[] targets;
-    protected final int priority;
+    DatanodeStorageInfo[] targets;
+    final int priority;
 
-    public BlockRecoveryWork(BlockInfo block,
+    BlockRecoveryWork(BlockInfo block,
         BlockCollection bc,
         DatanodeDescriptor[] srcNodes,
         List<DatanodeDescriptor> containingNodes,
@@ -3868,15 +3848,13 @@ public class BlockManager {
       this.targets = null;
     }
 
-    protected void chooseTargets(BlockPlacementPolicy blockplacement,
+    abstract void chooseTargets(BlockPlacementPolicy blockplacement,
         BlockStoragePolicySuite storagePolicySuite,
-        Set<Node> excludedNodes) {
-    }
+        Set<Node> excludedNodes);
   }
 
   private static class ReplicationWork extends BlockRecoveryWork {
-
-    public ReplicationWork(BlockInfo block,
+    ReplicationWork(BlockInfo block,
         BlockCollection bc,
         DatanodeDescriptor[] srcNodes,
         List<DatanodeDescriptor> containingNodes,
@@ -3888,7 +3866,8 @@ public class BlockManager {
       LOG.debug("Creating a ReplicationWork to recover " + block);
     }
 
-    protected void chooseTargets(BlockPlacementPolicy blockplacement,
+    @Override
+    void chooseTargets(BlockPlacementPolicy blockplacement,
         BlockStoragePolicySuite storagePolicySuite,
         Set<Node> excludedNodes) {
       assert srcNodes.length > 0
@@ -3905,30 +3884,23 @@ public class BlockManager {
   }
 
   private static class ErasureCodingWork extends BlockRecoveryWork {
+    final short[] liveBlockIndicies;
 
-    private short[] missingBlockIndicies = null;
-
-    public ErasureCodingWork(BlockInfo block,
+    ErasureCodingWork(BlockInfo block,
         BlockCollection bc,
         DatanodeDescriptor[] srcNodes,
         List<DatanodeDescriptor> containingNodes,
         List<DatanodeStorageInfo> liveReplicaStorages,
         int additionalReplRequired,
-        int priority) {
+        int priority, short[] liveBlockIndicies) {
       super(block, bc, srcNodes, containingNodes,
           liveReplicaStorages, additionalReplRequired, priority);
+      this.liveBlockIndicies = liveBlockIndicies;
       LOG.debug("Creating an ErasureCodingWork to recover " + block);
     }
 
-    public short[] getMissingBlockIndicies() {
-      return missingBlockIndicies;
-    }
-
-    public void setMissingBlockIndices(short[] missingBlockIndicies) {
-      this.missingBlockIndicies = missingBlockIndicies;
-    }
-
-    protected void chooseTargets(BlockPlacementPolicy blockplacement,
+    @Override
+    void chooseTargets(BlockPlacementPolicy blockplacement,
         BlockStoragePolicySuite storagePolicySuite,
         Set<Node> excludedNodes) {
       try {

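The structural change to the work classes is easier to see with the HDFS-specific fields stripped away: BlockRecoveryWork becomes an abstract base that forces each subclass to implement chooseTargets, and ErasureCodingWork takes its live block indices as a final constructor argument instead of exposing a setter. A paraphrased skeleton (most fields and parameters elided):

    import java.util.List;

    abstract class RecoveryWorkSketch {
      // each work type must decide its own target placement
      abstract void chooseTargets();
    }

    class EcWorkSketch extends RecoveryWorkSketch {
      final short[] liveBlockIndices; // immutable once the work exists

      EcWorkSketch(List<Short> liveIndices) {
        // the List<Short> built during source selection is flattened
        // into a primitive array, as in computeRecoveryWorkForBlocks
        short[] indices = new short[liveIndices.size()];
        for (int i = 0; i < liveIndices.size(); i++) {
          indices[i] = liveIndices.get(i);
        }
        this.liveBlockIndices = indices;
      }

      @Override
      void chooseTargets() { /* erasure-coding-aware placement */ }
    }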
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e05166c6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 7bc5e7e..15427f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -106,14 +106,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
     public final ExtendedBlock block;
     public final DatanodeDescriptor[] sources;
     public final DatanodeStorageInfo[] targets;
-    public final short[] missingBlockIndices;
+    public final short[] liveBlockIndices;
 
     BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources,
-        DatanodeStorageInfo[] targets, short[] missingBlockIndices) {
+        DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
       this.block = block;
       this.sources = sources;
       this.targets = targets;
-      this.missingBlockIndices = missingBlockIndices;
+      this.liveBlockIndices = liveBlockIndices;
     }
 
     @Override
@@ -122,6 +122,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
           append("Recovering ").append(block).
           append(" From: ").append(Arrays.asList(sources)).
           append(" To: ").append(Arrays.asList(targets)).append(")\n").
+          append(" Block Indices: ").append(Arrays.toString(liveBlockIndices)).
           toString();
     }
   }
@@ -635,10 +636,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * Store block erasure coding work.
    */
   void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
-      DatanodeStorageInfo[] targets, short[] missingBlockIndicies) {
+      DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
     assert(block != null && sources != null && sources.length > 0);
     BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
-        missingBlockIndicies);
+        liveBlockIndices);
     erasurecodeBlocks.offer(task);
    BlockManager.LOG.debug("Adding block recovery task " + task +
        " to " + getName() + ", current queue size is " +
@@ -679,7 +680,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * The number of work items that are pending to be replicated
    */
-  int getNumberOfBlocksToBeErasureCoded() {
+  @VisibleForTesting
+  public int getNumberOfBlocksToBeErasureCoded() {
     return erasurecodeBlocks.size();
   }
 

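To make the queueing side concrete, here is a toy model of how a DataNode descriptor could store and count pending erasure-coding tasks after this change (simplified types, plain strings for blocks; the real DatanodeDescriptor keeps far more state):

    import java.util.Arrays;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class EcTaskQueueSketch {
      static class EcTask {
        final String block;
        final short[] liveBlockIndices;

        EcTask(String block, short[] liveBlockIndices) {
          this.block = block;
          this.liveBlockIndices = liveBlockIndices;
        }

        @Override
        public String toString() {
          // Arrays.toString, not Arrays.asList: a short[] then prints
          // its values instead of an array reference
          return "Recovering " + block
              + " live indices: " + Arrays.toString(liveBlockIndices);
        }
      }

      private final Queue<EcTask> ecBlocks = new ConcurrentLinkedQueue<>();

      void addBlockToBeErasureCoded(String block, short[] liveBlockIndices) {
        ecBlocks.offer(new EcTask(block, liveBlockIndices));
      }

      int getNumberOfBlocksToBeErasureCoded() {
        return ecBlocks.size();
      }
    }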
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e05166c6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index b40022e8..2aa44cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -402,6 +402,7 @@ public class INodeFile extends INodeWithAdditionalFields
 
   /** The same as getFileReplication(null). */
   @Override // INodeFileAttributes
+  // TODO striped
   public final short getFileReplication() {
     return getFileReplication(CURRENT_STATE_ID);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e05166c6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 43f4607..f7504ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -508,30 +508,33 @@ public class TestBlockManager {
             cntNodes,
             liveNodes,
             new NumberReplicas(),
-            new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
+            new ArrayList<Short>(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
 
-    assertNull("Does not choose a source node for a less-than-highest-priority"
-        + " replication since all available source nodes have reached"
-        + " their replication limits.",
+    assertEquals("Does not choose a source node for a less-than-highest-priority"
+            + " replication since all available source nodes have reached"
+            + " their replication limits.", 0,
         bm.chooseSourceDatanodes(
             bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
-            new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)[0]);
+            new ArrayList<Short>(),
+            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length);
 
     // Increase the replication count to test replication count > hard limit
     DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
     origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
 
-    assertNull("Does not choose a source node for a highest-priority"
-        + " replication when all available nodes exceed the hard limit.",
+    assertEquals("Does not choose a source node for a highest-priority"
+            + " replication when all available nodes exceed the hard limit.", 0,
         bm.chooseSourceDatanodes(
             bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
-            new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
+            new ArrayList<Short>(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length);
   }
 
   @Test
@@ -556,26 +559,24 @@ public class TestBlockManager {
             bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
-            new NumberReplicas(), new LinkedList<Short>(), 1,
-            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED));
+            new NumberReplicas(), new LinkedList<Short>(),
+            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]);
 
 
     // Increase the replication count to test replication count > hard limit
     DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
     origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
 
-    assertNull("Does not choose a source decommissioning node for a normal"
-        + " replication when all available nodes exceed the hard limit.",
+    assertEquals("Does not choose a source decommissioning node for a normal"
+        + " replication when all available nodes exceed the hard limit.", 0,
         bm.chooseSourceDatanodes(
             bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
-            new NumberReplicas(), new LinkedList<Short>(), 1,
-            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED));
+            new NumberReplicas(), new LinkedList<Short>(),
+            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length);
   }
 
-
-
   @Test
   public void testSafeModeIBR() throws Exception {
     DatanodeDescriptor node = spy(nodes.get(0));

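The assertion churn above follows from one behavioral detail: when no node qualifies, the refactored chooseSourceDatanodes yields an empty array, so the old assertNull checks become length checks. An illustrative pattern (JUnit 4 assumed on the classpath, stand-in method):

    import static org.junit.Assert.assertEquals;

    class EmptyArraySketch {
      // stand-in for chooseSourceDatanodes: never null, possibly empty
      static String[] chooseSources(boolean anyQualifies) {
        return anyQualifies ? new String[] { "dn1" } : new String[0];
      }

      void demo() {
        assertEquals("no qualifying source yields an empty array",
            0, chooseSources(false).length);
      }
    }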
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e05166c6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
deleted file mode 100644
index d883c9b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
-import static org.junit.Assert.assertTrue;
-
-public class TestRecoverStripedBlocks {
-  private final short GROUP_SIZE =
-      HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
-  private final short NUM_OF_DATANODES = GROUP_SIZE + 1;
-  private Configuration conf;
-  private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
-  private static final int BLOCK_SIZE = 1024;
-  private HdfsAdmin dfsAdmin;
-  private FSNamesystem namesystem;
-  private Path ECFilePath;
-
-  @Before
-  public void setupCluster() throws IOException {
-    conf = new HdfsConfiguration();
-    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    // Large value to make sure the pending replication request can stay in
-    // DatanodeDescriptor.replicateBlocks before test timeout.
-    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
-    // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
-    // chooseUnderReplicatedBlocks at once.
-    conf.setInt(
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
-
-    cluster = new MiniDFSCluster.Builder(conf).
-        numDataNodes(NUM_OF_DATANODES).build();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
-    dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
-    namesystem = cluster.getNamesystem();
-    ECFilePath = new Path("/ecfile");
-    DFSTestUtil.createFile(fs, ECFilePath, 4 * BLOCK_SIZE, GROUP_SIZE, 0);
-    dfsAdmin.setStoragePolicy(ECFilePath, EC_STORAGE_POLICY_NAME);
-  }
-
-  @Test
-  public void testMissingStripedBlock() throws Exception {
-    final BlockManager bm = cluster.getNamesystem().getBlockManager();
-    ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, ECFilePath);
-    Iterator<DatanodeStorageInfo> storageInfos =
-        bm.blocksMap.getStorages(b.getLocalBlock())
-            .iterator();
-
-    DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
-    Iterator<BlockInfo> it = firstDn.getBlockIterator();
-    int missingBlkCnt = 0;
-    while (it.hasNext()) {
-      BlockInfo blk = it.next();
-      BlockManager.LOG.debug("Block " + blk + " will be lost");
-      missingBlkCnt++;
-    }
-    BlockManager.LOG.debug("Missing in total " + missingBlkCnt + " blocks");
-
-    bm.getDatanodeManager().removeDatanode(firstDn);
-
-    bm.computeDatanodeWork();
-
-    short cnt = 0;
-    for (DataNode dn : cluster.getDataNodes()) {
-      DatanodeDescriptor dnDescriptor =
-          bm.getDatanodeManager().getDatanode(dn.getDatanodeUuid());
-      cnt += dnDescriptor.getNumberOfBlocksToBeErasureCoded();
-    }
-
-    assertTrue("Counting the number of outstanding EC tasks", cnt == missingBlkCnt);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e05166c6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 7d7c81e..215a4e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -224,7 +224,7 @@ public class TestAddStripedBlocks {
       int i = 0;
       for (DataNode dn : cluster.getDataNodes()) {
         final Block block = new Block(lastBlock.getBlockId() + i++,
-            lastBlock.getGenerationStamp(), 0);
+            0, lastBlock.getGenerationStamp());
         DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
         storageIDs.add(storage.getStorageID());
         StorageReceivedDeletedBlocks[] reports = DFSTestUtil

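The one-line fix above corrects an argument swap: Block's three-argument constructor takes (blockId, numBytes, generationStamp), the order the new test file below also relies on. A stand-in with the same parameter order makes the swap visible:

    class BlockSketch {
      final long blockId, numBytes, generationStamp;

      BlockSketch(long blockId, long numBytes, long generationStamp) {
        this.blockId = blockId;
        this.numBytes = numBytes;
        this.generationStamp = generationStamp;
      }
    }
    // before: new BlockSketch(id + i, genStamp, 0) sets length to the stamp
    // after:  new BlockSketch(id + i, 0, genStamp) creates a 0-length block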
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e05166c6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
new file mode 100644
index 0000000..b9fd4fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestRecoverStripedBlocks {
+  private final short GROUP_SIZE =
+      NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+  private MiniDFSCluster cluster;
+  private final Path dirPath = new Path("/dir");
+  private Path filePath = new Path(dirPath, "file");
+
+  @Before
+  public void setup() throws IOException {
+    final Configuration conf = new HdfsConfiguration();
+    // Large value to make sure the pending replication request can stay in
+    // DatanodeDescriptor.replicateBlocks before test timeout.
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
+    // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
+    // chooseUnderReplicatedBlocks at once.
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
+        .build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
+      int numBlocks) throws Exception {
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    dfs.mkdirs(dir);
+    dfs.setStoragePolicy(dir, EC_STORAGE_POLICY_NAME);
+
+    FSDataOutputStream out = null;
+    try {
+      out = dfs.create(file, (short) 1); // create an empty file
+
+      FSNamesystem ns = cluster.getNamesystem();
+      FSDirectory fsdir = ns.getFSDirectory();
+      INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+
+      ExtendedBlock previous = null;
+      for (int i = 0; i < numBlocks; i++) {
+        Block newBlock = createBlock(cluster.getDataNodes(), ns,
+            file.toString(), fileNode, dfs.getClient().getClientName(),
+            previous);
+        previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
+      }
+
+      ns.completeFile(file.toString(), dfs.getClient().getClientName(),
+          previous, fileNode.getId());
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+  }
+
+  static Block createBlock(List<DataNode> dataNodes, FSNamesystem ns,
+      String file, INodeFile fileNode, String clientName,
+      ExtendedBlock previous) throws Exception {
+    ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null,
+        null);
+
+    final BlockInfo lastBlock = fileNode.getLastBlock();
+    final int groupSize = fileNode.getBlockReplication();
+    // 1. RECEIVING_BLOCK IBR
+    int i = 0;
+    for (DataNode dn : dataNodes) {
+      if (i < groupSize) {
+        final Block block = new Block(lastBlock.getBlockId() + i++, 0,
+            lastBlock.getGenerationStamp());
+        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+            .makeReportForReceivedBlock(block,
+                ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+        for (StorageReceivedDeletedBlocks report : reports) {
+          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+        }
+      }
+    }
+
+    // 2. RECEIVED_BLOCK IBR
+    i = 0;
+    for (DataNode dn : dataNodes) {
+      if (i < groupSize) {
+        final Block block = new Block(lastBlock.getBlockId() + i++,
+            BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp());
+        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+            .makeReportForReceivedBlock(block,
+                ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+        for (StorageReceivedDeletedBlocks report : reports) {
+          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+        }
+      }
+    }
+
+    lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS);
+    return lastBlock;
+  }
+
+  @Test
+  public void testMissingStripedBlock() throws Exception {
+    final int numBlocks = 4;
+    createECFile(cluster, filePath, dirPath, numBlocks);
+
+    // make sure the file is complete in NN
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(filePath.toString()).asFile();
+    assertFalse(fileNode.isUnderConstruction());
+    assertTrue(fileNode.isWithStripedBlocks());
+    BlockInfo[] blocks = fileNode.getBlocks();
+    assertEquals(numBlocks, blocks.length);
+    for (BlockInfo blk : blocks) {
+      assertTrue(blk.isStriped());
+      assertTrue(blk.isComplete());
+      assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
+      final BlockInfoStriped sb = (BlockInfoStriped) blk;
+      assertEquals(GROUP_SIZE, sb.numNodes());
+    }
+
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
+
+    DatanodeDescriptor secondDn = storageInfos[1].getDatanodeDescriptor();
+    assertEquals(numBlocks, secondDn.numBlocks());
+
+    bm.getDatanodeManager().removeDatanode(secondDn);
+
+    BlockManagerTestUtil.getComputedDatanodeWork(bm);
+
+    // all the recovery work will be scheduled on the last DN
+    DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
+    DatanodeDescriptor last =
+          bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
+    assertEquals("Counting the number of outstanding EC tasks", numBlocks,
+        last.getNumberOfBlocksToBeErasureCoded());
+    List<BlockECRecoveryInfo> recovery = last.getErasureCodeCommand(numBlocks);
+    for (BlockECRecoveryInfo info : recovery) {
+      assertEquals(1, info.targets.length);
+      assertEquals(last, info.targets[0].getDatanodeDescriptor());
+      assertEquals(GROUP_SIZE - 1, info.sources.length);
+      assertEquals(GROUP_SIZE - 1, info.liveBlockIndices.length);
+    }
+  }
+}

