hadoop-common-commits mailing list archives

From ji...@apache.org
Subject hadoop git commit: HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped block. Contributed by Walter Su.
Date Fri, 19 Jun 2015 18:55:59 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 3682e0198 -> 448cb7df6


HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped block. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/448cb7df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/448cb7df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/448cb7df

Branch: refs/heads/HDFS-7285
Commit: 448cb7df676d3c0f5fdc52fbbe736f3b54e519a3
Parents: 3682e01
Author: Jing Zhao <jing9@apache.org>
Authored: Fri Jun 19 11:53:05 2015 -0700
Committer: Jing Zhao <jing9@apache.org>
Committed: Fri Jun 19 11:53:05 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../server/blockmanagement/BlockManager.java    | 151 +++++++++++++++----
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  15 +-
 .../hdfs/server/balancer/TestBalancer.java      |   1 +
 .../hadoop/hdfs/server/mover/TestMover.java     |   1 +
 .../TestAddOverReplicatedStripedBlocks.java     | 116 ++++++++++++++
 6 files changed, 254 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index a710c2e..a12f361 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -311,3 +311,6 @@
 
     HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to
     tolerate datanode failure. (Tsz Wo Nicholas Sze via jing9)
+
+    HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped block.
+    (Walter Su via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 48a1b35..2533ca5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -3085,10 +3086,30 @@ public class BlockManager {
         }
       }
     }
-    chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint, placementPolicies.getPolicy(false));
+    chooseExcessReplicates(nonExcess, block, replication, addedNode,
+        delNodeHint);
   }
 
+  private void chooseExcessReplicates(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
+    assert namesystem.hasWriteLock();
+    // first determine the block's storage policy and the excess storage types
+    BlockCollection bc = getBlockCollection(storedBlock);
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+        bc.getStoragePolicyID());
+    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
+    if (!storedBlock.isStriped()) {
+      chooseExcessReplicasContiguous(bc, nonExcess, storedBlock,
+          replication, addedNode, delNodeHint, excessTypes);
+    } else {
+      chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint,
+          excessTypes);
+    }
+  }
 
   /**
   * We want "replication" replicas for the block, but we now have too many.  
@@ -3104,20 +3125,13 @@ public class BlockManager {
   * If no such node is available,
   * then pick a node with the least free space
    */
-  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
-                              BlockInfo storedBlock, short replication,
-                              DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint,
-                              BlockPlacementPolicy replicator) {
-    assert namesystem.hasWriteLock();
-    // first form a rack to datanodes map and
-    BlockCollection bc = getBlockCollection(storedBlock);
-    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
-        bc.getStoragePolicyID());
-    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
-        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
-
-
+  private void chooseExcessReplicasContiguous(BlockCollection bc,
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint,
+      List<StorageType> excessTypes) {
+    BlockPlacementPolicy replicator = placementPolicies.getPolicy(false);
     final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
     final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
     final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
@@ -3145,28 +3159,101 @@ public class BlockManager {
             moreThanOne, exactlyOne, excessTypes);
       }
       firstOne = false;
-
       // adjust rackmap, moreThanOne, and exactlyOne
       replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
           exactlyOne, cur);
 
-      nonExcess.remove(cur);
-      addToExcessReplicate(cur.getDatanodeDescriptor(), storedBlock);
+      processChosenExcessReplica(nonExcess, cur, storedBlock);
+    }
+  }
 
-      //
-      // The 'excessblocks' tracks blocks until we get confirmation
-      // that the datanode has deleted them; the only way we remove them
-      // is when we get a "removeBlock" message.  
-      //
-      // The 'invalidate' list is used to inform the datanode the block 
-      // should be deleted.  Items are removed from the invalidate list
-      // upon giving instructions to the datanodes.
-      //
-      final Block blockToInvalidate = getBlockToInvalidate(storedBlock, cur);
-      addToInvalidates(blockToInvalidate, cur.getDatanodeDescriptor());
-      blockLog.info("BLOCK* chooseExcessReplicates: "
-                +"({}, {}) is added to invalidated blocks set", cur, storedBlock);
+  /**
+   * We want the block group to have every internal block, but we have
+   * redundant internal blocks (i.e., blocks sharing the same index).
+   * In this method, we delete the redundant internal blocks until only one
+   * is left for each index.
+   *
+   * The block placement policy makes sure that the remaining internal blocks
+   * are spread across racks, and also tries hard to delete the one on the
+   * node with the least free space.
+   */
+  private void chooseExcessReplicasStriped(BlockCollection bc,
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock,
+      DatanodeDescriptor delNodeHint,
+      List<StorageType> excessTypes) {
+    assert storedBlock instanceof BlockInfoStriped;
+    BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
+    short groupSize = sblk.getTotalBlockNum();
+    if (nonExcess.size() <= groupSize) {
+      return;
+    }
+    BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(true);
+    List<DatanodeStorageInfo> empty = new ArrayList<>(0);
+
+    // find all duplicated indices
+    BitSet found = new BitSet(groupSize); // indices found
+    BitSet duplicated = new BitSet(groupSize); // indices found more than once
+    HashMap<DatanodeStorageInfo, Integer> storage2index = new HashMap<>();
+    for (DatanodeStorageInfo storage : nonExcess) {
+      int index = sblk.getStorageBlockIndex(storage);
+      assert index >= 0;
+      if (found.get(index)) {
+        duplicated.set(index);
+      }
+      found.set(index);
+      storage2index.put(storage, index);
     }
+
+    // use delHint only if delHint is duplicated
+    final DatanodeStorageInfo delStorageHint =
+        DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
+    if (delStorageHint != null) {
+      Integer index = storage2index.get(delStorageHint);
+      if (index != null && duplicated.get(index)) {
+        processChosenExcessReplica(nonExcess, delStorageHint, storedBlock);
+      }
+    }
+
+    // for each duplicated index, delete some replicas until only one left
+    for (int targetIndex = duplicated.nextSetBit(0); targetIndex >= 0;
+         targetIndex = duplicated.nextSetBit(targetIndex + 1)) {
+      List<DatanodeStorageInfo> candidates = new ArrayList<>();
+      for (DatanodeStorageInfo storage : nonExcess) {
+        int index = storage2index.get(storage);
+        if (index == targetIndex) {
+          candidates.add(storage);
+        }
+      }
+      Block internalBlock = new Block(storedBlock);
+      internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex);
+      while (candidates.size() > 1) {
+        DatanodeStorageInfo target = placementPolicy.chooseReplicaToDelete(bc,
+            internalBlock, (short)1, candidates, empty, excessTypes);
+        processChosenExcessReplica(nonExcess, target, storedBlock);
+        candidates.remove(target);
+      }
+      duplicated.clear(targetIndex);
+    }
+  }
+
+  private void processChosenExcessReplica(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
+    nonExcess.remove(chosen);
+    addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock);
+    //
+    // The 'excessblocks' tracks blocks until we get confirmation
+    // that the datanode has deleted them; the only way we remove them
+    // is when we get a "removeBlock" message.
+    //
+    // The 'invalidate' list is used to inform the datanode the block
+    // should be deleted.  Items are removed from the invalidate list
+    // upon giving instructions to the datanodes.
+    //
+    final Block blockToInvalidate = getBlockToInvalidate(storedBlock, chosen);
+    addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
+    blockLog.info("BLOCK* chooseExcessReplicates: "
+        + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
   }
 
   /** Check if we can use delHint */
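
For readers skimming the patch: the striped path above boils down to two
passes over the reporting storages, one to mark internal-block indices that
appear more than once, and one to trim each duplicated index down to a single
copy. A minimal, self-contained sketch of that bookkeeping, with plain
java.util types standing in for Hadoop's DatanodeStorageInfo and
BlockPlacementPolicy (the storage names and the drop-the-last-candidate rule
are illustrative assumptions, not the real placement policy):

import java.util.ArrayList;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DuplicateIndexSketch {
  public static void main(String[] args) {
    // Hypothetical report: each storage holds one internal block of a
    // 9-wide striped group (6 data + 3 parity, as on this branch).
    Map<String, Integer> storageToIndex = new LinkedHashMap<>();
    storageToIndex.put("dn0", 0);
    storageToIndex.put("dn1", 1);
    storageToIndex.put("dn2", 2);
    storageToIndex.put("dn3", 2); // redundant copy of index 2
    storageToIndex.put("dn4", 2); // redundant copy of index 2

    int groupSize = 9;
    BitSet found = new BitSet(groupSize);      // indices seen at least once
    BitSet duplicated = new BitSet(groupSize); // indices seen more than once
    for (int index : storageToIndex.values()) {
      if (found.get(index)) {
        duplicated.set(index);
      }
      found.set(index);
    }

    // For each duplicated index, delete candidates until one copy remains,
    // mirroring the loop in chooseExcessReplicasStriped. The real code asks
    // BlockPlacementPolicy.chooseReplicaToDelete; here we simply drop the
    // last-listed candidate.
    for (int target = duplicated.nextSetBit(0); target >= 0;
         target = duplicated.nextSetBit(target + 1)) {
      List<String> candidates = new ArrayList<>();
      for (Map.Entry<String, Integer> e : storageToIndex.entrySet()) {
        if (e.getValue() == target) {
          candidates.add(e.getKey());
        }
      }
      while (candidates.size() > 1) {
        String chosen = candidates.remove(candidates.size() - 1);
        System.out.println("invalidate internal block " + target
            + " on " + chosen);
      }
    }
  }
}

The BitSet pair keeps the duplicate scan linear in the number of reports, and
deferring the actual victim choice to the placement policy is what lets the
striped path share the rack-spreading behavior described in the javadoc.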

http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index b9ded80..7c9eabf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -1968,17 +1969,29 @@ public class DFSTestUtil {
   }
 
   /**
-   * Verify that blocks in striped block group are on different nodes.
+   * Verify that the blocks in a striped block group are on different nodes,
+   * and that every internal block exists.
    */
   public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
        int groupSize) {
     for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
       HashSet<DatanodeInfo> locs = new HashSet<>();
       for (DatanodeInfo datanodeInfo : lb.getLocations()) {
         locs.add(datanodeInfo);
       }
       assertEquals(groupSize, lb.getLocations().length);
       assertEquals(groupSize, locs.size());
+
+      // verify that every internal block exists
+      int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
+      assertEquals(groupSize, blockIndices.length);
+      HashSet<Integer> found = new HashSet<>();
+      for (int index : blockIndices) {
+        assert index >= 0;
+        found.add(index);
+      }
+      assertEquals(groupSize, found.size());
     }
   }
 }
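
The strengthened helper does more than count locations: asserting that the
number of distinct block indices equals the group size distinguishes
"groupSize storages" from "groupSize distinct internal blocks". A small sketch
of the regression the new assertions catch (the index array is a fabricated
failing example, not output from the test):

import java.util.HashSet;

public class IndexCoverageSketch {
  public static void main(String[] args) {
    int groupSize = 9;
    // A report where index 2 survived twice and index 6 was lost entirely:
    int[] blockIndices = {0, 1, 2, 2, 3, 4, 5, 7, 8};

    HashSet<Integer> found = new HashSet<>();
    for (int index : blockIndices) {
      found.add(index);
    }
    // The length check alone passes (9 entries), but the distinct-count
    // check fails (8 distinct indices), flagging the missing internal block.
    System.out.println(blockIndices.length == groupSize); // true
    System.out.println(found.size() == groupSize);        // false
  }
}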

http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index f6475cd..759eb45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -143,6 +143,7 @@ public class TestBalancer {
 
   static void initConfWithStripe(Configuration conf) {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
     SimulatedFSDataset.setFactory(conf);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 74f09fd..29e8d24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -379,6 +379,7 @@ public class TestMover {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
     Dispatcher.setBlockMoveWaitTime(3000L);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
new file mode 100644
index 0000000..eaf3435
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestAddOverReplicatedStripedBlocks {
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final short GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int NUM_STRIPE_PER_BLOCK = 1;
+  private final int BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+  private final int numDNs = GROUP_SIZE + 3;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    SimulatedFSDataset.setFactory(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.mkdirs(dirPath);
+    fs.getClient().createErasureCodingZone(dirPath.toString(), null, CELLSIZE);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testProcessOverReplicatedStripedBlock() throws Exception {
+    // create a file with exactly one block group on the first GROUP_SIZE DNs
+    long fileLen = DATA_BLK_NUM * BLOCK_SIZE;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      blk.setBlockId(groupId + i);
+      cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+    }
+    cluster.triggerBlockReports();
+
+    // let an internal block be over-replicated with 2 redundant blocks.
+    blk.setBlockId(groupId + 2);
+    cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
+    cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
+    // let an internal block be over-replicated with 1 redundant block.
+    blk.setBlockId(groupId + 6);
+    cluster.injectBlocks(numDNs - 1, Arrays.asList(blk), bpid);
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanode delete block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // verify that all internal blocks exist
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+  }
+}
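
For orientation, the test's blk.setBlockId(groupId + i) calls rely on the
striped-block ID convention also used by the patch itself (see
internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex) above):
internal block i of a group carries the group's base ID plus i, so
over-replication is simulated simply by injecting the same internal ID on
extra datanodes. A sketch of the injection pattern (the base ID is an
illustrative assumption):

import java.util.Arrays;

public class InjectionSketch {
  public static void main(String[] args) {
    long groupId = 0x1000;   // hypothetical base ID of the block group
    int groupSize = 9;       // GROUP_SIZE = 6 data + 3 parity
    int numDNs = groupSize + 3;

    // dnHolds[d] is the internal-block ID injected on datanode d, or -1.
    long[] dnHolds = new long[numDNs];
    Arrays.fill(dnHolds, -1);

    // one healthy copy of every internal block on the first GROUP_SIZE DNs
    for (int i = 0; i < groupSize; i++) {
      dnHolds[i] = groupId + i;        // internal ID = group ID + index
    }
    // two redundant copies of index 2 and one of index 6, as in the test
    dnHolds[numDNs - 3] = groupId + 2;
    dnHolds[numDNs - 2] = groupId + 2;
    dnHolds[numDNs - 1] = groupId + 6;

    // count copies per index: indices 2 and 6 come out over-replicated
    int[] copies = new int[groupSize];
    for (long id : dnHolds) {
      if (id >= 0) {
        copies[(int) (id - groupId)]++;
      }
    }
    System.out.println(Arrays.toString(copies)); // [1, 1, 3, 1, 1, 1, 2, 1, 1]
  }
}

After the triggered block reports and heartbeats, the expectation is that the
copy counts collapse back to one per index, which is exactly what
DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE) asserts.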

