From: jing9@apache.org
To: common-commits@hadoop.apache.org
Message-Id: <090d7ecc36b14698bd78b57570912b42@git.apache.org>
Subject: hadoop git commit: HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped block. Contributed by Walter Su.
Date: Fri, 19 Jun 2015 18:55:59 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 3682e0198 -> 448cb7df6


HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped block. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/448cb7df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/448cb7df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/448cb7df

Branch: refs/heads/HDFS-7285
Commit: 448cb7df676d3c0f5fdc52fbbe736f3b54e519a3
Parents: 3682e01
Author: Jing Zhao <jing9@apache.org>
Authored: Fri Jun 19 11:53:05 2015 -0700
Committer: Jing Zhao <jing9@apache.org>
Committed: Fri Jun 19 11:53:05 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../server/blockmanagement/BlockManager.java    | 151 +++++++++++++++----
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  15 +-
 .../hdfs/server/balancer/TestBalancer.java      |   1 +
 .../hadoop/hdfs/server/mover/TestMover.java     |   1 +
 .../TestAddOverReplicatedStripedBlocks.java     | 116 ++++++++++++++
 6 files changed, 254 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index a710c2e..a12f361 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -311,3 +311,6 @@
 
     HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to
     tolerate datanode failure. (Tsz Wo Nicholas Sze via jing9)
+
+    HDFS-8543. Erasure Coding: processOverReplicatedBlock() handles striped block.
+    (Walter Su via jing9)


http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 48a1b35..2533ca5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -3085,10 +3086,30 @@ public class BlockManager {
         }
       }
     }
-    chooseExcessReplicates(nonExcess, block, replication,
-        addedNode, delNodeHint, placementPolicies.getPolicy(false));
+    chooseExcessReplicates(nonExcess, block, replication, addedNode,
+        delNodeHint);
   }
 
+  private void chooseExcessReplicates(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
+    assert namesystem.hasWriteLock();
+    // first form a rack to datanodes map and
+    BlockCollection bc = getBlockCollection(storedBlock);
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+        bc.getStoragePolicyID());
+    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
+    if (!storedBlock.isStriped()) {
+      chooseExcessReplicasContiguous(bc, nonExcess, storedBlock,
+          replication, addedNode, delNodeHint, excessTypes);
+    } else {
+      chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint,
+          excessTypes);
+    }
+  }
 
   /**
    * We want "replication" replicates for the block, but we now have too many.
@@ -3104,20 +3125,13 @@ public class BlockManager {
    * If no such a node is available,
    * then pick a node with least free space
    */
-  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
-      BlockInfo storedBlock, short replication,
-      DatanodeDescriptor addedNode,
-      DatanodeDescriptor delNodeHint,
-      BlockPlacementPolicy replicator) {
-    assert namesystem.hasWriteLock();
-    // first form a rack to datanodes map and
-    BlockCollection bc = getBlockCollection(storedBlock);
-    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
-        bc.getStoragePolicyID());
-    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
-        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
-
-
+  private void chooseExcessReplicasContiguous(BlockCollection bc,
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint,
+      List<StorageType> excessTypes) {
+    BlockPlacementPolicy replicator = placementPolicies.getPolicy(false);
     final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
     final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
     final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
@@ -3145,28 +3159,101 @@ public class BlockManager {
           moreThanOne, exactlyOne, excessTypes);
       }
       firstOne = false;
-      // adjust rackmap, moreThanOne, and exactlyOne
       replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur);
-      nonExcess.remove(cur);
-      addToExcessReplicate(cur.getDatanodeDescriptor(), storedBlock);
+      processChosenExcessReplica(nonExcess, cur, storedBlock);
+    }
+  }
 
-      //
-      // The 'excessblocks' tracks blocks until we get confirmation
-      // that the datanode has deleted them; the only way we remove them
-      // is when we get a "removeBlock" message.
-      //
-      // The 'invalidate' list is used to inform the datanode the block
-      // should be deleted. Items are removed from the invalidate list
-      // upon giving instructions to the datanodes.
-      //
-      final Block blockToInvalidate = getBlockToInvalidate(storedBlock, cur);
-      addToInvalidates(blockToInvalidate, cur.getDatanodeDescriptor());
-      blockLog.info("BLOCK* chooseExcessReplicates: "
-          +"({}, {}) is added to invalidated blocks set", cur, storedBlock);
+  /**
+   * We want the block group to have every internal block, but we have
+   * redundant internal blocks (which have the same index).
+   * In this method, we delete the redundant internal blocks until only one
+   * is left for each index.
+   *
+   * The block placement policy will make sure that the remaining internal
+   * blocks are spread across racks and will also try hard to pick one with
+   * the least free space.
+   */
+  private void chooseExcessReplicasStriped(BlockCollection bc,
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock,
+      DatanodeDescriptor delNodeHint,
+      List<StorageType> excessTypes) {
+    assert storedBlock instanceof BlockInfoStriped;
+    BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
+    short groupSize = sblk.getTotalBlockNum();
+    if (nonExcess.size() <= groupSize) {
+      return;
+    }
+    BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(true);
+    List<DatanodeStorageInfo> empty = new ArrayList<>(0);
+
+    // find all duplicated indices
+    BitSet found = new BitSet(groupSize); //indices found
+    BitSet duplicated = new BitSet(groupSize); //indices found more than once
+    HashMap<DatanodeStorageInfo, Integer> storage2index = new HashMap<>();
+    for (DatanodeStorageInfo storage : nonExcess) {
+      int index = sblk.getStorageBlockIndex(storage);
+      assert index >= 0;
+      if (found.get(index)) {
+        duplicated.set(index);
+      }
+      found.set(index);
+      storage2index.put(storage, index);
+    }
+
+    // use delHint only if delHint is duplicated
+    final DatanodeStorageInfo delStorageHint =
+        DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
+    if (delStorageHint != null) {
+      Integer index = storage2index.get(delStorageHint);
+      if (index != null && duplicated.get(index)) {
+        processChosenExcessReplica(nonExcess, delStorageHint, storedBlock);
+      }
+    }
+
+    // for each duplicated index, delete some replicas until only one left
+    for (int targetIndex = duplicated.nextSetBit(0); targetIndex >= 0;
+         targetIndex = duplicated.nextSetBit(targetIndex + 1)) {
+      List<DatanodeStorageInfo> candidates = new ArrayList<>();
+      for (DatanodeStorageInfo storage : nonExcess) {
+        int index = storage2index.get(storage);
+        if (index == targetIndex) {
+          candidates.add(storage);
+        }
+      }
+      Block internalBlock = new Block(storedBlock);
+      internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex);
+      while (candidates.size() > 1) {
+        DatanodeStorageInfo target = placementPolicy.chooseReplicaToDelete(bc,
+            internalBlock, (short)1, candidates, empty, excessTypes);
+        processChosenExcessReplica(nonExcess, target, storedBlock);
+        candidates.remove(target);
+      }
+      duplicated.clear(targetIndex);
+    }
+  }
+
+  private void processChosenExcessReplica(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
+    nonExcess.remove(chosen);
+    addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock);
+    //
+    // The 'excessblocks' tracks blocks until we get confirmation
+    // that the datanode has deleted them; the only way we remove them
+    // is when we get a "removeBlock" message.
+    //
+    // The 'invalidate' list is used to inform the datanode the block
+    // should be deleted. Items are removed from the invalidate list
+    // upon giving instructions to the datanodes.
+    //
+    final Block blockToInvalidate = getBlockToInvalidate(storedBlock, chosen);
+    addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
+    blockLog.info("BLOCK* chooseExcessReplicates: "
+        +"({}, {}) is added to invalidated blocks set", chosen, storedBlock);
   }
 
   /** Check if we can use delHint */


http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index b9ded80..7c9eabf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -1968,17 +1969,29 @@ public class DFSTestUtil {
   }
 
   /**
-   * Verify that blocks in striped block group are on different nodes.
+   * Verify that blocks in striped block group are on different nodes, and every
+   * internal block exists.
    */
   public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, int groupSize) {
     for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
       HashSet<DatanodeInfo> locs = new HashSet<>();
       for (DatanodeInfo datanodeInfo : lb.getLocations()) {
        locs.add(datanodeInfo);
       }
       assertEquals(groupSize, lb.getLocations().length);
       assertEquals(groupSize, locs.size());
+
+      // verify that every internal block exists
+      int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
+      assertEquals(groupSize, blockIndices.length);
+      HashSet<Integer> found = new HashSet<>();
+      for (int index : blockIndices) {
+        assert index >= 0;
+        found.add(index);
+      }
+      assertEquals(groupSize, found.size());
     }
   }
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index f6475cd..759eb45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -143,6 +143,7 @@ public class TestBalancer {
 
   static void initConfWithStripe(Configuration conf) {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
     SimulatedFSDataset.setFactory(conf);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
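
A side note on the BlockManager change above: chooseExcessReplicasStriped first maps every reported storage to its internal-block index and uses two BitSets to separate indices seen once from indices seen more than once; only the duplicated indices are then trimmed. The following self-contained sketch illustrates just that detection step under simplified assumptions. It is not Hadoop code: the class and method names (DuplicatedIndexSketch, findDuplicatedIndices) and the plain List<Integer> input are hypothetical stand-ins for the storage-to-index mapping the real method derives from BlockInfoStriped.getStorageBlockIndex().

    import java.util.Arrays;
    import java.util.BitSet;
    import java.util.List;

    public class DuplicatedIndexSketch {

      // Return the set of internal-block indices reported more than once.
      static BitSet findDuplicatedIndices(int groupSize, List<Integer> reportedIndices) {
        BitSet found = new BitSet(groupSize);      // indices seen at least once
        BitSet duplicated = new BitSet(groupSize); // indices seen more than once
        for (int index : reportedIndices) {
          if (found.get(index)) {
            duplicated.set(index);
          }
          found.set(index);
        }
        return duplicated;
      }

      public static void main(String[] args) {
        // A 6+3 group has 9 internal blocks. Index 2 is reported three times
        // and index 6 twice, mirroring the injections in the new test below.
        List<Integer> reported = Arrays.asList(0, 1, 2, 2, 2, 3, 4, 5, 6, 6, 7, 8);
        System.out.println(findDuplicatedIndices(9, reported)); // prints {2, 6}
      }
    }

Once the duplicated indices are known, the patch repeatedly asks the striped placement policy's chooseReplicaToDelete() for a victim among the candidates of each duplicated index until exactly one replica per index remains.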
http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 74f09fd..29e8d24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -379,6 +379,7 @@ public class TestMover {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
     Dispatcher.setBlockMoveWaitTime(3000L);
   }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/448cb7df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
new file mode 100644
index 0000000..eaf3435
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestAddOverReplicatedStripedBlocks {
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final short GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int NUM_STRIPE_PER_BLOCK = 1;
+  private final int BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+  private final int numDNs = GROUP_SIZE + 3;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    SimulatedFSDataset.setFactory(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.mkdirs(dirPath);
+    fs.getClient().createErasureCodingZone(dirPath.toString(), null, CELLSIZE);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testProcessOverReplicatedStripedBlock() throws Exception {
+    // create a file which has exactly one block group on the first GROUP_SIZE DNs
+    long fileLen = DATA_BLK_NUM * BLOCK_SIZE;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+    long gs = bg.getBlock().getGenerationStamp();
+    String bpid = bg.getBlock().getBlockPoolId();
+    long groupId = bg.getBlock().getBlockId();
+    Block blk = new Block(groupId, BLOCK_SIZE, gs);
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      blk.setBlockId(groupId + i);
+      cluster.injectBlocks(i, Arrays.asList(blk), bpid);
+    }
+    cluster.triggerBlockReports();
+
+    // let an internal block be over-replicated with 2 redundant blocks.
+    blk.setBlockId(groupId + 2);
+    cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
+    cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
+    // let an internal block be over-replicated with 1 redundant block.
+    blk.setBlockId(groupId + 6);
+    cluster.injectBlocks(numDNs - 1, Arrays.asList(blk), bpid);
+
+    // update blocksMap
+    cluster.triggerBlockReports();
+    // add to invalidates
+    cluster.triggerHeartbeats();
+    // datanodes delete the block
+    cluster.triggerHeartbeats();
+    // update blocksMap
+    cluster.triggerBlockReports();
+
+    // verify that all internal blocks exist
+    lbs = cluster.getNameNodeRpc().getBlockLocations(
+        filePath.toString(), 0, fileLen);
+    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+  }
+}
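
For context on what the test asserts at the end: the updated DFSTestUtil.verifyLocatedStripedBlocks requires that, once the excess replicas have been invalidated, every internal-block index 0..GROUP_SIZE-1 is reported exactly once. A minimal sketch of that invariant, independent of the MiniDFSCluster machinery (class and method names here are hypothetical, not part of the patch):

    import java.util.HashSet;
    import java.util.Set;

    public class StripedGroupCheckSketch {

      // True iff indices 0..groupSize-1 are each reported exactly once.
      static boolean hasEveryInternalBlockOnce(int groupSize, int[] blockIndices) {
        if (blockIndices.length != groupSize) {
          return false;                     // wrong number of reported locations
        }
        Set<Integer> found = new HashSet<>();
        for (int index : blockIndices) {
          if (index < 0 || index >= groupSize || !found.add(index)) {
            return false;                   // out-of-range or duplicated index
          }
        }
        return true;                        // groupSize distinct, in-range indices
      }

      public static void main(String[] args) {
        System.out.println(
            hasEveryInternalBlockOnce(9, new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8})); // true
        System.out.println(
            hasEveryInternalBlockOnce(9, new int[] {0, 1, 2, 2, 4, 5, 6, 7, 8})); // false
      }
    }

The new test can normally be run in isolation with Surefire's test filter (for example, mvn -Dtest=TestAddOverReplicatedStripedBlocks test from hadoop-hdfs-project/hadoop-hdfs), assuming the usual Hadoop build prerequisites are in place.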