hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l..@apache.org
Subject hadoop git commit: HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain. (Ming Ma via lei)
Date Tue, 13 Oct 2015 03:07:35 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk c60a16fce -> 0f5f9846e


HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain. (Ming Ma via lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0f5f9846
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0f5f9846
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0f5f9846

Branch: refs/heads/trunk
Commit: 0f5f9846edab3ea7e80f35000072136f998bcd46
Parents: c60a16f
Author: Lei Xu <lei@apache.org>
Authored: Mon Oct 12 16:23:42 2015 -0700
Committer: Lei Xu <lei@apache.org>
Committed: Mon Oct 12 16:24:16 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +
 .../BlockPlacementPolicyDefault.java            |  21 +-
 .../BlockPlacementPolicyWithUpgradeDomain.java  | 264 ++++++++++++++
 .../BlockPlacementStatusWithUpgradeDomain.java  |  88 +++++
 .../src/main/resources/hdfs-default.xml         |  14 +
 .../BaseReplicationPolicyTest.java              |   1 -
 .../blockmanagement/TestReplicationPolicy.java  |  35 +-
 .../TestReplicationPolicyConsiderLoad.java      |   5 +-
 .../TestReplicationPolicyWithNodeGroup.java     |  36 +-
 .../TestReplicationPolicyWithUpgradeDomain.java | 353 +++++++++++++++++++
 11 files changed, 772 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 938546c..23abc97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1515,6 +1515,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8988. Use LightWeightHashSet instead of LightWeightLinkedSet in
     BlockManager#excessReplicateMap. (yliu)
 
+    HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain.
+    (Ming Ma via lei)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0d24c8f..8510c9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -333,6 +333,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_MAX_XATTR_SIZE_DEFAULT = 16384;
   public static final int     DFS_NAMENODE_MAX_XATTR_SIZE_HARD_LIMIT = 32768;
 
+  public static final String  DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
+  public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
 
   //Following keys have no defaults
   public static final String  DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index f761150..ad399d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -125,14 +125,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       }
 
       Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
-          new HashSet<Node>() : new HashSet<Node>(excludedNodes);
+          new HashSet<Node>() : new HashSet<>(excludedNodes);
       final List<StorageType> requiredStorageTypes = storagePolicy
           .chooseStorageTypes((short)numOfReplicas);
       final EnumMap<StorageType, Integer> storageTypes =
           getRequiredStorageTypes(requiredStorageTypes);
 
       // Choose favored nodes
-      List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
+      List<DatanodeStorageInfo> results = new ArrayList<>();
       boolean avoidStaleNodes = stats != null
           && stats.isAvoidingStaleDataNodesForWrite();
 
@@ -192,14 +192,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
       
     if (excludedNodes == null) {
-      excludedNodes = new HashSet<Node>();
+      excludedNodes = new HashSet<>();
     }
      
     int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
     numOfReplicas = result[0];
     int maxNodesPerRack = result[1];
       
-    final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
+    final List<DatanodeStorageInfo> results = new ArrayList<>(chosenStorage);
     for (DatanodeStorageInfo storage : chosenStorage) {
       // add localMachine and related nodes to excludedNodes
       addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
@@ -266,8 +266,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
   private EnumMap<StorageType, Integer> getRequiredStorageTypes(
       List<StorageType> types) {
-    EnumMap<StorageType, Integer> map = new EnumMap<StorageType,
-        Integer>(StorageType.class);
+    EnumMap<StorageType, Integer> map = new EnumMap<>(StorageType.class);
     for (StorageType type : types) {
       if (!map.containsKey(type)) {
         map.put(type, 1);
@@ -310,7 +309,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
 
     // Keep a copy of original excludedNodes
-    final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
+    final Set<Node> oldExcludedNodes = new HashSet<>(excludedNodes);
 
     // choose storage types; use fallbacks for unavailable storages
     final List<StorageType> requiredStorageTypes = storagePolicy
@@ -929,11 +928,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * So pick up first set if not empty. If first is empty, then pick second.
    */
   protected Collection<DatanodeStorageInfo> pickupReplicaSet(
-      Collection<DatanodeStorageInfo> first,
-      Collection<DatanodeStorageInfo> second) {
-    return first.isEmpty() ? second : first;
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne) {
+    return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
   }
-  
+
   @VisibleForTesting
   void setPreferLocalNode(boolean prefer) {
     this.preferLocalNode = prefer;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
new file mode 100644
index 0000000..71c02b8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+
+/**
+ * The class is responsible for choosing the desired number of targets
+ * for placing block replicas in a way that honors the upgrade domain policy.
+ * Here is the replica placement strategy. If the writer is on a datanode,
+ * the 1st replica is placed on the local machine,
+ * otherwise a random datanode. The 2nd replica is placed on a datanode
+ * that is on a different rack. The 3rd replica is placed on a datanode
+ * on the same rack as the second replica, but on a different node.
+ * Up to upgradeDomainFactor replicas are placed in unique upgrade domains.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockPlacementPolicyWithUpgradeDomain extends
+    BlockPlacementPolicyDefault {
+
+  private int upgradeDomainFactor;
+
+  @Override
+  public void initialize(Configuration conf,  FSClusterStats stats,
+      NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
+    super.initialize(conf, stats, clusterMap, host2datanodeMap);
+    upgradeDomainFactor = conf.getInt(
+        DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR,
+        DFSConfigKeys.DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT);
+  }
+
+  @Override
+  protected boolean isGoodDatanode(DatanodeDescriptor node,
+      int maxTargetPerRack, boolean considerLoad,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes) {
+    boolean isGoodTarget = super.isGoodDatanode(node,
+        maxTargetPerRack, considerLoad, results, avoidStaleNodes);
+    if (isGoodTarget) {
+      if (results.size() > 0 && results.size() < upgradeDomainFactor) {
+        // Each node in "results" has a different upgrade domain. Make sure
+        // the candidate node introduces a new upgrade domain.
+        Set<String> upgradeDomains = getUpgradeDomains(results);
+        if (upgradeDomains.contains(node.getUpgradeDomain())) {
+          isGoodTarget = false;
+        }
+      }
+    }
+    return isGoodTarget;
+  }
+
+  // If the upgrade domain isn't specified, use the XferAddr as upgrade domain.
+  // Such fallback is useful to test the scenario where upgrade domain isn't
+  // defined but the block placement is set to upgrade domain policy.
+  public String getUpgradeDomainWithDefaultValue(DatanodeInfo datanodeInfo) {
+    String upgradeDomain = datanodeInfo.getUpgradeDomain();
+    if (upgradeDomain == null) {
+      LOG.warn("Upgrade domain isn't defined for " + datanodeInfo);
+      upgradeDomain = datanodeInfo.getXferAddr();
+    }
+    return upgradeDomain;
+  }
+
+  private String getUpgradeDomain(DatanodeStorageInfo storage) {
+    return getUpgradeDomainWithDefaultValue(storage.getDatanodeDescriptor());
+  }
+
+  private Set<String> getUpgradeDomains(List<DatanodeStorageInfo> results) {
+    Set<String> upgradeDomains = new HashSet<>();
+    if (results == null) {
+      return upgradeDomains;
+    }
+    for(DatanodeStorageInfo storageInfo : results) {
+      upgradeDomains.add(getUpgradeDomain(storageInfo));
+    }
+    return upgradeDomains;
+  }
+
+  private Set<String> getUpgradeDomainsFromNodes(DatanodeInfo[] nodes) {
+    Set<String> upgradeDomains = new HashSet<>();
+    if (nodes == null) {
+      return upgradeDomains;
+    }
+    for(DatanodeInfo node : nodes) {
+      upgradeDomains.add(getUpgradeDomainWithDefaultValue(node));
+    }
+    return upgradeDomains;
+  }
+
+  private Map<String, List<DatanodeStorageInfo>> getUpgradeDomainMap(
+      DatanodeStorageInfo[] storageInfos) {
+    Map<String, List<DatanodeStorageInfo>> upgradeDomainMap = new HashMap<>();
+    for(DatanodeStorageInfo storage : storageInfos) {
+      String upgradeDomain = getUpgradeDomainWithDefaultValue(
+          storage.getDatanodeDescriptor());
+      List<DatanodeStorageInfo> storages = upgradeDomainMap.get(upgradeDomain);
+      if (storages == null) {
+        storages = new ArrayList<>();
+        upgradeDomainMap.put(upgradeDomain, storages);
+      }
+      storages.add(storage);
+    }
+    return upgradeDomainMap;
+  }
+
+  @Override
+  public BlockPlacementStatus verifyBlockPlacement(String srcPath,
+      LocatedBlock lBlk, int numberOfReplicas) {
+    BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(srcPath,
+        lBlk, numberOfReplicas);
+    BlockPlacementStatusWithUpgradeDomain upgradeDomainStatus =
+        new BlockPlacementStatusWithUpgradeDomain(defaultStatus,
+            getUpgradeDomainsFromNodes(lBlk.getLocations()),
+                numberOfReplicas, upgradeDomainFactor);
+    return upgradeDomainStatus;
+  }
+
+  private <T> List<T> getShareUDSet(
+      Map<String, List<T>> upgradeDomains) {
+    List<T> getShareUDSet = new ArrayList<>();
+    for (Map.Entry<String, List<T>> e : upgradeDomains.entrySet()) {
+      if (e.getValue().size() > 1) {
+        getShareUDSet.addAll(e.getValue());
+      }
+    }
+    return getShareUDSet;
+  }
+
+  /*
+   * The policy to pick the replica set for deleting the over-replicated
+   * replica so that the rack and upgrade domain requirements are still met.
+   * The algorithm:
+   * a. Each replica has a boolean attribute "shareRack" that defines
+   *    whether it shares its rack with another replica of the same block.
+   * b. Each replica has another boolean attribute "shareUD" that defines
+   *    whether it shares its upgrade domain with another replica of the same
+   *    block.
+   * c. Partition the replicas into 4 sets (some might be empty):
+   *    shareRackAndUDSet: {shareRack==true, shareUD==true}
+   *    shareUDNotRackSet: {shareRack==false, shareUD==true}
+   *    shareRackNotUDSet: {shareRack==true, shareUD==false}
+   *    NoShareRackOrUDSet: {shareRack==false, shareUD==false}
+   * d. Pick the first non-empty replica set in the following order:
+   *    shareRackAndUDSet, shareUDNotRackSet, shareRackNotUDSet,
+   *    NoShareRackOrUDSet
+   * e. Proof that this won't degrade the existing rack-based data
+   *    availability model under different scenarios.
+   *    1. shareRackAndUDSet isn't empty. Removing a node
+   *       from shareRackAndUDSet won't change # of racks and # of UD.
+   *       The following cases cover empty shareRackAndUDSet scenarios.
+   *    2. shareUDNotRackSet isn't empty and shareRackNotUDSet isn't empty.
+   *       Let us prove that # of racks >= 3 before the deletion and thus
+   *       after deletion # of racks >= 2.
+   *         Given shareRackAndUDSet is empty, there won't be overlap between
+   *       shareUDNotRackSet and shareRackNotUDSet. It means DNs in
+   *       shareRackNotUDSet should be on at least a rack
+   *       different from any DN's rack in shareUDNotRackSet.
+   *         Given shareUDNotRackSet.size() >= 2 and each DN in the set
+   *       doesn't share rack with any other DNs, there are at least 2 racks
+   *       coming from shareUDNotRackSet.
+   *         Thus the # of racks from DNs in {shareUDNotRackSet,
+   *       shareRackNotUDSet} >= 3. Removing a node from shareUDNotRackSet
+   *       will reduce the # of racks by 1 and won't change # of upgrade
+   *       domains.
+   *         Note that this is different from BlockPlacementPolicyDefault which
+   *       will keep the # of racks after deletion. With upgrade domain policy,
+   *       given # of racks is still >= 2 after deletion, the data availability
+   *       model remains the same as BlockPlacementPolicyDefault (only supports
+   *       one rack failure).
+   *         For example, assume we have 4 replicas: d1(rack1, ud1),
+   *       d2(rack2, ud1), d3(rack3, ud3), d4(rack3, ud4). Thus we have
+   *       shareUDNotRackSet: {d1, d2} and shareRackNotUDSet: {d3, d4}.
+   *       With upgrade domain policy, the remaining replicas after deletion
+   *       are {d1(or d2), d3, d4} which has 2 racks.
+   *       With BlockPlacementPolicyDefault policy, the remaining replicas
+   *       after deletion are {d1, d2, d3(or d4)} which has 3 racks.
+   *    3. shareUDNotRackSet isn't empty and shareRackNotUDSet is empty. This
+   *       implies all replicas are on unique racks. Removing a node from
+   *       shareUDNotRackSet will reduce # of racks (no different from
+   *       BlockPlacementPolicyDefault) by 1 and won't change #
+   *       of upgrade domains.
+   *    4. shareUDNotRackSet is empty and shareRackNotUDSet isn't empty.
+   *       Removing a node from shareRackNotUDSet is no different from
+   *       BlockPlacementPolicyDefault.
+   *    5. shareUDNotRackSet is empty and shareRackNotUDSet is empty.
+   *       Removing a node from NoShareRackOrUDSet is no different from
+   *       BlockPlacementPolicyDefault.
+   * The implementation:
+   * 1. Generate set shareUDSet which includes all DatanodeStorageInfo that
+   *    share the same upgrade domain with another DatanodeStorageInfo,
+   *    i.e. {shareRackAndUDSet, shareUDNotRackSet}.
+   * 2. If shareUDSet is empty, it means shareRackAndUDSet is empty and
+   *    shareUDNotRackSet is empty. Use the default rack based policy.
+   * 3. If shareUDSet isn't empty, intersect it with moreThanOne (i.e.
+   *    {shareRackAndUDSet, shareRackNotUDSet}) to generate shareRackAndUDSet.
+   * 4. If shareRackAndUDSet isn't empty, return
+   *    shareRackAndUDSet, otherwise return shareUDSet which is the same as
+   *    shareUDNotRackSet.
+   */
+  @Override
+  protected Collection<DatanodeStorageInfo> pickupReplicaSet(
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne) {
+    List<DatanodeStorageInfo> all = new ArrayList<>();
+    if (moreThanOne != null) {
+      all.addAll(moreThanOne);
+    }
+    if (exactlyOne != null) {
+      all.addAll(exactlyOne);
+    }
+
+    Map<String, List<DatanodeStorageInfo>> upgradeDomains =
+        getUpgradeDomainMap(all.toArray(new DatanodeStorageInfo[all.size()]));
+
+    // shareUDSet includes DatanodeStorageInfo that share same upgrade
+    // domain with another DatanodeStorageInfo.
+    List<DatanodeStorageInfo> shareUDSet = getShareUDSet(upgradeDomains);
+    // shareRackAndUDSet contains those DatanodeStorageInfo that
+    // share rack and upgrade domain with another DatanodeStorageInfo.
+    List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
+    if (shareUDSet.size() == 0) {
+      // All upgrade domains are unique, use the parent set.
+      return super.pickupReplicaSet(moreThanOne, exactlyOne);
+    } else if (moreThanOne != null) {
+      for (DatanodeStorageInfo storage : shareUDSet) {
+        if (moreThanOne.contains(storage)) {
+          shareRackAndUDSet.add(storage);
+        }
+      }
+    }
+    return (shareRackAndUDSet.size() > 0) ? shareRackAndUDSet : shareUDSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
new file mode 100644
index 0000000..e2e1486
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * An implementation of {@link BlockPlacementStatus} for
+ * {@link BlockPlacementPolicyWithUpgradeDomain}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockPlacementStatusWithUpgradeDomain implements
+    BlockPlacementStatus {
+
+  private final BlockPlacementStatus parentBlockPlacementStatus;
+  private final Set<String> upgradeDomains;
+  private final int numberOfReplicas;
+  private final int upgradeDomainFactor;
+
+  /**
+   * @param parentBlockPlacementStatus the status computed by the parent policy
+   * @param upgradeDomains the set of upgrade domains of the replicas
+   * @param numberOfReplicas the number of replicas of the block
+   * @param upgradeDomainFactor the configured upgrade domain factor
+   */
+  public BlockPlacementStatusWithUpgradeDomain(
+      BlockPlacementStatus parentBlockPlacementStatus,
+      Set<String> upgradeDomains, int numberOfReplicas,
+      int upgradeDomainFactor){
+    this.parentBlockPlacementStatus = parentBlockPlacementStatus;
+    this.upgradeDomains = upgradeDomains;
+    this.numberOfReplicas = numberOfReplicas;
+    this.upgradeDomainFactor = upgradeDomainFactor;
+  }
+
+  @Override
+  public boolean isPlacementPolicySatisfied() {
+    return parentBlockPlacementStatus.isPlacementPolicySatisfied() &&
+        isUpgradeDomainPolicySatisfied();
+  }
+
+  private boolean isUpgradeDomainPolicySatisfied() {
+    // The replicas may span more upgrade domains than numberOfReplicas (e.g.
+    // an over-replicated block); that must not count as a violation, so only
+    // require at least min(numberOfReplicas, upgradeDomainFactor) domains.
+    return upgradeDomains.size() >=
+        Math.min(numberOfReplicas, upgradeDomainFactor);
+  }
+
+  @Override
+  public String getErrorDescription() {
+    if (isPlacementPolicySatisfied()) {
+      return null;
+    }
+    StringBuilder errorDescription = new StringBuilder();
+    if (!parentBlockPlacementStatus.isPlacementPolicySatisfied()) {
+      errorDescription.append(parentBlockPlacementStatus.getErrorDescription());
+    }
+    if (!isUpgradeDomainPolicySatisfied()) {
+      if (errorDescription.length() != 0) {
+        errorDescription.append(" ");
+      }
+      errorDescription.append("The block has " + numberOfReplicas +
+          " replicas. But it only has " + upgradeDomains.size() +
+              " upgrade domains " + upgradeDomains +".");
+    }
+    return errorDescription.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index cff3e88..cc4f146 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2491,4 +2491,18 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.upgrade.domain.factor</name>
+  <value>${dfs.replication}</value>
+  <description>
+    This is valid only when block placement policy is set to
+    BlockPlacementPolicyWithUpgradeDomain. It defines the number of
+    unique upgrade domains any block's replicas should have.
+    When the number of replicas is less than or equal to this value, the
+    policy ensures each replica has a unique upgrade domain. When the number of
+    replicas is greater than this value, the policy ensures the number of
+    unique domains is at least this value.
+  </description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
index c541da3..6174447 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
@@ -96,7 +96,6 @@ abstract public class BaseReplicationPolicyTest {
     // construct network topology
     for (int i=0; i < dataNodes.length; i++) {
       cluster.add(dataNodes[i]);
-      //bm.getDatanodeManager().getHost2DatanodeMap().add(dataNodes[i]);
       bm.getDatanodeManager().getHeartbeatManager().addDatanode(
           dataNodes[i]);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index b8a7e77..fa9cc5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -83,7 +83,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   @Parameterized.Parameters
   public static Iterable<Object[]> data() {
     return Arrays.asList(new Object[][] {
-        { BlockPlacementPolicyDefault.class.getName() } });
+        { BlockPlacementPolicyDefault.class.getName() },
+        { BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
   }
 
   private void updateHeartbeatForExtraStorage(long capacity,
@@ -231,10 +232,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   public void testChooseTarget2() throws Exception { 
     Set<Node> excludedNodes;
     DatanodeStorageInfo[] targets;
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
-    
-    excludedNodes = new HashSet<Node>();
-    excludedNodes.add(dataNodes[1]); 
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+
+    excludedNodes = new HashSet<>();
+    excludedNodes.add(dataNodes[1]);
     targets = chooseTarget(0, chosenNodes, excludedNodes);
     assertEquals(targets.length, 0);
     
@@ -422,9 +423,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
         "DS-xxxx", "7.7.7.7", "/d2/r3", "host7");
     DatanodeDescriptor newDn = storage.getDatanodeDescriptor();
     Set<Node> excludedNodes;
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
 
-    excludedNodes = new HashSet<Node>();
+    excludedNodes = new HashSet<>();
     excludedNodes.add(dataNodes[0]);
     excludedNodes.add(dataNodes[1]);
     excludedNodes.add(dataNodes[2]);
@@ -554,9 +555,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     assertEquals(targets.length, 1);
     assertEquals(storages[1], targets[0]);
 
-    Set<Node> excludedNodes = new HashSet<Node>();
+    Set<Node> excludedNodes = new HashSet<>();
     excludedNodes.add(dataNodes[1]);
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);
     assertFalse(isOnSameRack(targets[0], dataNodes[0]));
@@ -726,8 +727,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
    */
   @Test
   public void testRereplicate1() throws Exception {
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
-    chosenNodes.add(storages[0]);    
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+    chosenNodes.add(storages[0]);
     DatanodeStorageInfo[] targets;
     
     targets = chooseTarget(0, chosenNodes);
@@ -757,7 +758,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
    */
   @Test
   public void testRereplicate2() throws Exception {
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[1]);
 
@@ -784,7 +785,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
    */
   @Test
   public void testRereplicate3() throws Exception {
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[2]);
     
@@ -950,7 +951,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
    */
   @Test
   public void testChooseReplicaToDelete() throws Exception {
-    List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> replicaList = new ArrayList<>();
     final Map<String, List<DatanodeStorageInfo>> rackMap
         = new HashMap<String, List<DatanodeStorageInfo>>();
     
@@ -971,14 +972,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
       DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
     }
     
-    List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
-    List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> first = new ArrayList<>();
+    List<DatanodeStorageInfo> second = new ArrayList<>();
     replicator.splitNodesWithRack(replicaList, rackMap, first, second);
     // storages[0] and storages[1] are in first set as their rack has two 
     // replica nodes, while storages[2] and dataNodes[5] are in second set.
     assertEquals(2, first.size());
     assertEquals(2, second.size());
-    List<StorageType> excessTypes = new ArrayList<StorageType>();
+    List<StorageType> excessTypes = new ArrayList<>();
     {
       // test returning null
       excessTypes.add(StorageType.SSD);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
index 74db283..a09cec2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
@@ -45,7 +45,8 @@ public class TestReplicationPolicyConsiderLoad
   @Parameterized.Parameters
   public static Iterable<Object[]> data() {
     return Arrays.asList(new Object[][] {
-        { BlockPlacementPolicyDefault.class.getName() } });
+        { BlockPlacementPolicyDefault.class.getName() },
+        { BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
   }
 
   @Override
@@ -111,7 +112,7 @@ public class TestReplicationPolicyConsiderLoad
               1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
 
       assertEquals(3, targets.length);
-      Set<DatanodeStorageInfo> targetSet = new HashSet<DatanodeStorageInfo>(
+      Set<DatanodeStorageInfo> targetSet = new HashSet<>(
           Arrays.asList(targets));
       for (int i = 3; i < storages.length; i++) {
         assertTrue(targetSet.contains(storages[i]));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index 85598ca..86f10a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -137,7 +137,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
       DatanodeStorageInfo[] targets) {
     if(targets.length == 0)
       return true;
-    Set<String> targetSet = new HashSet<String>();
+    Set<String> targetSet = new HashSet<>();
     for(DatanodeStorageInfo storage:targets) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
@@ -217,7 +217,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   }
 
   private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeStorageInfo[] targets) {
-    Set<String> nodeGroupSet = new HashSet<String>();
+    Set<String> nodeGroupSet = new HashSet<>();
     for (DatanodeStorageInfo target: targets) {
       nodeGroupSet.add(target.getDatanodeDescriptor().getNetworkLocation());
     }
@@ -236,10 +236,10 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   public void testChooseTarget2() throws Exception {
     DatanodeStorageInfo[] targets;
     BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
 
-    Set<Node> excludedNodes = new HashSet<Node>();
-    excludedNodes.add(dataNodes[1]); 
+    Set<Node> excludedNodes = new HashSet<>();
+    excludedNodes.add(dataNodes[1]);
     targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false, 
         excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
     assertEquals(targets.length, 4);
@@ -415,7 +415,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   @Test
   public void testRereplicate1() throws Exception {
     updateHeartbeatWithUsage();
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     DatanodeStorageInfo[] targets;
     
@@ -448,7 +448,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   @Test
   public void testRereplicate2() throws Exception {
     updateHeartbeatWithUsage();
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[1]);
 
@@ -476,7 +476,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
   @Test
   public void testRereplicate3() throws Exception {
     updateHeartbeatWithUsage();
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storages[0]);
     chosenNodes.add(storages[3]);
 
@@ -511,9 +511,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
    */
   @Test
   public void testChooseReplicaToDelete() throws Exception {
-    List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
-    final Map<String, List<DatanodeStorageInfo>> rackMap
-        = new HashMap<String, List<DatanodeStorageInfo>>();
+    List<DatanodeStorageInfo> replicaList = new ArrayList<>();
+    final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
     dataNodes[0].setRemaining(4*1024*1024);
     replicaList.add(storages[0]);
 
@@ -526,13 +525,13 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     dataNodes[5].setRemaining(1*1024*1024);
     replicaList.add(storages[5]);
 
-    List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
-    List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> first = new ArrayList<>();
+    List<DatanodeStorageInfo> second = new ArrayList<>();
     replicator.splitNodesWithRack(
         replicaList, rackMap, first, second);
     assertEquals(3, first.size());
     assertEquals(1, second.size());
-    List<StorageType> excessTypes = new ArrayList<StorageType>();
+    List<StorageType> excessTypes = new ArrayList<>();
     excessTypes.add(StorageType.DEFAULT);
     DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
         null, null, (short)3, first, second, excessTypes);
@@ -614,7 +613,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     chosenNodes.add(storagesInBoundaryCase[0]);
     chosenNodes.add(storagesInBoundaryCase[5]);
     DatanodeStorageInfo[] targets;
@@ -703,11 +702,10 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
-    
-    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
-    
+
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
     DatanodeStorageInfo[] targets;
-    Set<Node> excludedNodes = new HashSet<Node>();
+    Set<Node> excludedNodes = new HashSet<>();
     excludedNodes.add(dataNodesForDependencies[5]);
     
     //try to select three targets as there are three node groups

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0f5f9846/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
new file mode 100644
index 0000000..feb2b79
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.net.Node;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link BlockPlacementPolicyWithUpgradeDomain}.
+ *
+ * The topology built by {@link #getDatanodeDescriptors(Configuration)} has
+ * nine datanodes, three on each of the racks /d1/r1, /d1/r2 and /d1/r3.
+ * Datanode i (which holds storages[i]) is in upgrade domain (i % 3) + 1, so
+ * each rack holds one node from each of the upgrade domains "1", "2", "3".
+ * The expected values in the tests below are derived from this layout.
+ */
+public class TestReplicationPolicyWithUpgradeDomain
+    extends BaseReplicationPolicyTest {
+  public TestReplicationPolicyWithUpgradeDomain() {
+    this.blockPlacementPolicy =
+        BlockPlacementPolicyWithUpgradeDomain.class.getName();
+  }
+
+  /**
+   * Builds the 3-rack by 3-upgrade-domain topology described in the class
+   * comment. As a side effect this also assigns the {@code storages} field.
+   */
+  @Override
+  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
+    final String[] racks = {
+        "/d1/r1", "/d1/r1", "/d1/r1",
+        "/d1/r2", "/d1/r2", "/d1/r2",
+        "/d1/r3", "/d1/r3", "/d1/r3"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    // Deliberately not named dataNodes, to avoid shadowing the inherited field.
+    DatanodeDescriptor[] descriptors =
+        DFSTestUtil.toDatanodeDescriptor(storages);
+    for (int i = 0; i < descriptors.length; i++) {
+      // each rack has 3 DNs with upgrade domain id 1,2,3 respectively.
+      descriptors[i].setUpgradeDomain(Integer.toString((i % 3) + 1));
+    }
+    return descriptors;
+  }
+
+  /**
+   * Verify the targets are chosen to honor both rack and upgrade domain
+   * policies when the number of replicas is 0, 1, 2, 3 and 4 respectively.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget1() throws Exception {
+    updateHeartbeatWithUsage(dataNodes[0],
+        2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
+        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
+        0L, 0L, 4, 0);
+    try {
+      DatanodeStorageInfo[] targets;
+      targets = chooseTarget(0);
+      assertEquals(0, targets.length);
+
+      targets = chooseTarget(1);
+      assertEquals(1, targets.length);
+      assertEquals(storages[0], targets[0]);
+
+      targets = chooseTarget(2);
+      assertEquals(2, targets.length);
+      assertEquals(storages[0], targets[0]);
+      assertFalse(isOnSameRack(targets[0], targets[1]));
+      assertEquals(2, getUpgradeDomains(targets).size());
+
+      targets = chooseTarget(3);
+      assertEquals(3, targets.length);
+      assertEquals(storages[0], targets[0]);
+      assertFalse(isOnSameRack(targets[0], targets[1]));
+      assertTrue(isOnSameRack(targets[1], targets[2]));
+      assertEquals(3, getUpgradeDomains(targets).size());
+
+      targets = chooseTarget(4);
+      assertEquals(4, targets.length);
+      assertEquals(storages[0], targets[0]);
+      assertTrue(isOnSameRack(targets[1], targets[2]) ||
+          isOnSameRack(targets[2], targets[3]));
+      assertFalse(isOnSameRack(targets[0], targets[2]));
+      assertEquals(3, getUpgradeDomains(targets).size());
+    } finally {
+      // Reset dataNodes[0]'s heartbeat (4 -> 0 in the second-to-last
+      // argument) even when one of the assertions above fails.
+      updateHeartbeatWithUsage(dataNodes[0],
+          2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
+          HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
+          0L, 0L, 0, 0);
+    }
+  }
+
+  /**
+   * Verify the rack and upgrade domain policies when excludeNodes are
+   * specified.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTargetWithExcludeNodes() throws Exception {
+    DatanodeStorageInfo[] targets;
+
+    targets = chooseTargetExcluding(3, 4);
+    assertEquals(3, targets.length);
+    assertEquals(storages[0], targets[0]);
+    assertEquals(2, getRacks(targets).size());
+    assertEquals(3, getUpgradeDomains(targets).size());
+
+    targets = chooseTargetExcluding(3, 4, 8);
+    assertEquals(3, targets.length);
+    assertEquals(storages[0], targets[0]);
+    assertEquals(2, getRacks(targets).size());
+    assertEquals(3, getUpgradeDomains(targets).size());
+
+    targets = chooseTargetExcluding(3, 4, 5, 8);
+    assertEquals(3, targets.length);
+    assertEquals(storages[0], targets[0]);
+    assertEquals(storages[2], targets[1]);
+    assertEquals(storages[7], targets[2]);
+
+    targets = chooseTargetExcluding(4, 4);
+    assertEquals(4, targets.length);
+    assertEquals(storages[0], targets[0]);
+    assertTrue(getRacks(targets).size() >= 2);
+    assertEquals(3, getUpgradeDomains(targets).size());
+
+    targets = chooseTargetExcluding(4, 4, 8);
+    assertEquals(4, targets.length);
+    assertEquals(storages[0], targets[0]);
+    assertTrue(getRacks(targets).size() >= 2);
+    assertEquals(3, getUpgradeDomains(targets).size());
+
+    // Ask for one more target with storages[2] already chosen and the
+    // boolean argument set to true: the result holds two storages, i.e. the
+    // already-chosen one is returned together with the new target.
+    Set<Node> excludedNodes = new HashSet<>();
+    excludedNodes.add(dataNodes[1]);
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+    chosenNodes.add(storages[2]);
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes,
+        true, excludedNodes, BLOCK_SIZE,
+        TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
+    assertEquals(2, targets.length);
+    List<DatanodeStorageInfo> targetList = Arrays.asList(targets);
+    assertTrue(targetList.contains(storages[2]));
+    assertFalse(targetList.contains(storages[1]));
+  }
+
+  /**
+   * Verify the correct replica is chosen to satisfy both rack and upgrade
+   * domain policy.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseReplicaToDelete() throws Exception {
+    BlockPlacementPolicyWithUpgradeDomain upgradeDomainPolicy =
+        (BlockPlacementPolicyWithUpgradeDomain)replicator;
+    List<DatanodeStorageInfo> first = new ArrayList<>();
+    List<DatanodeStorageInfo> second = new ArrayList<>();
+    List<StorageType> excessTypes = new ArrayList<>();
+
+    // Upgrade domains 1, 2, 2, 3 on racks r1, r1, r2, r3. Removing
+    // storages[1] leaves three racks and three upgrade domains.
+    excessTypes.add(StorageType.DEFAULT);
+    first.add(storages[0]);
+    first.add(storages[1]);
+    second.add(storages[4]);
+    second.add(storages[8]);
+    DatanodeStorageInfo chosenStorage =
+        upgradeDomainPolicy.chooseReplicaToDelete(
+            null, null, (short)3, first, second, excessTypes);
+    assertEquals(storages[1], chosenStorage);
+    first.clear();
+    second.clear();
+
+    // Upgrade domain 2 is held by both storages[1] and storages[4]; removing
+    // either one still leaves upgrade domains 1, 2, 3. The excess type is
+    // added again, presumably because the previous call consumed it.
+    excessTypes.add(StorageType.DEFAULT);
+    first.add(storages[0]);
+    first.add(storages[1]);
+    first.add(storages[4]);
+    first.add(storages[5]);
+    chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete(
+        null, null, (short)3, first, second, excessTypes);
+    assertTrue(chosenStorage.equals(storages[1]) ||
+        chosenStorage.equals(storages[4]));
+  }
+
+  /**
+   * Verify that when the excluded nodes leave too few candidates to satisfy
+   * the policy, chooseTarget returns fewer targets than requested.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTargetWithoutEnoughReplica() throws Exception {
+    // Remaining candidates: dataNodes 0, 1, 2 on /d1/r1, plus dataNodes 3
+    // and 6 (both upgrade domain 1) on the other two racks.
+    DatanodeStorageInfo[] targets = chooseTargetExcluding(3, 4, 5, 7, 8);
+    assertEquals(2, targets.length);
+    assertEquals(storages[0], targets[0]);
+    assertTrue(targets[1].equals(storages[1]) ||
+        targets[1].equals(storages[2]));
+  }
+
+  /**
+   * Verify that verifyBlockPlacement reports whether a set of replicas
+   * satisfies both the rack and the upgrade domain policies, and that the
+   * error description only mentions upgrade domains when they are the cause.
+   * @throws Exception
+   */
+  @Test
+  public void testVerifyBlockPlacement() throws Exception {
+    BlockPlacementStatus status;
+
+    // 2 upgrade domains (not enough), 2 racks (enough)
+    status = verifyPlacement(0, 1, 4);
+    assertFalse(status.isPlacementPolicySatisfied());
+
+    // 3 upgrade domains (enough), 2 racks (enough)
+    status = verifyPlacement(0, 1, 5);
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 3 upgrade domains (enough), 1 rack (not enough); the error description
+    // must not blame upgrade domains.
+    status = verifyPlacement(0, 1, 2);
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertFalse(status.getErrorDescription().contains("upgrade domain"));
+
+    // 2 upgrade domains (not enough), 3 racks (enough)
+    status = verifyPlacement(0, 5, 8);
+    assertFalse(status.isPlacementPolicySatisfied());
+    assertTrue(status.getErrorDescription().contains("upgrade domain"));
+
+    // 3 upgrade domains (enough), 3 racks (enough)
+    status = verifyPlacement(0, 4, 8);
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 3 upgrade domains (enough), 3 racks (enough), 4 replicas
+    status = verifyPlacement(0, 1, 5, 8);
+    assertTrue(status.isPlacementPolicySatisfied());
+
+    // 2 upgrade domains (not enough), 3 racks (enough), 4 replicas
+    status = verifyPlacement(0, 3, 5, 8);
+    assertFalse(status.isPlacementPolicySatisfied());
+  }
+
+  /**
+   * Choose targets with an empty chosen-node list while excluding the given
+   * datanodes.
+   * @param numOfReplicas number of targets to request
+   * @param excludedIndexes indexes into dataNodes of the nodes to exclude
+   * @return the chosen targets
+   */
+  private DatanodeStorageInfo[] chooseTargetExcluding(int numOfReplicas,
+      int... excludedIndexes) {
+    Set<Node> excludedNodes = new HashSet<>();
+    for (int index : excludedIndexes) {
+      excludedNodes.add(dataNodes[index]);
+    }
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
+    return chooseTarget(numOfReplicas, chosenNodes, excludedNodes);
+  }
+
+  /**
+   * Run verifyBlockPlacement on a located block whose replicas are the
+   * given storages, with the replica count equal to the number of storages.
+   * @param storageIndexes indexes into storages of the block's replicas
+   * @return the placement status reported by the policy under test
+   */
+  private BlockPlacementStatus verifyPlacement(int... storageIndexes) {
+    DatanodeStorageInfo[] locations =
+        new DatanodeStorageInfo[storageIndexes.length];
+    for (int i = 0; i < storageIndexes.length; i++) {
+      locations[i] = storages[storageIndexes[i]];
+    }
+    ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
+    LocatedBlock locatedBlock =
+        BlockManager.newLocatedBlock(b, locations, 0, false);
+    return replicator.verifyBlockPlacement("", locatedBlock,
+        locations.length);
+  }
+
+  /** @return the distinct upgrade domains of the given storages' datanodes */
+  private static Set<String> getUpgradeDomains(DatanodeStorageInfo[] nodes) {
+    Set<String> upgradeDomains = new HashSet<>();
+    for (DatanodeStorageInfo node : nodes) {
+      upgradeDomains.add(node.getDatanodeDescriptor().getUpgradeDomain());
+    }
+    return upgradeDomains;
+  }
+
+  /** @return the distinct network locations (racks) of the given storages */
+  private static Set<String> getRacks(DatanodeStorageInfo[] nodes) {
+    Set<String> racks = new HashSet<>();
+    for (DatanodeStorageInfo node : nodes) {
+      racks.add(node.getDatanodeDescriptor().getNetworkLocation());
+    }
+    return racks;
+  }
+}


Mime
View raw message