hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1388305 - in /hadoop/common/branches/branch-1-win: ./ src/core/org/apache/hadoop/net/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/server/namenode/
Date Fri, 21 Sep 2012 02:49:37 GMT
Author: szetszwo
Date: Fri Sep 21 02:49:36 2012
New Revision: 1388305

URL: http://svn.apache.org/viewvc?rev=1388305&view=rev
Log:
HDFS-3566. Add AzureBlockPlacementPolicy to handle fault and upgrade domains in Azure.  Contributed by Sumadhur Reddy Bolli

Added:
    hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/AzureBlockPlacementPolicy.java
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestAzureBlockPlacementPolicy.java
Modified:
    hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
    hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java

Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1388305&r1=1388304&r2=1388305&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Fri Sep 21 02:49:36 2012
@@ -93,6 +93,9 @@ Branch-hadoop-1-win - unreleased
     BlockPlacementPolicyDefault to make it more extendible.  (Sumadhur Reddy
     Bolli via szetszwo)
 
+    HDFS-3566. Add AzureBlockPlacementPolicy to handle fault and upgrade
+    domains in Azure.  (Sumadhur Reddy Bolli via szetszwo)
+
 OPTIMAZATIONS
 
 BUG FIXES

Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1388305&r1=1388304&r2=1388305&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java Fri Sep 21 02:49:36 2012
@@ -19,7 +19,9 @@ package org.apache.hadoop.net;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -422,6 +424,37 @@ public class NetworkTopology {
     }
   }
     
+  /**
+   * Returns all the leaf nodes(data nodes) by traversing the topology
+   * 
+   * @return a list of all leaf nodes (data nodes) in the topology
+   */
+  public List<Node> getLeaves() {
+    netlock.readLock().lock();
+    try {
+      
+      ArrayList<Node> leaves = new ArrayList<Node>();
+      Queue<Node> nodesQueue = new LinkedList<Node>();
+      nodesQueue.add(clusterMap);
+
+      // Doing a breadth-first search to get all the leaf nodes
+      // that represent the data nodes in the system
+      while (!nodesQueue.isEmpty()) {
+        Node node = nodesQueue.poll();
+        if (node instanceof InnerNode) {
+          Collection<Node> children = ((InnerNode) node).getChildren();
+          if (children != null)
+            nodesQueue.addAll(children);
+        } else {
+          leaves.add(node);
+        }
+      }
+      return leaves;
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+  
   /** Return the distance between two nodes
    * It is assumed that the distance from one node to its parent is 1
    * The distance between two nodes is calculated by summing up their distances

Added: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/AzureBlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/AzureBlockPlacementPolicy.java?rev=1388305&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/AzureBlockPlacementPolicy.java (added)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/AzureBlockPlacementPolicy.java Fri Sep 21 02:49:36 2012
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+/**
+ * The class is responsible for choosing the desired number of targets for
+ * placing block replicas in azure. The replica placement strategy is that if
+ * the writer is on a datanode, the 1st replica is placed on the local machine,
+ * otherwise a random datanode. The 2nd replica is placed on a datanode that is
+ * in a different fault and upgrade domain than the first node. The 3rd replica
+ * is placed on a datanode which does not belong to the upgrade domains of the
+ * first two replicas.
+ */
+public class AzureBlockPlacementPolicy extends BlockPlacementPolicyDefault {
+
+  private static final Random random = new Random();
+
+  private static final int DEFAULT_MIN_FAULT_DOMAINS = 2;
+
+  private static final int DEFAULT_MIN_UPGRADE_DOMAINS = 3;
+
+  private static final int DEFAULT_MIN_RACKS = Math.max(
+      DEFAULT_MIN_FAULT_DOMAINS, DEFAULT_MIN_UPGRADE_DOMAINS);
+
+  private static final Log LOG = FSNamesystem.LOG;
+
+  /**
+   * Chooses a rack as per the placement policy for multiple replicas. Places
+   * only one replica at a time. The placement of the next replica depends on
+   * the placement of the previous replicas.
+   * 
+   * @param excludedNodes
+   *          nodes that should not be selected
+   * @param blocksize
+   *          size of block
+   * @param maxReplicasPerRack
+   * @param results
+   *          selected nodes so far
+   * @throws NotEnoughReplicasException
+   */
+  private void chooseRack(HashMap<Node, Node> excludedNodes, long blocksize,
+      int maxReplicasPerRack, List<DatanodeDescriptor> results)
+      throws NotEnoughReplicasException {
+
+    // if results.size is 0, this function chooses a rack randomly.
+    // It makes more sense to call chooseLocalNode when there are no results at
+    // all.
+    assert (results != null);
+
+    ArrayList<DatanodeDescriptor> selectedNodes = selectNodes(excludedNodes,
+        results);
+    int numOfReplicas = 1;
+
+    if (selectedNodes.size() >= numOfReplicas) {
+      for (DatanodeDescriptor result : selectedNodes) {
+        if (isGoodTarget(result, blocksize, maxReplicasPerRack, results)) {
+          numOfReplicas--;
+          results.add(result);
+          break;
+        }
+      }
+    }
+
+    if (numOfReplicas > 0) {
+      throw new NotEnoughReplicasException("Not able to place enough replicas");
+    }
+  }
+
+  /**
+   * Selects the potential list of dataNodes for a block as per the placement
+ * policy. Nodes in the excludedNodes list would not be selected. If the fault
+ * and upgrade domain sets are empty (can happen if rack mapping is incorrect),
+ * all nodes except those in the excludedNodes would be selected.
+   * 
+   * @param excludedNodes
+   *          nodes that should not be selected
+   * @param results
+   *          nodes that are already selected
+   * @return potential list of dataNodes which would be further filtered by the
+   *         caller.
+   */
+  private ArrayList<DatanodeDescriptor> selectNodes(
+      HashMap<Node, Node> excludedNodes, List<DatanodeDescriptor> results) {
+    assert (results != null);
+
+    Set<String> upgradeDomains = new TreeSet<String>();
+
+    Set<String> faultDomains = new TreeSet<String>();
+
+    populateFaultAndUpgradeDomains(results, faultDomains,
+        upgradeDomains);
+
+    // Note that if upgradeDomains and faultDomains are empty
+    // all nodes except those in excludedNodes would be selected by the code
+    // below
+
+    // ensure that the blocks are split across
+    // at least DEFAULT_MIN_FAULT_DOMAINS fault domains and
+    // DEFAULT_MIN_UPGRADE_DOMAINS upgrade domains
+    boolean selectBasedOnFaultDomain = (faultDomains.size() < DEFAULT_MIN_FAULT_DOMAINS);
+
+    boolean selectBasedOnUpgradeDomain = (upgradeDomains.size() < DEFAULT_MIN_UPGRADE_DOMAINS);
+
+    List<Node> leaves = clusterMap.getLeaves();
+
+    ArrayList<DatanodeDescriptor> selectedNodes = new ArrayList<DatanodeDescriptor>();
+
+    for (Node node : leaves) {
+      assert (node instanceof DatanodeDescriptor);
+
+      DatanodeDescriptor dataNodeDescriptor = (DatanodeDescriptor) node;
+
+      if (excludedNodes.get(dataNodeDescriptor) == null) {
+
+        boolean isDataNodeSelected = true;
+
+        TreeSet<String> currentFaultDomain = new TreeSet<String>();
+        TreeSet<String> currentUpgradeDomain = new TreeSet<String>();
+
+        if (selectBasedOnFaultDomain || selectBasedOnUpgradeDomain) {
+          populateFaultAndUpgradeDomains(Arrays.asList(dataNodeDescriptor),
+              currentFaultDomain, currentUpgradeDomain);
+        }
+
+        if (selectBasedOnFaultDomain) {
+          // Do not select a data node in an existing fault domain
+          for (String faultDomain : faultDomains) {
+            // if currentFaultDomain is empty, then an incorrect mapping is
+            // being used. just select
+            // all nodes in that case.
+            if ((!currentFaultDomain.isEmpty())
+                && currentFaultDomain.first().equals(faultDomain)) {
+              isDataNodeSelected = false;
+              break;
+            }
+          }
+        }
+
+        if (isDataNodeSelected && selectBasedOnUpgradeDomain) {
+          // Do not select a data node in an existing upgrade domain
+          for (String upgradeDomain : upgradeDomains) {
+            // if currentUpgradeDomain is empty, then an incorrect mapping is
+            // being used. just select
+            // all nodes in that case.
+            if ((!currentUpgradeDomain.isEmpty())
+                && currentUpgradeDomain.first().equals(upgradeDomain)) {
+              isDataNodeSelected = false;
+              break;
+            }
+          }
+        }
+
+        if (isDataNodeSelected) {
+          selectedNodes.add(dataNodeDescriptor);
+        }
+      }
+    }
+
+    // We choose all possible nodes and then randomize them so that nodes
+    // are picked randomly.
+    Collections.shuffle(selectedNodes);
+
+    return selectedNodes;
+  }
+
+  /**
+   * Verify that the block is replicated on at least two fault domains and three
+   * upgrade domains
+   * 
+   * @param srcPath
+   *          the full pathname of the file to be verified
+   * @param lBlk
+   *          block with locations
+   * @param replication
+   *          target file replication
+   * @return the difference between the required and the actual number of racks
+   *         the block is replicated to.
+   */
+  @Override
+  public int verifyBlockPlacement(String srcPath, LocatedBlock lBlk,
+      short replication) {
+
+    int minRacks = Math.min(DEFAULT_MIN_RACKS, replication);
+
+    DatanodeInfo[] locs = lBlk.getLocations();
+    if (locs == null)
+      locs = new DatanodeInfo[0];
+    int numRacks = clusterMap.getNumOfRacks();
+    if (numRacks < DEFAULT_MIN_RACKS)
+      return 0;
+    minRacks = Math.min(minRacks, numRacks);
+
+    // ensure that the blocks are split across
+    // at least 2 fault domains and three upgrade domains
+    Set<String> upgradeDomains = new TreeSet<String>();
+    Set<String> faultDomains = new TreeSet<String>();
+    populateFaultAndUpgradeDomains(Arrays.asList(locs), faultDomains, upgradeDomains);
+
+    int requiredRacks = 0;
+    if (faultDomains.size() < DEFAULT_MIN_FAULT_DOMAINS
+        || upgradeDomains.size() < DEFAULT_MIN_UPGRADE_DOMAINS) {
+      requiredRacks = Math.max(
+          (DEFAULT_MIN_FAULT_DOMAINS - faultDomains.size()),
+          (DEFAULT_MIN_UPGRADE_DOMAINS - upgradeDomains.size()));
+    }
+
+    if (requiredRacks < 0)
+      requiredRacks = 0;
+
+    return requiredRacks;
+  }
+
+  /**
+   * Populate the faultDomains and upgradeDomains sets for a given set of data
+   * nodes
+   * 
+   * @param list
+   *          list of data nodes
+   * @param faultDomains
+   *          set of faultDomains for given data nodes. This will be populated
+   *          by the method.
+   * @param upgradeDomains
+   *          set of upgradeDomains for given data nodes. This will be populated
+   *          by the method.
+   */
+  private static void populateFaultAndUpgradeDomains(List<? extends DatanodeInfo> list,
+      Set<String> faultDomains, Set<String> upgradeDomains) {
+    assert (list != null);
+
+    assert ((faultDomains != null) && (faultDomains.size() == 0));
+
+    assert ((upgradeDomains != null) && (upgradeDomains.size() == 0));
+
+    for (DatanodeInfo dn : list) {
+      assert (dn != null);
+
+      // The assumption here is that the network location is of the format
+      // /fd1/ud1
+      // get the node corresponding to the network location
+      Node parent = dn.getParent();
+
+      // get the node corresponding to the parent of the network location
+      Node grandParent = null;
+
+      if (parent != null) {
+        grandParent = parent.getParent();
+      }
+
+      if (parent != null && grandParent != null
+          && (!grandParent.getName().equals(NodeBase.ROOT))) {
+        upgradeDomains.add(parent.getName());
+        faultDomains.add(grandParent.getName());
+      } else {
+        LOG.warn("This placement policy is not meant for the current network location mapping: "
+            + dn.getNetworkLocation());
+      }
+    }
+  }
+
+  /**
+   * Determines whether datanodeToDelete can be deleted from the collection of
+   * dataNodes that contain the replicas as per the placement policy.
+   */
+  protected boolean canDelete(DatanodeDescriptor datanodeToDelete,
+      List<DatanodeDescriptor> dataNodes, short replication) {
+    assert (datanodeToDelete != null);
+
+    assert (dataNodes != null);
+
+    // populate the fault domains and upgrade domains sets for the dataNodes
+    // list
+    Set<String> faultDomains = new TreeSet<String>();
+
+    Set<String> upgradeDomains = new TreeSet<String>();
+
+    populateFaultAndUpgradeDomains(dataNodes, faultDomains,
+        upgradeDomains);
+
+    // populate the fault domains and upgrade domains sets for list (dataNodes -
+    // datanodeToDelete)
+    List<DatanodeDescriptor> newDataNodes = new ArrayList<DatanodeDescriptor>();
+    for (DatanodeDescriptor d : dataNodes) {
+      if (d != datanodeToDelete)
+        newDataNodes.add(d);
+    }
+
+    Set<String> newUpgradeDomains = new TreeSet<String>();
+
+    Set<String> newFaultDomains = new TreeSet<String>();
+
+    populateFaultAndUpgradeDomains(newDataNodes,
+        newFaultDomains, newUpgradeDomains);
+
+    int minFaultDomains = Math.min(DEFAULT_MIN_FAULT_DOMAINS, replication);
+    int minUpgradeDomains = Math.min(DEFAULT_MIN_UPGRADE_DOMAINS, replication);
+
+    if (
+    // if the fault and upgrade domains count does not change
+    // or is not less than the minimum requirement for fault and upgrade
+    // domains by deleting the node, it can be deleted.
+    ((newFaultDomains.size() >= Math.min(minFaultDomains, faultDomains.size())) && (newUpgradeDomains
+        .size() >= Math.min(minUpgradeDomains, upgradeDomains.size())))) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Chooses numOfReplicas for block placement
+   * 
+   * @param numOfReplicas
+   * @param writer
+   * @param excludedNodes
+   *          Nodes that should not be used for selection
+   * @param blocksize
+   * @param maxNodesPerRack
+   * @param results
+   *          all nodes that already contain the block being replicated
+   * @return the writer node, possibly reassigned to the locally chosen node
+   *         when no replicas had been placed yet
+   */
+  @Override
+  protected DatanodeDescriptor chooseTarget(int numOfReplicas,
+      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
+      long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results) {
+
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
+      return writer;
+    }
+
+    int numOfResults = results.size();
+    boolean newBlock = (numOfResults == 0);
+    if (writer == null && !newBlock) {
+      writer = (DatanodeDescriptor) results.get(0);
+    }
+
+    try {
+      switch (numOfResults) {
+      case 0:
+        writer = chooseLocalNode(writer, excludedNodes, blocksize,
+            maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          break;
+        }
+      case 1:
+        chooseRack(excludedNodes, blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          break;
+        }
+      case 2:
+        chooseRack(excludedNodes, blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          break;
+        }
+      default:
+        chooseRack(excludedNodes, blocksize, maxNodesPerRack, results);
+
+      }
+    } catch (NotEnoughReplicasException e) {
+      LOG.warn("Not able to place enough replicas, still in need of "
+          + numOfReplicas);
+    }
+    return writer;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+      Block block, short replicationFactor,
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+    long minSpace = Long.MAX_VALUE;
+    DatanodeDescriptor cur = null;
+
+    assert first != null;
+    assert second != null;
+
+    List<DatanodeDescriptor> allReplicas = new ArrayList<DatanodeDescriptor>();
+    allReplicas.addAll(first);
+    allReplicas.addAll(second);
+
+    // pick replica from the first Set. If first is empty, then pick replicas
+    // from second set.
+    Iterator<DatanodeDescriptor> iter = first.isEmpty() ? second.iterator()
+        : first.iterator();
+
+    // pick node with least free space
+    while (iter.hasNext()) {
+      DatanodeDescriptor node = iter.next();
+      long free = node.getRemaining();
+      // when picking from the second set (first is empty), also require that
+      // deleting this node preserves the fault/upgrade domain requirements
+      if ((first.isEmpty() ? canDelete(node, allReplicas, replicationFactor)
+          : true) && minSpace > free) {
+        minSpace = free;
+        cur = node;
+      }
+    }
+    return cur;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean canMove(Block block, DatanodeInfo source, DatanodeInfo target,
+      List<DatanodeInfo> dataNodes) {
+
+    // populate the fault domains and upgrade domains sets for the dataNodes
+    Set<String> faultDomains = new TreeSet<String>();
+    Set<String> upgradeDomains = new TreeSet<String>();
+
+    populateFaultAndUpgradeDomains(dataNodes, faultDomains,
+        upgradeDomains);
+
+    // populate the fault domains and upgrade domains sets for list (dataNodes -
+    // source+target)
+    List<DatanodeInfo> newDataNodes = new ArrayList<DatanodeInfo>();
+    for (DatanodeInfo d : dataNodes) {
+      if (d != source)
+        newDataNodes.add(d);
+    }
+    newDataNodes.add(target);
+
+    Set<String> newUpgradeDomains = new TreeSet<String>();
+    Set<String> newFaultDomains = new TreeSet<String>();
+
+    populateFaultAndUpgradeDomains(newDataNodes,
+        newFaultDomains, newUpgradeDomains);
+
+    if (
+    // if the fault and upgrade domains count does not change
+    // or is not less than the minimum requirement(or old counts)
+    // for fault and upgrade
+    // domains by moving to a new node, then the move is valid.
+    ((newFaultDomains.size() >= Math.min(DEFAULT_MIN_FAULT_DOMAINS,
+        faultDomains.size())) && (newUpgradeDomains.size() >= Math.min(
+        DEFAULT_MIN_UPGRADE_DOMAINS, upgradeDomains.size())))) {
+      return true;
+    }
+
+    return false;
+  }
+}

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestAzureBlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestAzureBlockPlacementPolicy.java?rev=1388305&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestAzureBlockPlacementPolicy.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestAzureBlockPlacementPolicy.java Fri Sep 21 02:49:36 2012
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeSet;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+
+public class TestAzureBlockPlacementPolicy extends TestCase {
+  private static final int BLOCK_SIZE = 1024;
+  private static final int NUM_OF_DATANODES = 7;
+  private static final Configuration CONF = new Configuration();
+  private static final NetworkTopology cluster;
+  private static final NameNode namenode;
+  private static final BlockPlacementPolicy replicator;
+  private static final String filename = "/dummyfile.txt";
+  private static final Block dummyBlock = new Block();
+  private static final DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
+      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/fd0/ud0"),
+      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/fd0/ud1"),
+      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/fd0/ud2"),
+      new DatanodeDescriptor(new DatanodeID("h4:5020"), "/fd1/ud0"),
+      new DatanodeDescriptor(new DatanodeID("h5:5020"), "/fd1/ud1"),
+      new DatanodeDescriptor(new DatanodeID("h6:5020"), "/fd1/ud2"),
+      new DatanodeDescriptor(new DatanodeID("h7:5020"), "/fd0/ud0") };
+
+  // using meaningful names for the above so that they are more readable
+  private static final DatanodeDescriptor NODE1_FD0_UD0 = dataNodes[0];
+  private static final DatanodeDescriptor NODE_FD0_UD1 = dataNodes[1];
+  private static final DatanodeDescriptor NODE_FD0_UD2 = dataNodes[2];
+  private static final DatanodeDescriptor NODE_FD1_UD0 = dataNodes[3];
+  private static final DatanodeDescriptor NODE_FD1_UD1 = dataNodes[4];
+  private static final DatanodeDescriptor NODE_FD1_UD2 = dataNodes[5];
+  private static final DatanodeDescriptor NODE2_FD0_UD0 = dataNodes[6];
+
+  private final static DatanodeDescriptor NODE_FD1_UD3 = new DatanodeDescriptor(
+      new DatanodeID("h8:5020"), "/fd1/ud3");
+
+  static {
+    try {
+      FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
+      CONF.set("dfs.http.address", "0.0.0.0:0");
+      CONF.set("dfs.block.replicator.classname",
+          "org.apache.hadoop.hdfs.server.namenode.AzureBlockPlacementPolicy");
+      NameNode.format(CONF);
+      namenode = new NameNode(CONF);
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw (RuntimeException) new RuntimeException().initCause(e);
+    }
+    FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
+    replicator = fsNamesystem.replicator;
+    cluster = fsNamesystem.clusterMap;
+    // construct network topology
+    for (int i = 0; i < NUM_OF_DATANODES; i++) {
+      cluster.add(dataNodes[i]);
+    }
+    for (int i = 0; i < NUM_OF_DATANODES; i++) {
+      dataNodes[i].updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE
+          * BLOCK_SIZE, 0L, 2 * FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE,
+          0);
+    }
+  }
+
+  /**
+   * In this test case, client is NODE1_FD0_UD0. So the 1st replica should be
+   * placed on NODE1_FD0_UD0, the 2nd replica should be placed in a different
+   * upgrade and fault domain and third should be placed in an upgrade domain
+   * different from the first two.
+   */
+  public void testChooseTarget1() throws Exception {
+    NODE1_FD0_UD0.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE
+        * BLOCK_SIZE, 0L, FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 4); // overloaded
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], NODE1_FD0_UD0);
+
+    targets = replicator.chooseTarget(filename, 2, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], NODE1_FD0_UD0);
+    assertTrue(AreFaultAndUpgradeDomainsDifferent(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertEquals(targets[0], NODE1_FD0_UD0);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(targets), (short) 3) == 0);
+
+    targets = replicator.chooseTarget(filename, 4, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], NODE1_FD0_UD0);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(targets), (short) 4) == 0);
+
+    NODE1_FD0_UD0.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE
+        * BLOCK_SIZE, 0L, FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0);
+  }
+
+  /**
+   * In this test case, client is NODE1_FD0_UD0, but the NODE_FD1_UD0 is not
+   * allowed to be chosen. So the 1st replica should be placed on NODE1_FD0_UD0,
+   * the 2nd replica should be placed on a different fault and upgrade domain
+   * from the first, the 3rd should be on an upgrade domain different from the
+   * first and second
+   * 
+   * @throws Exception
+   */
+  public void testChooseTarget2() throws Exception {
+    List<Node> invalidNodes;
+    HashMap<Node, Node> excludedNodes;
+    DatanodeDescriptor[] targets;
+    invalidNodes = new ArrayList<Node>();
+    invalidNodes.add(NODE_FD1_UD0);
+    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault) replicator;
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    excludedNodes = getMap(invalidNodes);
+    targets = repl.chooseTarget(0, NODE1_FD0_UD0, chosenNodes, excludedNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    excludedNodes = getMap(invalidNodes);
+    chosenNodes.clear();
+    targets = repl.chooseTarget(1, NODE1_FD0_UD0, chosenNodes, excludedNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], NODE1_FD0_UD0);
+    assertFalse(areInvalidNodesSelected(invalidNodes, targets));
+
+    excludedNodes = getMap(invalidNodes);
+    chosenNodes.clear();
+    targets = repl.chooseTarget(2, NODE1_FD0_UD0, chosenNodes, excludedNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], NODE1_FD0_UD0);
+    assertFalse(areInvalidNodesSelected(invalidNodes, targets));
+    assertTrue(AreFaultAndUpgradeDomainsDifferent(targets[0], targets[1]));
+
+    excludedNodes = getMap(invalidNodes);
+    chosenNodes.clear();
+    targets = repl.chooseTarget(3, NODE1_FD0_UD0, chosenNodes, excludedNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertEquals(targets[0], NODE1_FD0_UD0);
+    assertFalse(areInvalidNodesSelected(invalidNodes, targets));
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(targets), (short) 3) == 0);
+
+    excludedNodes = getMap(invalidNodes);
+    chosenNodes.clear();
+    targets = repl.chooseTarget(4, NODE1_FD0_UD0, chosenNodes, excludedNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], NODE1_FD0_UD0);
+    assertFalse(areInvalidNodesSelected(invalidNodes, targets));
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(targets), (short) 4) == 0);
+  }
+
+  /**
+   * In this test case, client is NODE1_FD0_UD0, but NODE1_FD0_UD0 is not
+   * qualified to be chosen. So the 1st replica should be placed on
+   * NODE_FD0_UD1, the 2nd replica should be placed on a different fault and
+   * upgrade domain, the 3rd replica should be placed on an upgrade domain
+   * different from the first two
+   * 
+   * @throws Exception
+   */
+  public void testChooseTarget3() throws Exception {
+    // make data node 0 to be not qualified to choose:
+    // remaining space is set to (MIN_BLOCKS_FOR_WRITE - 1) * BLOCK_SIZE
+    NODE1_FD0_UD0.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE
+        * BLOCK_SIZE, 0L, (FSConstants.MIN_BLOCKS_FOR_WRITE - 1) * BLOCK_SIZE,
+        0); // no space
+
+    // nodes that should not be selected
+    ArrayList<Node> invalidNodes = new ArrayList<Node>();
+    invalidNodes.add(NODE1_FD0_UD0);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(areInvalidNodesSelected(invalidNodes, targets));
+
+    targets = replicator.chooseTarget(filename, 2, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(areInvalidNodesSelected(invalidNodes, targets));
+    assertTrue(AreFaultAndUpgradeDomainsDifferent(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertFalse(areInvalidNodesSelected(invalidNodes, targets));
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(targets), (short) 3) == 0);
+
+    targets = replicator.chooseTarget(filename, 4, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertFalse(areInvalidNodesSelected(invalidNodes, targets));
+    // NOTE(review): 4 targets are verified against a replication factor of
+    // (short) 3, while sibling tests pass (short) 4 for 4 targets — confirm
+    // this is intentional rather than a copy-paste of the 3-target case.
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(targets), (short) 3) == 0);
+
+    // restore NODE1_FD0_UD0's free space so later tests see a healthy node
+    NODE1_FD0_UD0.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE
+        * BLOCK_SIZE, 0L, FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0);
+  }
+
+  /**
+   * In this testcase, client is NODE1_FD0_UD0, but none of the nodes on fault
+   * domain 0 and upgrade domain 0 is qualified to be chosen. So the 1st replica
+   * should be placed on other fault and upgrade domains. the 2nd replica should
+   * be placed on a fault and upgrade domain different from the first and the
+   * 3rd replica should be placed on an upgrade domain different from both.
+   * 
+   * @throws Exception
+   */
+  public void testChooseTarget4() throws Exception {
+    // make data node 0 & 6 to be not qualified to choose: not enough disk space
+
+    NODE1_FD0_UD0.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE
+        * BLOCK_SIZE, 0L, (FSConstants.MIN_BLOCKS_FOR_WRITE - 1) * BLOCK_SIZE,
+        0);
+    NODE2_FD0_UD0.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE
+        * BLOCK_SIZE, 0L, (FSConstants.MIN_BLOCKS_FOR_WRITE - 1) * BLOCK_SIZE,
+        0);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    // the writer's own rack is fully disqualified, so the 1st replica must
+    // land off-rack
+    assertFalse(cluster.isOnSameRack(targets[0], NODE1_FD0_UD0));
+
+    targets = replicator.chooseTarget(filename, 2, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], NODE1_FD0_UD0));
+    assertTrue(AreFaultAndUpgradeDomainsDifferent(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, NODE1_FD0_UD0, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(targets), (short) 3) == 0);
+
+    // restore both nodes' free space so later tests see healthy nodes
+    NODE1_FD0_UD0.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE
+        * BLOCK_SIZE, 0L, FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0);
+    NODE2_FD0_UD0.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE
+        * BLOCK_SIZE, 0L, FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0);
+  }
+
+  /**
+   * In this testcase, client is a node outside of file system. So the 1st
+   * replica can be placed on any node. the 2nd replica should be placed on a
+   * fault and upgrade domain different from the first and the 3rd replica
+   * should be placed on an upgrade domain different from both.
+   * 
+   * @throws Exception
+   */
+  public void testChooseTarget5() throws Exception {
+    // NODE_FD1_UD3 acts as the off-cluster writer here (it is not added to
+    // the topology in this test), so no local-node placement is checked.
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, NODE_FD1_UD3, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, NODE_FD1_UD3, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+
+    targets = replicator.chooseTarget(filename, 2, NODE_FD1_UD3, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(AreFaultAndUpgradeDomainsDifferent(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, NODE_FD1_UD3, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(targets), (short) 3) == 0);
+  }
+
+  /**
+   * This testcase tests re-replication, when NODE1_FD0_UD0 is already chosen.
+   * the 1st replica should be placed on a fault and upgrade domain different
+   * from the first and the 2nd replica should be placed on an upgrade domain
+   * different from both.The 3rd replica can be placed randomly.
+   * 
+   * @throws Exception
+   */
+  public void testRereplicate1() throws Exception {
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(NODE1_FD0_UD0);
+    DatanodeDescriptor[] targets;
+
+    targets = replicator.chooseTarget(filename, 0, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertTrue(AreFaultAndUpgradeDomainsDifferent(NODE1_FD0_UD0, targets[0]));
+
+    // placement is verified over the union of already-chosen replicas and
+    // the newly selected targets (see getSelectedNodes)
+    targets = replicator.chooseTarget(filename, 2, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(getSelectedNodes(chosenNodes, targets)), (short) 3) == 0);
+
+    // 1 chosen + 3 new targets = 4 replicas, hence replication factor 4 here
+    targets = replicator.chooseTarget(filename, 3, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(getSelectedNodes(chosenNodes, targets)), (short) 4) == 0);
+  }
+
+  /**
+   * This test case tests re-replication, when NODE1_FD0_UD0 and NODE_FD0_UD1
+   * are already chosen. So the 1st replica should be placed in an upgrade and
+   * fault domain different from both and the other replicas can be placed
+   * randomly.
+   * 
+   * @throws Exception
+   */
+  public void testRereplicate2() throws Exception {
+    // pre-existing replicas share fault domain 0 but sit on different
+    // upgrade domains (UD0 and UD1)
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(NODE1_FD0_UD0);
+    chosenNodes.add(NODE_FD0_UD1);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(getSelectedNodes(chosenNodes, targets)), (short) 3) == 0);
+
+    // 2 chosen + 2 new targets = 4 replicas, hence replication factor 4 here
+    targets = replicator.chooseTarget(filename, 2, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(getSelectedNodes(chosenNodes, targets)), (short) 4) == 0);
+  }
+
+  /**
+   * This testcase tests re-replication, when NODE1_FD0_UD0 and NODE_FD0_UD2 are
+   * already chosen. So the 1st replica should be placed on an upgrade domain
+   * different from both.
+   * 
+   * @throws Exception
+   */
+  public void testRereplicate3() throws Exception {
+    // pre-existing replicas share fault domain 0 but sit on different
+    // upgrade domains (UD0 and UD2)
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(NODE1_FD0_UD0);
+    chosenNodes.add(NODE_FD0_UD2);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(getSelectedNodes(chosenNodes, targets)), (short) 3) == 0);
+
+    // 2 chosen + 2 new targets = 4 replicas, hence replication factor 4 here
+    targets = replicator.chooseTarget(filename, 2, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(getSelectedNodes(chosenNodes, targets)), (short) 4) == 0);
+  }
+
+  /**
+   * This testcase tests re-replication, when NODE1_FD0_UD0 and NODE2_FD0_UD0
+   * are already chosen. So the 1st replica should be placed on a different
+   * upgrade and fault domain. The 2nd replica should be placed on an upgrade
+   * domain different from previous replicas
+   * 
+   * @throws Exception
+   */
+  public void testRereplicate4() throws Exception {
+    // both pre-existing replicas sit on the SAME fault and upgrade domain
+    // (FD0/UD0)
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(NODE1_FD0_UD0);
+    chosenNodes.add(NODE2_FD0_UD0);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    // with only one new target the replicas still span too few upgrade
+    // domains, so verifyBlockPlacement is expected to report 1 (presumably
+    // one more domain still needed — TODO confirm against the policy impl)
+    targets = replicator.chooseTarget(filename, 1, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(getSelectedNodes(chosenNodes, targets)), (short) 3) == 1);
+
+    targets = replicator.chooseTarget(filename, 2, NODE1_FD0_UD0, chosenNodes,
+        BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(replicator.verifyBlockPlacement(filename,
+        getLocatedBlock(getSelectedNodes(chosenNodes, targets)), (short) 4) == 0);
+  }
+
+  /**
+   * tests canDelete method for the following cases a) nodes in same network
+   * location b) nodes in same upgrade domains c) nodes in same fault domains d)
+   * nodes in similar fault and upgrade domains with high replication e) nodes
+   * in different fault and upgrade domains with high replication
+   */
+  public void testCanDelete() {
+
+    AzureBlockPlacementPolicy policy = (AzureBlockPlacementPolicy) replicator;
+    short replication = 3;
+    List<DatanodeDescriptor> nodesWithReplicas = new ArrayList<DatanodeDescriptor>();
+    nodesWithReplicas.add(NODE1_FD0_UD0);
+    nodesWithReplicas.add(NODE2_FD0_UD0);
+    nodesWithReplicas.add(NODE_FD0_UD1);
+
+    // NODE1_FD0_UD0 can be deleted as it is in the same network location
+    // as datanode[6].NODE_FD0_UD1 cannot be deleted as its removal decreases
+    // the number of fault and upgrade domains containing the replicas.
+    assertTrue(policy.canDelete(NODE1_FD0_UD0, nodesWithReplicas, replication));
+    assertFalse(policy.canDelete(NODE_FD0_UD1, nodesWithReplicas, replication));
+    assertTrue(policy.canDelete(NODE2_FD0_UD0, nodesWithReplicas, replication));
+
+    nodesWithReplicas.clear();
+    nodesWithReplicas.add(NODE1_FD0_UD0);
+    nodesWithReplicas.add(NODE_FD0_UD1);
+    nodesWithReplicas.add(NODE_FD1_UD0);
+
+    // NODE1_FD0_UD0 can be deleted as its removal does not decrease
+    // the number of fault or upgrade domains containing the replicas.
+    // NODE_FD0_UD1 cannot be deleted as it would
+    // decrease the no of upgrade domains containing the replica.
+    // NODE_FD1_UD0 cannot be deleted as it would
+    // decrease the no of fault domains containing the replica.
+    assertTrue(policy.canDelete(NODE1_FD0_UD0, nodesWithReplicas, replication));
+    assertFalse(policy.canDelete(NODE_FD0_UD1, nodesWithReplicas, replication));
+    assertFalse(policy.canDelete(NODE_FD1_UD0, nodesWithReplicas, replication));
+
+    nodesWithReplicas.clear();
+    nodesWithReplicas.add(NODE1_FD0_UD0);
+    nodesWithReplicas.add(NODE_FD0_UD1);
+    nodesWithReplicas.add(NODE_FD0_UD2);
+    nodesWithReplicas.add(NODE_FD1_UD0);
+
+    // Though the replication factor is greater than 3,
+    // datanodes 1,2 cannot be deleted as their removal decreases
+    // the number of upgrade domains containing the replicas below the min
+    // upgrade domains criteria.
+    // NODE_FD1_UD0 cannot be deleted as the removal would
+    // decrease the no of fault domains containing the replicas.
+    assertTrue(policy.canDelete(NODE1_FD0_UD0, nodesWithReplicas, replication));
+    assertFalse(policy.canDelete(NODE_FD0_UD1, nodesWithReplicas, replication));
+    assertFalse(policy.canDelete(NODE_FD0_UD2, nodesWithReplicas, replication));
+    assertFalse(policy.canDelete(NODE_FD1_UD0, nodesWithReplicas, replication));
+
+    nodesWithReplicas.clear();
+    nodesWithReplicas.add(NODE1_FD0_UD0);
+    nodesWithReplicas.add(NODE_FD0_UD2);
+    nodesWithReplicas.add(NODE_FD1_UD1);
+    // NODE_FD1_UD3 is added to the topology only for this scenario and
+    // removed again below
+    cluster.add(NODE_FD1_UD3);
+    nodesWithReplicas.add(NODE_FD1_UD3);
+
+    // any node can be deleted because, though it could
+    // decrease the number of fault and upgrade domains containing
+    // the replicas, the min criteria of 2 fault and 3 upgrade
+    // domains is being satisfied.
+    assertTrue(policy.canDelete(NODE1_FD0_UD0, nodesWithReplicas, replication));
+    assertTrue(policy.canDelete(NODE_FD0_UD2, nodesWithReplicas, replication));
+    assertTrue(policy.canDelete(NODE_FD1_UD1, nodesWithReplicas, replication));
+    assertTrue(policy.canDelete(NODE_FD1_UD3, nodesWithReplicas, replication));
+
+    nodesWithReplicas.clear();
+    nodesWithReplicas.add(NODE_FD1_UD0);
+    nodesWithReplicas.add(NODE_FD1_UD1);
+    nodesWithReplicas.add(NODE_FD1_UD2);
+    nodesWithReplicas.add(NODE_FD1_UD3);
+
+    // any node can be deleted because, though it could
+    // decrease the number of fault and upgrade domains containing
+    // the replicas, the min criteria of (min of 2 or original no of
+    // fault domains) and (min of 3 or original no of upgrade
+    // domains) is being satisfied.
+    assertTrue(policy.canDelete(NODE_FD1_UD0, nodesWithReplicas, replication));
+    assertTrue(policy.canDelete(NODE_FD1_UD1, nodesWithReplicas, replication));
+    assertTrue(policy.canDelete(NODE_FD1_UD2, nodesWithReplicas, replication));
+    assertTrue(policy.canDelete(NODE_FD1_UD3, nodesWithReplicas, replication));
+
+    cluster.remove(NODE_FD1_UD3);
+
+    replication = 1;
+    nodesWithReplicas.clear();
+    nodesWithReplicas.add(NODE1_FD0_UD0);
+    nodesWithReplicas.add(NODE_FD0_UD1);
+
+    // Any node can be deleted as replication factor is just 1
+    assertTrue(policy.canDelete(NODE1_FD0_UD0, nodesWithReplicas, replication));
+    assertTrue(policy.canDelete(NODE_FD0_UD1, nodesWithReplicas, replication));
+
+    replication = 2;
+    nodesWithReplicas.clear();
+    nodesWithReplicas.add(NODE1_FD0_UD0);
+    nodesWithReplicas.add(NODE_FD0_UD1);
+    nodesWithReplicas.add(NODE_FD1_UD0);
+
+    // NODE1_FD0_UD0 can be deleted as it does not decrease the number of
+    // upgrade domains containing the replica, whereas NODE_FD0_UD1 cannot
+    // be as it would decrease the number of upgrade domains. NODE_FD1_UD0
+    // cannot be deleted as it would decrease the fault domains.
+    assertTrue(policy.canDelete(NODE1_FD0_UD0, nodesWithReplicas, replication));
+    assertFalse(policy.canDelete(NODE_FD0_UD1, nodesWithReplicas, replication));
+    assertFalse(policy.canDelete(NODE_FD1_UD0, nodesWithReplicas, replication));
+  }
+
+  /**
+   * Tests can move for every possible node when replicas are spread as per the
+   * default placement i.e. across 2 fault and 3 upgrade domains
+   */
+  public void testcanMove1() {
+    List<DatanodeInfo> blockLocations = new ArrayList<DatanodeInfo>();
+    blockLocations.add(NODE1_FD0_UD0);
+    blockLocations.add(NODE_FD0_UD1);
+    blockLocations.add(NODE_FD1_UD2);
+    // NODE_FD1_UD3 is added to the topology only for this test and removed
+    // at the end
+    cluster.add(NODE_FD1_UD3);
+
+    // check if you can move NODE1_FD0_UD0 to various nodes
+    assertFalse(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD0_UD1,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD0_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD0,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD1,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE2_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD3,
+        blockLocations));
+
+    // check if you can move NODE_FD0_UD1 to various nodes
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE1_FD0_UD0,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD0_UD2,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD1,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD2,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE2_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD3,
+        blockLocations));
+
+    // check if you can move NODE_FD1_UD2 to various nodes
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD2, NODE1_FD0_UD0,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD2, NODE_FD0_UD1,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD2, NODE_FD0_UD2,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD2, NODE_FD1_UD0,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD2, NODE_FD1_UD1,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD2, NODE2_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD1_UD2, NODE_FD1_UD3,
+        blockLocations));
+
+    cluster.remove(NODE_FD1_UD3);
+  }
+
+  /**
+   * Tests can move for every possible node when replicas have lesser spread
+   * than the default placement i.e. across 1 fault and 2 upgrade domains
+   */
+  public void testcanMove2() {
+    List<DatanodeInfo> blockLocations = new ArrayList<DatanodeInfo>();
+    // replicas are spread across 1 fault domain and 2 upgrade domains
+    blockLocations.add(NODE1_FD0_UD0);
+    blockLocations.add(NODE_FD0_UD1);
+    // NODE_FD1_UD3 is added to the topology only for this test and removed
+    // at the end
+    cluster.add(NODE_FD1_UD3);
+
+    // check if you can move NODE1_FD0_UD0 to various nodes
+    assertFalse(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD0_UD1,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD0_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD0,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD1,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE2_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD3,
+        blockLocations));
+
+    // check if you can move NODE_FD0_UD1 to various nodes
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE1_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD0_UD2,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD1,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD2,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE2_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD3,
+        blockLocations));
+
+    cluster.remove(NODE_FD1_UD3);
+  }
+
+  /**
+   * Tests can move for every possible node when replicas have more spread than
+   * the default placement i.e. across 2 fault and 4 upgrade domains
+   */
+  public void testcanMove3() {
+    List<DatanodeInfo> blockLocations = new ArrayList<DatanodeInfo>();
+    // replicas are spread across 2 fault domains and 4 upgrade domains
+    blockLocations.add(NODE1_FD0_UD0);
+    blockLocations.add(NODE_FD0_UD1);
+    blockLocations.add(NODE_FD0_UD2);
+    blockLocations.add(NODE_FD1_UD3);
+    // NODE_FD1_UD3 is added to the topology only for this test and removed
+    // at the end
+    cluster.add(NODE_FD1_UD3);
+
+    // check if you can move NODE1_FD0_UD0 to various nodes
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD0_UD1,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD0_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD1,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE2_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE1_FD0_UD0, NODE_FD1_UD3,
+        blockLocations));
+
+    // check if you can move NODE_FD0_UD1 to various nodes
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE1_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD0_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD1,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE2_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD1, NODE_FD1_UD3,
+        blockLocations));
+
+    // check if you can move NODE_FD0_UD2 to various nodes
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD2, NODE1_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD2, NODE_FD0_UD1,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD2, NODE_FD1_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD2, NODE_FD1_UD1,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD2, NODE_FD1_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD2, NODE2_FD0_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD0_UD2, NODE_FD1_UD3,
+        blockLocations));
+
+    // check if you can move NODE_FD1_UD3 to various nodes
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD3, NODE1_FD0_UD0,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD3, NODE_FD0_UD1,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD3, NODE_FD0_UD2,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD1_UD3, NODE_FD1_UD0,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD1_UD3, NODE_FD1_UD1,
+        blockLocations));
+    assertTrue(replicator.canMove(dummyBlock, NODE_FD1_UD3, NODE_FD1_UD2,
+        blockLocations));
+    assertFalse(replicator.canMove(dummyBlock, NODE_FD1_UD3, NODE2_FD0_UD0,
+        blockLocations));
+
+    cluster.remove(NODE_FD1_UD3);
+  }
+
+  // This is a sanity test to ensure that
+  // when there is no script to map ips to racks or the mapping script fails
+  // and default-racks are used for all nodes, the azure placement policy
+  // handles it gracefully for various replication factors.
+  // This tests the replication, re-replication, deletion
+  // and movement of nodes for replication factors 1 to 4
+  public void testPolicyWithDefaultRacks() {
+    // clear the old topology
+    for (Node node : dataNodes) {
+      cluster.remove(node);
+    }
+
+    // create a new topology with same racks for all nodes
+    String rackLocation = NetworkTopology.DEFAULT_RACK;
+
+    DatanodeDescriptor newDataNodes[] = new DatanodeDescriptor[] {
+        new DatanodeDescriptor(new DatanodeID("n1:5020"), rackLocation),
+        new DatanodeDescriptor(new DatanodeID("n2:5020"), rackLocation),
+        new DatanodeDescriptor(new DatanodeID("n3:5020"), rackLocation),
+        new DatanodeDescriptor(new DatanodeID("n4:5020"), rackLocation),
+        new DatanodeDescriptor(new DatanodeID("n5:5020"), rackLocation) };
+
+    for (DatanodeDescriptor node : newDataNodes) {
+      cluster.add(node);
+      node.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE,
+          0L, 2 * FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0);
+    }
+
+    // NOTE(review): with 5 nodes this loop runs for replication factors
+    // 1..3 (replication < newDataNodes.length - 1), not 1..4 as the header
+    // comment states — confirm the intended bound.
+    for (int replication = 1; replication < newDataNodes.length - 1; replication++) {
+
+      // test replication
+      DatanodeDescriptor[] targets;
+      targets = replicator.chooseTarget(filename, replication, newDataNodes[0],
+          BLOCK_SIZE);
+      assertEquals(targets.length, replication);
+
+      assertTrue(replicator.verifyBlockPlacement(filename,
+          getLocatedBlock(targets), (short) replication) == 0);
+
+      // test re-replication
+      List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+      chosenNodes.add(newDataNodes[0]);
+      targets = replicator.chooseTarget(filename, replication - 1,
+          newDataNodes[0], chosenNodes, BLOCK_SIZE);
+      assertEquals(targets.length, replication - 1);
+
+      // test deletion and movement
+      List<DatanodeDescriptor> nodesWithReplicas = new ArrayList<DatanodeDescriptor>();
+      List<DatanodeInfo> blockLocations = new ArrayList<DatanodeInfo>();
+      for (DatanodeDescriptor dn : newDataNodes) {
+        nodesWithReplicas.add(dn);
+        blockLocations.add(dn);
+      }
+
+      for (int i = 0; i < newDataNodes.length; i++) {
+        // test deletion
+        assertTrue(((AzureBlockPlacementPolicy) replicator).canDelete(
+            newDataNodes[i], nodesWithReplicas, (short) replication));
+
+        // just pick the next node for move. all of them are equivalent
+        DatanodeInfo target = newDataNodes[(i + 1) % newDataNodes.length];
+
+        // test can move
+        // NOTE(review): the return value of canMove is not asserted here;
+        // this only checks that the call completes without throwing.
+        replicator.canMove(dummyBlock, newDataNodes[i], target, blockLocations);
+      }
+    }
+
+    // clear the new topology
+    for (Node node : newDataNodes) {
+      cluster.remove(node);
+    }
+
+    // get back to original topology
+    for (DatanodeDescriptor node : dataNodes) {
+      cluster.add(node);
+      node.updateHeartbeat(2 * FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE,
+          0L, 2 * FSConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0);
+    }
+  }
+
+  private static boolean AreFaultAndUpgradeDomainsDifferent(
+      DatanodeDescriptor d1, DatanodeDescriptor d2) {
+    assert ((d1 != null) && (d2 != null));
+
+    Node parent1 = d1.getParent();
+    Node parent2 = d2.getParent();
+
+    assert ((parent1 != null) && (parent2 != null));
+
+    Node grandParent1 = parent1.getParent();
+    Node grandParent2 = parent2.getParent();
+
+    assert ((grandParent1 != null) && (grandParent2 != null));
+
+    if ((!parent1.getName().equals(parent2.getName()))
+        && (!grandParent1.getName().equals(grandParent2.getName()))) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Wraps the given replica locations in a LocatedBlock around a fresh
+   * Block, in the shape expected by verifyBlockPlacement.
+   */
+  private static LocatedBlock getLocatedBlock(DatanodeDescriptor[] locations) {
+    return new LocatedBlock(new Block(), locations);
+  }
+
+  private static boolean areInvalidNodesSelected(List<Node> invalidNodes,
+      DatanodeDescriptor[] targets) {
+    TreeSet<Node> dataNodeSet = new TreeSet<Node>();
+    for (Node dn : invalidNodes) {
+      dataNodeSet.add(dn);
+    }
+
+    for (DatanodeDescriptor dnd : targets) {
+      if (dataNodeSet.contains(dnd))
+        return true;
+    }
+    return false;
+  }
+
+  private static DatanodeDescriptor[] getSelectedNodes(
+      List<DatanodeDescriptor> chooseNodes, DatanodeDescriptor[] targets) {
+    DatanodeDescriptor[] datanodes = new DatanodeDescriptor[chooseNodes.size()
+        + targets.length];
+    int i = 0;
+    for (DatanodeDescriptor dn : chooseNodes) {
+      datanodes[i++] = dn;
+    }
+
+    for (int j = 0; j < targets.length; j++) {
+      datanodes[i++] = targets[j];
+    }
+    return datanodes;
+  }
+
+  private static HashMap<Node, Node> getMap(List<Node> nodes) {
+    HashMap<Node, Node> map = new HashMap<Node, Node>();
+    for (Node node : nodes) {
+      map.put(node, node);
+    }
+    return map;
+  }
+}
+



Mime
View raw message