From: lei@apache.org
To: common-commits@hadoop.apache.org
Message-Id: <509bbd66268442a0a70152999bc4693f@git.apache.org>
Subject: hadoop git commit: HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain. (Ming Ma via lei)
Date: Tue, 13 Oct 2015 03:07:46 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 eed61a5ed -> 218aa7bba


HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain. (Ming Ma via lei)

(cherry picked from commit 0f5f9846edab3ea7e80f35000072136f998bcd46)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/218aa7bb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/218aa7bb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/218aa7bb

Branch: refs/heads/branch-2
Commit: 218aa7bba179a19500681c8352ac92639867a7fa
Parents: eed61a5
Author: Lei Xu <lei@apache.org>
Authored: Mon Oct 12 16:23:42 2015 -0700
Committer: Lei Xu <lei@apache.org>
Committed: Mon Oct 12 16:24:46 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../BlockPlacementPolicyDefault.java            |  21 +-
 .../BlockPlacementPolicyWithUpgradeDomain.java  | 264 ++++++++++++++
 .../BlockPlacementStatusWithUpgradeDomain.java  |  88 +++++
 .../src/main/resources/hdfs-default.xml         |  14 +
 .../BaseReplicationPolicyTest.java              |   1 -
 .../blockmanagement/TestReplicationPolicy.java  |  35 +-
 .../TestReplicationPolicyConsiderLoad.java      |   5 +-
 .../TestReplicationPolicyWithNodeGroup.java     |  36 +-
 .../TestReplicationPolicyWithUpgradeDomain.java | 353 +++++++++++++++++++
 11 files changed, 772 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
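[Editor's note: for readers who want to try the new policy, here is a minimal sketch, not part of this commit, of switching the NameNode to it programmatically. It assumes the long-standing "dfs.block.replicator.classname" placement-policy key is available on this branch; the factor value 3 is an arbitrary example.]

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  public class UpgradeDomainPolicySetup {
    public static Configuration newConf() {
      Configuration conf = new Configuration();
      // Assumed key: "dfs.block.replicator.classname" selects the
      // NameNode's block placement policy implementation.
      conf.set("dfs.block.replicator.classname",
          "org.apache.hadoop.hdfs.server.blockmanagement."
              + "BlockPlacementPolicyWithUpgradeDomain");
      // Number of distinct upgrade domains a block's replicas should span;
      // defaults to dfs.replication (see the hdfs-default.xml hunk below).
      conf.setInt(DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR, 3);
      return conf;
    }
  }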
http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b9e9e7d..3edfd5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -688,6 +688,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8988. Use LightWeightHashSet instead of LightWeightLinkedSet in
     BlockManager#excessReplicateMap. (yliu)
 
+    HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain.
+    (Ming Ma via lei)
+
     OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 7821e5a..9005a65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -326,6 +326,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_NAMENODE_MAX_XATTR_SIZE_DEFAULT = 16384;
   public static final int DFS_NAMENODE_MAX_XATTR_SIZE_HARD_LIMIT = 32768;
 
+  public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
+  public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
 
   //Following keys have no defaults
   public static final String DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index f761150..ad399d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -125,14 +125,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
     Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
-        new HashSet<Node>() : new HashSet<Node>(excludedNodes);
+        new HashSet<Node>() : new HashSet<>(excludedNodes);
     final List<StorageType> requiredStorageTypes = storagePolicy
         .chooseStorageTypes((short)numOfReplicas);
     final EnumMap<StorageType, Integer> storageTypes =
         getRequiredStorageTypes(requiredStorageTypes);
 
     // Choose favored nodes
-    List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> results = new ArrayList<>();
     boolean avoidStaleNodes = stats != null
         && stats.isAvoidingStaleDataNodesForWrite();
@@ -192,14 +192,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
 
     if (excludedNodes == null) {
-      excludedNodes = new HashSet<Node>();
+      excludedNodes = new HashSet<>();
     }
 
     int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
     numOfReplicas = result[0];
     int maxNodesPerRack = result[1];
 
-    final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
+    final List<DatanodeStorageInfo> results = new ArrayList<>(chosenStorage);
     for (DatanodeStorageInfo storage : chosenStorage) {
       // add localMachine and related nodes to excludedNodes
       addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
@@ -266,8 +266,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   private EnumMap<StorageType, Integer> getRequiredStorageTypes(
       List<StorageType> types) {
-    EnumMap<StorageType, Integer> map = new EnumMap<StorageType,
-        Integer>(StorageType.class);
+    EnumMap<StorageType, Integer> map = new EnumMap<>(StorageType.class);
     for (StorageType type : types) {
       if (!map.containsKey(type)) {
         map.put(type, 1);
@@ -310,7 +309,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
 
     // Keep a copy of original excludedNodes
-    final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
+    final Set<Node> oldExcludedNodes = new HashSet<>(excludedNodes);
 
     // choose storage types; use fallbacks for unavailable storages
     final List<StorageType> requiredStorageTypes = storagePolicy
@@ -929,11 +928,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * So pick up first set if not empty. If first is empty, then pick second.
    */
   protected Collection<DatanodeStorageInfo> pickupReplicaSet(
-      Collection<DatanodeStorageInfo> first,
-      Collection<DatanodeStorageInfo> second) {
-    return first.isEmpty() ? second : first;
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne) {
+    return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
   }
-  
+
   @VisibleForTesting
   void setPreferLocalNode(boolean prefer) {
     this.preferLocalNode = prefer;
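[Editor's note: the parameter rename above exposes the contract the subclass relies on: "moreThanOne" holds replicas whose rack hosts more than one replica of the block, "exactlyOne" holds replicas that are alone on their rack. A standalone sketch of that partition, with plain strings standing in for DatanodeStorageInfo and a hypothetical rackOf map; this is an illustration, not the Hadoop API.]

  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  // Sketch: split replicas by rack occupancy. The default deletion policy
  // prefers "moreThanOne" so removing a replica never empties a rack.
  class RackPartitionSketch {
    static void splitByRack(Collection<String> replicas,
        Map<String, String> rackOf,
        List<String> moreThanOne, List<String> exactlyOne) {
      Map<String, List<String>> byRack = new HashMap<>();
      for (String r : replicas) {
        List<String> group = byRack.get(rackOf.get(r));
        if (group == null) {
          group = new ArrayList<>();
          byRack.put(rackOf.get(r), group);
        }
        group.add(r);
      }
      for (List<String> group : byRack.values()) {
        (group.size() > 1 ? moreThanOne : exactlyOne).addAll(group);
      }
    }
  }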
http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
new file mode 100644
index 0000000..71c02b8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+
+/**
+ * The class is responsible for choosing the desired number of targets
+ * for placing block replicas in a way that honors the upgrade domain policy.
+ * The replica placement strategy is: if the writer is on a datanode,
+ * the 1st replica is placed on the local machine, otherwise on a random
+ * datanode. The 2nd replica is placed on a datanode that is on a different
+ * rack. The 3rd replica is placed on a different node in the same rack as
+ * the second replica. All 3 replicas have unique upgrade domains.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockPlacementPolicyWithUpgradeDomain extends
+    BlockPlacementPolicyDefault {
+
+  private int upgradeDomainFactor;
+
+  @Override
+  public void initialize(Configuration conf, FSClusterStats stats,
+      NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
+    super.initialize(conf, stats, clusterMap, host2datanodeMap);
+    upgradeDomainFactor = conf.getInt(
+        DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR,
+        DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT);
+  }
+
+  @Override
+  protected boolean isGoodDatanode(DatanodeDescriptor node,
+      int maxTargetPerRack, boolean considerLoad,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes) {
+    boolean isGoodTarget = super.isGoodDatanode(node,
+        maxTargetPerRack, considerLoad, results, avoidStaleNodes);
+    if (isGoodTarget) {
+      if (results.size() > 0 && results.size() < upgradeDomainFactor) {
+        // Each node in "results" has a different upgrade domain. Make sure
+        // the candidate node introduces a new upgrade domain.
+        Set<String> upgradeDomains = getUpgradeDomains(results);
+        if (upgradeDomains.contains(node.getUpgradeDomain())) {
+          isGoodTarget = false;
+        }
+      }
+    }
+    return isGoodTarget;
+  }
+
+  // If the upgrade domain isn't specified, use the node's XferAddr as its
+  // upgrade domain. Such a fallback is useful to test the scenario where
+  // the upgrade domain isn't defined but the block placement policy is set
+  // to the upgrade domain policy.
+  public String getUpgradeDomainWithDefaultValue(DatanodeInfo datanodeInfo) {
+    String upgradeDomain = datanodeInfo.getUpgradeDomain();
+    if (upgradeDomain == null) {
+      LOG.warn("Upgrade domain isn't defined for " + datanodeInfo);
+      upgradeDomain = datanodeInfo.getXferAddr();
+    }
+    return upgradeDomain;
+  }
+
+  private String getUpgradeDomain(DatanodeStorageInfo storage) {
+    return getUpgradeDomainWithDefaultValue(storage.getDatanodeDescriptor());
+  }
+
+  private Set<String> getUpgradeDomains(List<DatanodeStorageInfo> results) {
+    Set<String> upgradeDomains = new HashSet<>();
+    if (results == null) {
+      return upgradeDomains;
+    }
+    for(DatanodeStorageInfo storageInfo : results) {
+      upgradeDomains.add(getUpgradeDomain(storageInfo));
+    }
+    return upgradeDomains;
+  }
+
+  private Set<String> getUpgradeDomainsFromNodes(DatanodeInfo[] nodes) {
+    Set<String> upgradeDomains = new HashSet<>();
+    if (nodes == null) {
+      return upgradeDomains;
+    }
+    for(DatanodeInfo node : nodes) {
+      upgradeDomains.add(getUpgradeDomainWithDefaultValue(node));
+    }
+    return upgradeDomains;
+  }
+
+  private Map<String, List<DatanodeStorageInfo>> getUpgradeDomainMap(
+      DatanodeStorageInfo[] storageInfos) {
+    Map<String, List<DatanodeStorageInfo>> upgradeDomainMap = new HashMap<>();
+    for(DatanodeStorageInfo storage : storageInfos) {
+      String upgradeDomain = getUpgradeDomainWithDefaultValue(
+          storage.getDatanodeDescriptor());
+      List<DatanodeStorageInfo> storages = upgradeDomainMap.get(upgradeDomain);
+      if (storages == null) {
+        storages = new ArrayList<>();
+        upgradeDomainMap.put(upgradeDomain, storages);
+      }
+      storages.add(storage);
+    }
+    return upgradeDomainMap;
+  }
+
+  @Override
+  public BlockPlacementStatus verifyBlockPlacement(String srcPath,
+      LocatedBlock lBlk, int numberOfReplicas) {
+    BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(srcPath,
+        lBlk, numberOfReplicas);
+    BlockPlacementStatusWithUpgradeDomain upgradeDomainStatus =
+        new BlockPlacementStatusWithUpgradeDomain(defaultStatus,
+            getUpgradeDomainsFromNodes(lBlk.getLocations()),
+            numberOfReplicas, upgradeDomainFactor);
+    return upgradeDomainStatus;
+  }
+
+  private List<DatanodeStorageInfo> getShareUDSet(
+      Map<String, List<DatanodeStorageInfo>> upgradeDomains) {
+    List<DatanodeStorageInfo> getShareUDSet = new ArrayList<>();
+    for (Map.Entry<String, List<DatanodeStorageInfo>> e :
+        upgradeDomains.entrySet()) {
+      if (e.getValue().size() > 1) {
+        getShareUDSet.addAll(e.getValue());
+      }
+    }
+    return getShareUDSet;
+  }
+
+  /*
+   * The policy to pick the replica set for deleting the over-replicated
+   * replica while meeting the rack and upgrade domain requirements.
+   * The algorithm:
+   * a. Each replica has a boolean attribute "shareRack" that defines
+   *    whether it shares its rack with another replica of the same block.
+   * b. Each replica has another boolean attribute "shareUD" that defines
+   *    whether it shares its upgrade domain with another replica of the
+   *    same block.
+   * c. Partition the replicas into 4 sets (some might be empty.):
+   *    shareRackAndUDSet: {shareRack==true, shareUD==true}
+   *    shareUDNotRackSet: {shareRack==false, shareUD==true}
+   *    shareRackNotUDSet: {shareRack==true, shareUD==false}
+   *    NoShareRackOrUDSet: {shareRack==false, shareUD==false}
+   * d. Pick the first non-empty replica set in the following order:
+   *    shareRackAndUDSet, shareUDNotRackSet, shareRackNotUDSet,
+   *    NoShareRackOrUDSet
+   * e. Prove this won't degrade the existing rack-based data
+   *    availability model under different scenarios.
+   *    1. shareRackAndUDSet isn't empty. Removing a node
+   *       from shareRackAndUDSet won't change # of racks and # of UD.
+   *    The following cases cover the scenarios where shareRackAndUDSet is
+   *    empty.
+   *    2. shareUDNotRackSet isn't empty and shareRackNotUDSet isn't empty.
+   *       Let us prove that # of racks >= 3 before the deletion and thus
+   *       after deletion # of racks >= 2.
+   *       Given shareRackAndUDSet is empty, there won't be overlap between
+   *       shareUDNotRackSet and shareRackNotUDSet. It means DNs in
+   *       shareRackNotUDSet should be on at least one rack
+   *       different from any DN's rack in shareUDNotRackSet.
+   *       Given shareUDNotRackSet.size() >= 2 and each DN in the set
+   *       doesn't share a rack with any other DNs, there are at least 2
+   *       racks coming from shareUDNotRackSet.
+   *       Thus the # of racks from DNs in {shareUDNotRackSet,
+   *       shareRackNotUDSet} >= 3. Removing a node from shareUDNotRackSet
+   *       will reduce the # of racks by 1 and won't change the # of
+   *       upgrade domains.
+   *       Note that this is different from BlockPlacementPolicyDefault,
+   *       which keeps the # of racks unchanged after deletion. With the
+   *       upgrade domain policy, given the # of racks is still >= 2 after
+   *       deletion, the data availability model remains the same as
+   *       BlockPlacementPolicyDefault (only supports one rack failure).
+   *       For example, assume we have 4 replicas: d1(rack1, ud1),
+   *       d2(rack2, ud1), d3(rack3, ud3), d4(rack3, ud4). Thus we have
+   *       shareUDNotRackSet: {d1, d2} and shareRackNotUDSet: {d3, d4}.
+   *       With the upgrade domain policy, the remaining replicas after
+   *       deletion are {d1 (or d2), d3, d4}, which span 2 racks.
+   *       With the BlockPlacementPolicyDefault policy, the remaining
+   *       replicas after deletion are {d1, d2, d3 (or d4)}, which span
+   *       3 racks.
+   *    3. shareUDNotRackSet isn't empty and shareRackNotUDSet is empty.
+   *       This implies all replicas are on unique racks. Removing a node
+   *       from shareUDNotRackSet will reduce the # of racks by 1 (no
+   *       different from BlockPlacementPolicyDefault) and won't change
+   *       the # of upgrade domains.
+   *    4. shareUDNotRackSet is empty and shareRackNotUDSet isn't empty.
+   *       Removing a node from shareRackNotUDSet is no different from
+   *       BlockPlacementPolicyDefault.
+   *    5. shareUDNotRackSet is empty and shareRackNotUDSet is empty.
+   *       Removing a node from NoShareRackOrUDSet is no different from
+   *       BlockPlacementPolicyDefault.
+   * The implementation:
+   * 1. Generate the set shareUDSet, which includes all DatanodeStorageInfo
+   *    that share the same upgrade domain with another DatanodeStorageInfo,
+   *    i.e. {shareRackAndUDSet, shareUDNotRackSet}.
+   * 2. If shareUDSet is empty, it means shareRackAndUDSet is empty and
+   *    shareUDNotRackSet is empty. Use the default rack-based policy.
+   * 3. If shareUDSet isn't empty, intersect it with moreThanOne
+   *    ({shareRackAndUDSet, shareRackNotUDSet}) to generate
+   *    shareRackAndUDSet.
+   * 4. If shareRackAndUDSet isn't empty, return shareRackAndUDSet,
+   *    otherwise return shareUDSet, which is the same as
+   *    shareUDNotRackSet.
+   */
+  @Override
+  protected Collection<DatanodeStorageInfo> pickupReplicaSet(
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne) {
+    List<DatanodeStorageInfo> all = new ArrayList<>();
+    if (moreThanOne != null) {
+      all.addAll(moreThanOne);
+    }
+    if (exactlyOne != null) {
+      all.addAll(exactlyOne);
+    }
+
+    Map<String, List<DatanodeStorageInfo>> upgradeDomains =
+        getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()]));
+
+    // shareUDSet includes DatanodeStorageInfo that share the same upgrade
+    // domain with another DatanodeStorageInfo.
+    List<DatanodeStorageInfo> shareUDSet = getShareUDSet(upgradeDomains);
+    // shareRackAndUDSet contains those DatanodeStorageInfo that share a
+    // rack and an upgrade domain with another DatanodeStorageInfo.
+    List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
+    if (shareUDSet.size() == 0) {
+      // All upgrade domains are unique, use the parent set.
+      return super.pickupReplicaSet(moreThanOne, exactlyOne);
+    } else if (moreThanOne != null) {
+      for (DatanodeStorageInfo storage : shareUDSet) {
+        if (moreThanOne.contains(storage)) {
+          shareRackAndUDSet.add(storage);
+        }
+      }
+    }
+    return (shareRackAndUDSet.size() > 0) ? shareRackAndUDSet : shareUDSet;
+  }
+}
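[Editor's note: as a reading aid for pickupReplicaSet() above, here is a standalone sketch of the selection order using the d1..d4 example from the comment. Plain strings stand in for DatanodeStorageInfo, and the empty-shareUD fallback is simplified to "prefer moreThanOne"; this is an illustration, not the committed code.]

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Set;
  import java.util.TreeSet;

  // Sketch: pick the candidate set for over-replication deletion.
  // Preference: replicas sharing both rack and UD, then replicas sharing
  // a UD, then the default rack-based choice.
  class PickupSketch {
    static List<String> pickup(Set<String> moreThanOne, Set<String> shareUD) {
      if (shareUD.isEmpty()) {
        return new ArrayList<>(moreThanOne); // simplified default rule
      }
      List<String> shareRackAndUD = new ArrayList<>(shareUD);
      shareRackAndUD.retainAll(moreThanOne);  // intersection with rack-sharers
      return shareRackAndUD.isEmpty()
          ? new ArrayList<>(shareUD) : shareRackAndUD;
    }

    public static void main(String[] args) {
      // d1(rack1,ud1) d2(rack2,ud1) d3(rack3,ud3) d4(rack3,ud4):
      // d3,d4 share rack3; d1,d2 share ud1; nothing shares both.
      Set<String> moreThanOne = new TreeSet<>(Arrays.asList("d3", "d4"));
      Set<String> shareUD = new TreeSet<>(Arrays.asList("d1", "d2"));
      // Prints [d1, d2]: one of the UD-sharing replicas is deleted first.
      System.out.println(pickup(moreThanOne, shareUD));
    }
  }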
http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
new file mode 100644
index 0000000..e2e1486
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * An implementation of {@link BlockPlacementStatus} for
+ * {@link BlockPlacementPolicyWithUpgradeDomain}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockPlacementStatusWithUpgradeDomain implements
+    BlockPlacementStatus {
+
+  private final BlockPlacementStatus parentBlockPlacementStatus;
+  private final Set<String> upgradeDomains;
+  private final int numberOfReplicas;
+  private final int upgradeDomainFactor;
+
+  /**
+   * @param parentBlockPlacementStatus the parent class' status
+   * @param upgradeDomains the set of upgrade domains of the replicas
+   * @param numberOfReplicas the number of replicas of the block
+   * @param upgradeDomainFactor the configured upgrade domain factor
+   */
+  public BlockPlacementStatusWithUpgradeDomain(
+      BlockPlacementStatus parentBlockPlacementStatus,
+      Set<String> upgradeDomains, int numberOfReplicas,
+      int upgradeDomainFactor) {
+    this.parentBlockPlacementStatus = parentBlockPlacementStatus;
+    this.upgradeDomains = upgradeDomains;
+    this.numberOfReplicas = numberOfReplicas;
+    this.upgradeDomainFactor = upgradeDomainFactor;
+  }
+
+  @Override
+  public boolean isPlacementPolicySatisfied() {
+    return parentBlockPlacementStatus.isPlacementPolicySatisfied() &&
+        isUpgradeDomainPolicySatisfied();
+  }
+
+  private boolean isUpgradeDomainPolicySatisfied() {
+    if (numberOfReplicas <= upgradeDomainFactor) {
+      return (numberOfReplicas == upgradeDomains.size());
+    } else {
+      return upgradeDomains.size() >= upgradeDomainFactor;
+    }
+  }
+
+  @Override
+  public String getErrorDescription() {
+    if (isPlacementPolicySatisfied()) {
+      return null;
+    }
+    StringBuilder errorDescription = new StringBuilder();
+    if (!parentBlockPlacementStatus.isPlacementPolicySatisfied()) {
+      errorDescription.append(parentBlockPlacementStatus.getErrorDescription());
+    }
+    if (!isUpgradeDomainPolicySatisfied()) {
+      if (errorDescription.length() != 0) {
+        errorDescription.append(" ");
+      }
+      errorDescription.append("The block has " + numberOfReplicas +
+          " replicas. But it only has " + upgradeDomains.size() +
+          " upgrade domains " + upgradeDomains + ".");
+    }
+    return errorDescription.toString();
+  }
+}
\ No newline at end of file
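[Editor's note: isUpgradeDomainPolicySatisfied() above is the crux of verification: with N replicas and factor F, a placement needs exactly N distinct domains when N <= F, and at least F distinct domains when N > F. A minimal standalone illustration with hypothetical names:]

  // Sketch of the satisfaction rule in BlockPlacementStatusWithUpgradeDomain.
  public class UdStatusSketch {
    static boolean udPolicySatisfied(int replicas, int distinctDomains,
        int factor) {
      return replicas <= factor
          ? distinctDomains == replicas  // every replica on its own domain
          : distinctDomains >= factor;   // at least "factor" domains overall
    }

    public static void main(String[] args) {
      System.out.println(udPolicySatisfied(3, 3, 3)); // true
      System.out.println(udPolicySatisfied(3, 2, 3)); // false: two replicas
                                                      // share a domain
      System.out.println(udPolicySatisfied(5, 3, 3)); // true: 5 replicas only
                                                      // need 3 domains
    }
  }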
http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 88ab54f..5726ef2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2482,4 +2482,18 @@
   </property>
 
+<property>
+  <name>dfs.namenode.upgrade.domain.factor</name>
+  <value>${dfs.replication}</value>
+  <description>
+    This is valid only when the block placement policy is set to
+    BlockPlacementPolicyWithUpgradeDomain. It defines the number of
+    unique upgrade domains any block's replicas should have.
+    When the number of replicas is less than or equal to this value,
+    the policy ensures each replica has a unique upgrade domain. When the
+    number of replicas is greater than this value, the policy ensures the
+    number of unique domains is at least this value.
+  </description>
+</property>
+
 </configuration>
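[Editor's note: the ${dfs.replication} default above mirrors the Java-side default chain: DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT is tied to DFS_REPLICATION_DEFAULT. A small sketch of how the effective factor resolves when nothing is set; the printed value assumes a stock configuration where dfs.replication is 3.]

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  public class FactorResolution {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Unset, the factor falls back to DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT,
      // which this commit defines as DFS_REPLICATION_DEFAULT.
      int factor = conf.getInt(
          DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR,
          DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT);
      System.out.println(factor); // 3 on a default configuration
    }
  }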
http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
index c541da3..6174447 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
@@ -96,7 +96,6 @@ abstract public class BaseReplicationPolicyTest {
     // construct network topology
     for (int i=0; i < dataNodes.length; i++) {
       cluster.add(dataNodes[i]);
-      //bm.getDatanodeManager().getHost2DatanodeMap().add(dataNodes[i]);
       bm.getDatanodeManager().getHeartbeatManager().addDatanode(
           dataNodes[i]);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 2ddb1ec..3127e23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -83,7 +83,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   @Parameterized.Parameters
   public static Iterable<Object[]> data() {
     return Arrays.asList(new Object[][] {
-        { BlockPlacementPolicyDefault.class.getName() } });
+        { BlockPlacementPolicyDefault.class.getName() },
+        { BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
   }
 
   private void updateHeartbeatForExtraStorage(long capacity,
@@ -231,10 +232,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   public void testChooseTarget2() throws Exception {
     Set<Node> excludedNodes;
     DatanodeStorageInfo[] targets;
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
-
-    excludedNodes = new HashSet<Node>();
-    excludedNodes.add(dataNodes[1]);
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+
+    excludedNodes = new HashSet<>();
+    excludedNodes.add(dataNodes[1]);
     targets = chooseTarget(0, chosenNodes, excludedNodes);
     assertEquals(targets.length, 0);
@@ -422,9 +423,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
         "DS-xxxx", "7.7.7.7", "/d2/r3", "host7");
     DatanodeDescriptor newDn = storage.getDatanodeDescriptor();
     Set<Node> excludedNodes;
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
 
-    excludedNodes = new HashSet<Node>();
+    excludedNodes = new HashSet<>();
     excludedNodes.add(dataNodes[0]);
     excludedNodes.add(dataNodes[1]);
     excludedNodes.add(dataNodes[2]);
@@ -554,9 +555,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     assertEquals(targets.length, 1);
     assertEquals(storages[1], targets[0]);
 
-    Set<Node> excludedNodes = new HashSet<Node>();
+    Set<Node> excludedNodes = new HashSet<>();
     excludedNodes.add(dataNodes[1]);
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);
     assertFalse(isOnSameRack(targets[0], dataNodes[0]));
@@ -726,8 +727,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
    */
   @Test
   public void testRereplicate1() throws Exception {
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
-    chosenNodes.add(storages[0]);
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+    chosenNodes.add(storages[0]);
     DatanodeStorageInfo[] targets;
 
     targets = chooseTarget(0, chosenNodes);
@@ -757,7 +758,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
    */
   @Test
   public void testRereplicate2() throws Exception {
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[1]);
@@ -784,7 +785,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
    */
   @Test
   public void testRereplicate3() throws Exception {
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[2]);
@@ -950,7 +951,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
    */
   @Test
   public void testChooseReplicaToDelete() throws Exception {
-    List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> replicaList = new ArrayList<>();
     final Map<String, List<DatanodeStorageInfo>> rackMap
         = new HashMap<String, List<DatanodeStorageInfo>>();
@@ -971,14 +972,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
       DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
     }
 
-    List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
-    List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> first = new ArrayList<>();
+    List<DatanodeStorageInfo> second = new ArrayList<>();
     replicator.splitNodesWithRack(replicaList, rackMap, first, second);
     // storages[0] and storages[1] are in first set as their rack has two
     // replica nodes, while storages[2] and dataNodes[5] are in second set.
     assertEquals(2, first.size());
     assertEquals(2, second.size());
-    List<StorageType> excessTypes = new ArrayList<StorageType>();
+    List<StorageType> excessTypes = new ArrayList<>();
     {
       // test returning null
       excessTypes.add(StorageType.SSD);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
index f4bd709..6e84b91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
@@ -45,7 +45,8 @@ public class TestReplicationPolicyConsiderLoad
   @Parameterized.Parameters
   public static Iterable<Object[]> data() {
     return Arrays.asList(new Object[][] {
-        { BlockPlacementPolicyDefault.class.getName() } });
+        { BlockPlacementPolicyDefault.class.getName() },
+        { BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
   }
 
   @Override
@@ -111,7 +112,7 @@ public class TestReplicationPolicyConsiderLoad
           1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
 
       assertEquals(3, targets.length);
-      Set<DatanodeStorageInfo> targetSet = new HashSet<DatanodeStorageInfo>(
+      Set<DatanodeStorageInfo> targetSet = new HashSet<>(
           Arrays.asList(targets));
       for (int i = 3; i < storages.length; i++) {
         assertTrue(targetSet.contains(storages[i]));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index 85598ca..86f10a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -137,7 +137,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
       DatanodeStorageInfo[] targets) {
     if(targets.length == 0)
       return true;
-    Set<String> targetSet = new HashSet<String>();
+    Set<String> targetSet = new HashSet<>();
     for(DatanodeStorageInfo storage:targets) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
@@ -217,7 +217,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   }
 
   private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeStorageInfo[] targets) {
-    Set<String> nodeGroupSet = new HashSet<String>();
+    Set<String> nodeGroupSet = new HashSet<>();
     for (DatanodeStorageInfo target: targets) {
       nodeGroupSet.add(target.getDatanodeDescriptor().getNetworkLocation());
     }
@@ -236,10 +236,10 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   public void testChooseTarget2() throws Exception {
     DatanodeStorageInfo[] targets;
     BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
 
-    Set<Node> excludedNodes = new HashSet<Node>();
-    excludedNodes.add(dataNodes[1]);
+    Set<Node> excludedNodes = new HashSet<>();
+    excludedNodes.add(dataNodes[1]);
     targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
         excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
     assertEquals(targets.length, 4);
@@ -415,7 +415,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   @Test
   public void testRereplicate1() throws Exception {
     updateHeartbeatWithUsage();
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     DatanodeStorageInfo[] targets;
@@ -448,7 +448,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   @Test
   public void testRereplicate2() throws Exception {
     updateHeartbeatWithUsage();
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[1]);
@@ -476,7 +476,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   @Test
   public void testRereplicate3() throws Exception {
     updateHeartbeatWithUsage();
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[3]);
@@ -511,9 +511,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
    */
   @Test
   public void testChooseReplicaToDelete() throws Exception {
-    List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
-    final Map<String, List<DatanodeStorageInfo>> rackMap
-        = new HashMap<String, List<DatanodeStorageInfo>>();
+    List<DatanodeStorageInfo> replicaList = new ArrayList<>();
+    final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
 
     dataNodes[0].setRemaining(4*1024*1024);
     replicaList.add(storages[0]);
@@ -526,13 +525,13 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     dataNodes[5].setRemaining(1*1024*1024);
     replicaList.add(storages[5]);
 
-    List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
-    List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> first = new ArrayList<>();
+    List<DatanodeStorageInfo> second = new ArrayList<>();
     replicator.splitNodesWithRack(
         replicaList, rackMap, first, second);
     assertEquals(3, first.size());
     assertEquals(1, second.size());
-    List<StorageType> excessTypes = new ArrayList<StorageType>();
+    List<StorageType> excessTypes = new ArrayList<>();
     excessTypes.add(StorageType.DEFAULT);
     DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
         null, null, (short)3, first, second, excessTypes);
@@ -614,7 +613,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storagesInBoundaryCase[0]);
     chosenNodes.add(storagesInBoundaryCase[5]);
     DatanodeStorageInfo[] targets;
@@ -703,11 +702,10 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
-
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
-
+
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     DatanodeStorageInfo[] targets;
-    Set<Node> excludedNodes = new HashSet<Node>();
+    Set<Node> excludedNodes = new HashSet<>();
     excludedNodes.add(dataNodesForDependencies[5]);
 
     //try to select three targets as there are three node groups
http://git-wip-us.apache.org/repos/asf/hadoop/blob/218aa7bb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
new file mode 100644
index 0000000..feb2b79
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.net.Node;
+import org.junit.Test;
+
+public class TestReplicationPolicyWithUpgradeDomain
+    extends BaseReplicationPolicyTest {
+  public TestReplicationPolicyWithUpgradeDomain() {
+    this.blockPlacementPolicy =
+        BlockPlacementPolicyWithUpgradeDomain.class.getName();
+  }
+
+  @Override
+  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
+    final String[] racks = {
+        "/d1/r1",
+        "/d1/r1",
+        "/d1/r1",
+        "/d1/r2",
+        "/d1/r2",
+        "/d1/r2",
+        "/d1/r3",
+        "/d1/r3",
+        "/d1/r3"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    DatanodeDescriptor dataNodes[] =
+        DFSTestUtil.toDatanodeDescriptor(storages);
+    for (int i=0; i < dataNodes.length; i++) {
+      // each rack has 3 DNs with upgrade domain id 1,2,3 respectively.
+      String upgradeDomain = Integer.toString((i%3)+1);
+      dataNodes[i].setUpgradeDomain(upgradeDomain);
+    }
+    return dataNodes;
+  }
+
+  /**
+   * Verify that the targets are chosen to honor both the rack and the
+   * upgrade domain policies when the number of replicas is
+   * 0, 1, 2, 3, or 4.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget1() throws Exception {
+    updateHeartbeatWithUsage(dataNodes[0],
+        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        0L, 0L, 4, 0);
+
+    DatanodeStorageInfo[] targets;
+    targets = chooseTarget(0);
+    assertEquals(targets.length, 0);
+
+    targets = chooseTarget(1);
+    assertEquals(targets.length, 1);
+    assertEquals(storages[0], targets[0]);
+
+    targets = chooseTarget(2);
+    assertEquals(targets.length, 2);
+    assertEquals(storages[0], targets[0]);
+    assertFalse(isOnSameRack(targets[0], targets[1]));
+    assertEquals(getUpgradeDomains(targets).size(), 2);
+
+    targets = chooseTarget(3);
+    assertEquals(targets.length, 3);
+    assertEquals(storages[0], targets[0]);
+    assertFalse(isOnSameRack(targets[0], targets[1]));
+    assertTrue(isOnSameRack(targets[1], targets[2]));
+    assertEquals(getUpgradeDomains(targets).size(), 3);
+
+    targets = chooseTarget(4);
+    assertEquals(targets.length, 4);
+    assertEquals(storages[0], targets[0]);
+    assertTrue(isOnSameRack(targets[1], targets[2]) ||
+        isOnSameRack(targets[2], targets[3]));
+    assertFalse(isOnSameRack(targets[0], targets[2]));
+    assertEquals(getUpgradeDomains(targets).size(), 3);
+
+    updateHeartbeatWithUsage(dataNodes[0],
+        2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
+  }
+
+  /**
+   * Verify the rack and upgrade domain policies when excludedNodes are
+   * specified.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTargetWithExcludeNodes() throws Exception {
+    Set<Node> excludedNodes = new HashSet<>();
+    DatanodeStorageInfo[] targets;
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+
+    excludedNodes.clear();
+    chosenNodes.clear();
+    excludedNodes.add(dataNodes[4]);
+    targets = chooseTarget(3, chosenNodes, excludedNodes);
+    assertEquals(targets.length, 3);
+    assertEquals(storages[0], targets[0]);
+    assertEquals(getRacks(targets).size(), 2);
+    assertEquals(getUpgradeDomains(targets).size(), 3);
+
+    excludedNodes.clear();
+    chosenNodes.clear();
+    excludedNodes.add(dataNodes[4]);
+    excludedNodes.add(dataNodes[8]);
+    targets = chooseTarget(3, chosenNodes, excludedNodes);
+    assertEquals(targets.length, 3);
+    assertEquals(storages[0], targets[0]);
+    assertEquals(getRacks(targets).size(), 2);
+    assertEquals(getUpgradeDomains(targets).size(), 3);
+
+    excludedNodes.clear();
+    chosenNodes.clear();
+    excludedNodes.add(dataNodes[4]);
+    excludedNodes.add(dataNodes[5]);
+    excludedNodes.add(dataNodes[8]);
+    targets = chooseTarget(3, chosenNodes, excludedNodes);
+    assertEquals(targets.length, 3);
+    assertEquals(storages[0], targets[0]);
+    assertEquals(storages[2], targets[1]);
+    assertEquals(storages[7], targets[2]);
+
+    excludedNodes.clear();
+    chosenNodes.clear();
+    excludedNodes.add(dataNodes[4]);
+    targets = chooseTarget(4, chosenNodes, excludedNodes);
+    assertEquals(targets.length, 4);
+    assertEquals(storages[0], targets[0]);
+    assertTrue(getRacks(targets).size() >= 2);
+    assertEquals(getUpgradeDomains(targets).size(), 3);
+
+    excludedNodes.clear();
+    chosenNodes.clear();
+    excludedNodes.add(dataNodes[4]);
+    excludedNodes.add(dataNodes[8]);
+    targets = chooseTarget(4, chosenNodes, excludedNodes);
+    assertEquals(targets.length, 4);
+    assertEquals(storages[0], targets[0]);
+    assertTrue(getRacks(targets).size() >= 2);
+    assertEquals(getUpgradeDomains(targets).size(), 3);
+
+    excludedNodes.clear();
+    chosenNodes.clear();
+    excludedNodes.add(dataNodes[1]);
+    chosenNodes.add(storages[2]);
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes,
+        true, excludedNodes, BLOCK_SIZE,
+        TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
+    System.out.println("targets=" + Arrays.asList(targets));
+    assertEquals(2, targets.length);
+  }
+
+  /**
+   * Verify that the correct replica is chosen for deletion to satisfy both
+   * the rack and the upgrade domain policies.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseReplicaToDelete() throws Exception {
+    BlockPlacementPolicyWithUpgradeDomain upgradeDomainPolicy =
+        (BlockPlacementPolicyWithUpgradeDomain)replicator;
+    List<DatanodeStorageInfo> first = new ArrayList<>();
+    List<DatanodeStorageInfo> second = new ArrayList<>();
+    List<StorageType> excessTypes = new ArrayList<>();
+    excessTypes.add(StorageType.DEFAULT);
+    first.add(storages[0]);
+    first.add(storages[1]);
+    second.add(storages[4]);
+    second.add(storages[8]);
+    DatanodeStorageInfo chosenStorage =
+        upgradeDomainPolicy.chooseReplicaToDelete(
+            null, null, (short)3, first, second, excessTypes);
+    assertEquals(chosenStorage, storages[1]);
+    first.clear();
+    second.clear();
+
+    excessTypes.add(StorageType.DEFAULT);
+    first.add(storages[0]);
+    first.add(storages[1]);
+    first.add(storages[4]);
+    first.add(storages[5]);
+    chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete(
+        null, null, (short)3, first, second, excessTypes);
+    assertTrue(chosenStorage.equals(storages[1]) ||
+        chosenStorage.equals(storages[4]));
+  }
+
+  /**
+   * Test the scenario where there are not enough candidate datanodes to
+   * satisfy the policy.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTargetWithoutEnoughReplica() throws Exception {
+    Set<Node> excludedNodes = new HashSet<>();
+    DatanodeStorageInfo[] targets;
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+
+    excludedNodes.clear();
+    chosenNodes.clear();
+    excludedNodes.add(dataNodes[4]);
+    excludedNodes.add(dataNodes[5]);
+    excludedNodes.add(dataNodes[7]);
+    excludedNodes.add(dataNodes[8]);
+    targets = chooseTarget(3, chosenNodes, excludedNodes);
+    assertEquals(targets.length, 2);
+    assertEquals(storages[0], targets[0]);
+    assertTrue(targets[1].equals(storages[1]) ||
+        targets[1].equals(storages[2]));
+  }
+
+  /**
+   * Verify that verifyBlockPlacement() checks both the rack and the
+   * upgrade domain policies.
+   * @throws Exception
+   */
+  @Test
+  public void testVerifyBlockPlacement() throws Exception {
+    LocatedBlock locatedBlock;
+    BlockPlacementStatus status;
+    ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
+    List<DatanodeStorageInfo> set = new ArrayList<>();
+
+    // 2 upgrade domains (not enough), 2 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[4]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+
+    // 3 upgrade domains (enough), 2 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[5]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 3 upgrade domains (enough), 1 rack (not enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[2]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertFalse(status.getErrorDescription().contains("upgrade domain"));
+
+    // 2 upgrade domains (not enough), 3 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[5]);
+    set.add(storages[8]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("upgrade domain"));
+
+    // 3 upgrade domains (enough), 3 racks (enough)
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[4]);
+    set.add(storages[8]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 3 upgrade domains (enough), 3 racks (enough), 4 replicas
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[1]);
+    set.add(storages[5]);
+    set.add(storages[8]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 2 upgrade domains (not enough), 3 racks (enough), 4 replicas
+    set.clear();
+    set.add(storages[0]);
+    set.add(storages[3]);
+    set.add(storages[5]);
+    set.add(storages[8]);
+    locatedBlock = BlockManager.newLocatedBlock(b,
+        set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
+    status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
+    assertFalse(status.isPlacementPolicySatisfied());
+  }
+
+  private Set<String> getUpgradeDomains(DatanodeStorageInfo[] nodes) {
+    HashSet<String> upgradeDomains = new HashSet<>();
+    for (DatanodeStorageInfo node : nodes) {
+      upgradeDomains.add(node.getDatanodeDescriptor().getUpgradeDomain());
+    }
+    return upgradeDomains;
+  }
+
+  private Set<String> getRacks(DatanodeStorageInfo[] nodes) {
+    HashSet<String> racks = new HashSet<>();
+    for (DatanodeStorageInfo node : nodes) {
+      String rack = node.getDatanodeDescriptor().getNetworkLocation();
+      racks.add(rack);
+    }
+    return racks;
+  }
+}
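[Editor's note: to close, a worked example on the 9-node test topology above. With DN i on rack /d1/r(i/3 + 1) and upgrade domain (i%3)+1, a placement on dn0 (/d1/r1, ud1), dn4 (/d1/r2, ud2) and dn5 (/d1/r2, ud3) spans 2 racks and 3 upgrade domains, exactly what testChooseTarget1 expects for 3 replicas. A minimal standalone check, assuming factor = replication = 3:]

  import java.util.HashSet;
  import java.util.Set;

  public class PlacementExample {
    public static void main(String[] args) {
      String[][] replicas = {
          {"/d1/r1", "1"}, {"/d1/r2", "2"}, {"/d1/r2", "3"}};
      Set<String> racks = new HashSet<>();
      Set<String> uds = new HashSet<>();
      for (String[] r : replicas) {
        racks.add(r[0]);
        uds.add(r[1]);
      }
      // >= 2 racks (default policy) and 3 unique upgrade domains: true
      System.out.println(racks.size() >= 2 && uds.size() == 3);
    }
  }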