hadoop-hdfs-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1550363 [6/8] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/...
Date: Thu, 12 Dec 2013 07:17:58 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Thu Dec 12 07:17:51 2013
@@ -17,13 +17,16 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -35,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
@@ -57,17 +61,9 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
@@ -149,12 +145,18 @@ public class TestPBHelper {
   void compare(DatanodeID dn, DatanodeID dn2) {
     assertEquals(dn.getIpAddr(), dn2.getIpAddr());
     assertEquals(dn.getHostName(), dn2.getHostName());
-    assertEquals(dn.getStorageID(), dn2.getStorageID());
+    assertEquals(dn.getDatanodeUuid(), dn2.getDatanodeUuid());
     assertEquals(dn.getXferPort(), dn2.getXferPort());
     assertEquals(dn.getInfoPort(), dn2.getInfoPort());
     assertEquals(dn.getIpcPort(), dn2.getIpcPort());
   }
 
+  void compare(DatanodeStorage dns1, DatanodeStorage dns2) {
+    assertThat(dns2.getStorageID(), is(dns1.getStorageID()));
+    assertThat(dns2.getState(), is(dns1.getState()));
+    assertThat(dns2.getStorageType(), is(dns1.getStorageType()));
+  }
+
   @Test
   public void testConvertBlock() {
     Block b = new Block(1, 100, 3);
@@ -164,8 +166,10 @@ public class TestPBHelper {
   }
 
   private static BlockWithLocations getBlockWithLocations(int bid) {
-    return new BlockWithLocations(new Block(bid, 0, 1), new String[] { "dn1",
-        "dn2", "dn3" });
+    final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
+    final String[] storageIDs = {"s1", "s2", "s3"};
+    return new BlockWithLocations(new Block(bid, 0, 1),
+        datanodeUuids, storageIDs);
   }
 
   private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
@@ -428,6 +432,30 @@ public class TestPBHelper {
         DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3", 
             AdminStates.NORMAL)
     };
+    String[] storageIDs = {"s1", "s2", "s3"};
+    StorageType[] media = {
+        StorageType.DISK,
+        StorageType.SSD,
+        StorageType.DISK
+    };
+    LocatedBlock lb = new LocatedBlock(
+        new ExtendedBlock("bp12", 12345, 10, 53),
+        dnInfos, storageIDs, media, 5, false, new DatanodeInfo[]{});
+    lb.setBlockToken(new Token<BlockTokenIdentifier>(
+        "identifier".getBytes(), "password".getBytes(), new Text("kind"),
+        new Text("service")));
+    return lb;
+  }
+
+  private LocatedBlock createLocatedBlockNoStorageMedia() {
+    DatanodeInfo[] dnInfos = {
+        DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h1",
+                                         AdminStates.DECOMMISSION_INPROGRESS),
+        DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
+                                         AdminStates.DECOMMISSIONED),
+        DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
+                                         AdminStates.NORMAL)
+    };
     LocatedBlock lb = new LocatedBlock(
         new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, 5, false);
     lb.setBlockToken(new Token<BlockTokenIdentifier>(
@@ -445,6 +473,14 @@ public class TestPBHelper {
   }
 
   @Test
+  public void testConvertLocatedBlockNoStorageMedia() {
+    LocatedBlock lb = createLocatedBlockNoStorageMedia();
+    LocatedBlockProto lbProto = PBHelper.convert(lb);
+    LocatedBlock lb2 = PBHelper.convert(lbProto);
+    compare(lb,lb2);
+  }
+
+  @Test
   public void testConvertLocatedBlockList() {
     ArrayList<LocatedBlock> lbl = new ArrayList<LocatedBlock>();
     for (int i=0;i<3;i++) {
@@ -487,6 +523,16 @@ public class TestPBHelper {
     compare(reg, reg2);
     assertEquals(reg.getSoftwareVersion(), reg2.getSoftwareVersion());
   }
+
+  @Test
+  public void TestConvertDatanodeStorage() {
+    DatanodeStorage dns1 = new DatanodeStorage(
+        "id1", DatanodeStorage.State.NORMAL, StorageType.SSD);
+
+    DatanodeStorageProto proto = PBHelper.convert(dns1);
+    DatanodeStorage dns2 = PBHelper.convert(proto);
+    compare(dns1, dns2);
+  }
   
   @Test
   public void testConvertBlockCommand() {
@@ -496,8 +542,9 @@ public class TestPBHelper {
     dnInfos[0][0] = DFSTestUtil.getLocalDatanodeInfo();
     dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
     dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
+    String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
     BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
-        blocks, dnInfos);
+        blocks, dnInfos, storageIDs);
     BlockCommandProto bcProto = PBHelper.convert(bc);
     BlockCommand bc2 = PBHelper.convert(bcProto);
     assertEquals(bc.getAction(), bc2.getAction());
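
For reference, every conversion test in TestPBHelper follows the same round-trip
shape: build the domain object, convert it to its protobuf message with
PBHelper.convert, convert back, and compare field by field. A minimal
self-contained sketch of that shape in plain Java, with stand-in types instead
of the HDFS classes (Storage, StorageMsg, toProto and fromProto are illustrative
names, not part of this patch):

    import static org.junit.Assert.assertEquals;
    import org.junit.Test;

    public class RoundTripSketch {
      // Domain-side stand-in (think DatanodeStorage).
      static class Storage {
        final String id, kind;
        Storage(String id, String kind) { this.id = id; this.kind = kind; }
      }
      // Wire-side stand-in for the generated protobuf message.
      static class StorageMsg { String id; String kind; }

      // Stand-ins for the two directions of PBHelper.convert(...).
      static StorageMsg toProto(Storage s) {
        StorageMsg m = new StorageMsg();
        m.id = s.id;
        m.kind = s.kind;
        return m;
      }
      static Storage fromProto(StorageMsg m) { return new Storage(m.id, m.kind); }

      @Test
      public void testRoundTrip() {
        Storage before = new Storage("s1", "SSD");
        Storage after = fromProto(toProto(before)); // domain -> proto -> domain
        assertEquals(before.id, after.id);          // field-by-field comparison,
        assertEquals(before.kind, after.kind);      // as the compare(...) helpers do
      }
    }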

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Thu Dec 12 07:17:51 2013
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeoutExcep
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hdfs.server.bal
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
+import org.apache.log4j.Level;
 import org.junit.Test;
 
 /**
@@ -58,7 +60,10 @@ import org.junit.Test;
 public class TestBalancer {
   private static final Log LOG = LogFactory.getLog(
   "org.apache.hadoop.hdfs.TestBalancer");
-  
+  static {
+    ((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   final static long CAPACITY = 500L;
   final static String RACK0 = "/rack0";
   final static String RACK1 = "/rack1";
@@ -292,6 +297,16 @@ public class TestBalancer {
     } while (!balanced);
   }
 
+  String long2String(long[] array) {
+    if (array.length == 0) {
+      return "<empty>";
+    }
+    StringBuilder b = new StringBuilder("[").append(array[0]);
+    for(int i = 1; i < array.length; i++) {
+      b.append(", ").append(array[i]);
+    }
+    return b.append("]").toString();
+  }
   /** This test start a cluster with specified number of nodes, 
    * and fills it to be 30% full (with a single file replicated identically
    * to all datanodes);
@@ -308,6 +323,11 @@ public class TestBalancer {
    */
   private void doTest(Configuration conf, long[] capacities, String[] racks, 
       long newCapacity, String newRack, boolean useTool) throws Exception {
+    LOG.info("capacities = " +  long2String(capacities)); 
+    LOG.info("racks      = " +  Arrays.asList(racks)); 
+    LOG.info("newCapacity= " +  newCapacity); 
+    LOG.info("newRack    = " +  newRack); 
+    LOG.info("useTool    = " +  useTool); 
     assertEquals(capacities.length, racks.length);
     int numOfDatanodes = capacities.length;
     cluster = new MiniDFSCluster.Builder(conf)
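
Aside: for primitive arrays the JDK's java.util.Arrays.toString produces the
same bracketed rendering as the long2String helper above, except that it prints
an empty array as "[]" rather than "<empty>". A one-line sketch of the
equivalent logging call (values illustrative):

    long[] capacities = { CAPACITY, CAPACITY };                        // e.g. {500L, 500L}
    LOG.info("capacities = " + java.util.Arrays.toString(capacities)); // "[500, 500]"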

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Thu Dec 12 07:17:51 2013
@@ -18,15 +18,18 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.Daemon;
 import org.junit.Assert;
 
@@ -83,9 +86,8 @@ public class BlockManagerTestUtil {
     final Set<String> rackSet = new HashSet<String>(0);
     final Collection<DatanodeDescriptor> corruptNodes = 
        getCorruptReplicas(blockManager).getNodes(b);
-    for (Iterator<DatanodeDescriptor> it = blockManager.blocksMap.nodeIterator(b); 
-         it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
+    for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) {
+      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
           String rackName = cur.getNetworkLocation();
@@ -215,4 +217,52 @@ public class BlockManagerTestUtil {
   public static void checkHeartbeat(BlockManager bm) {
     bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
   }
+  
+  public static DatanodeStorageInfo updateStorage(DatanodeDescriptor dn,
+      DatanodeStorage s) {
+    return dn.updateStorage(s);
+  }
+
+  public static DatanodeDescriptor getLocalDatanodeDescriptor(
+      boolean initializeStorage) {
+    DatanodeDescriptor dn = new DatanodeDescriptor(DFSTestUtil.getLocalDatanodeID());
+    if (initializeStorage) {
+      dn.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
+    }
+    return dn;
+  }
+  
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, boolean initializeStorage) {
+    return getDatanodeDescriptor(ipAddr, rackLocation,
+        initializeStorage? new DatanodeStorage(DatanodeStorage.generateUuid()): null);
+  }
+
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, DatanodeStorage storage) {
+      DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
+          DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
+      if (storage != null) {
+        dn.updateStorage(storage);
+      }
+      return dn;
+  }
+
+  public static DatanodeStorageInfo newDatanodeStorageInfo(
+      DatanodeDescriptor dn, DatanodeStorage s) {
+    return new DatanodeStorageInfo(dn, s);
+  }
+
+  public static StorageReport[] getStorageReportsForDatanode(
+      DatanodeDescriptor dnd) {
+    ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
+    for (DatanodeStorageInfo storage : dnd.getStorageInfos()) {
+      StorageReport report = new StorageReport(
+          storage.getStorageID(), false, storage.getCapacity(),
+          storage.getDfsUsed(), storage.getRemaining(),
+          storage.getBlockPoolUsed());
+      reports.add(report);
+    }
+    return reports.toArray(StorageReport.EMPTY_ARRAY);
+  }
 }
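
Taken together with the TestBlockManager changes below, these helpers change how
a test primes a datanode's heartbeat: utilization now lives on the individual
storage, and updateHeartbeat consumes one StorageReport per storage. A sketch of
the intended usage, assembled only from calls that appear in this patch (the
numeric values are illustrative):

    DatanodeDescriptor dn =
        BlockManagerTestUtil.getLocalDatanodeDescriptor(true); // true: create one storage
    dn.getStorageInfos()[0].setUtilizationForTesting(
        100L, 100L, 0L, 100L);  // capacity, dfsUsed, remaining, blockPoolUsed
    dn.updateHeartbeat(
        BlockManagerTestUtil.getStorageReportsForDatanode(dn), // one report per storage
        0L, 0L, 0, 0);          // remaining scalar arguments as at the call sites in this patch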

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java Thu Dec 12 07:17:51 2013
@@ -31,18 +31,19 @@ import org.junit.Test;
 public class TestBlockInfoUnderConstruction {
   @Test
   public void testInitializeBlockRecovery() throws Exception {
-    DatanodeDescriptor dd1 = DFSTestUtil.getDatanodeDescriptor("10.10.1.1",
-        "default");
-    DatanodeDescriptor dd2 = DFSTestUtil.getDatanodeDescriptor("10.10.1.2",
-        "default");
-    DatanodeDescriptor dd3 = DFSTestUtil.getDatanodeDescriptor("10.10.1.3",
-        "default");
+    DatanodeStorageInfo s1 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.1", "s1");
+    DatanodeDescriptor dd1 = s1.getDatanodeDescriptor();
+    DatanodeStorageInfo s2 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.2", "s2");
+    DatanodeDescriptor dd2 = s2.getDatanodeDescriptor();
+    DatanodeStorageInfo s3 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.3", "s3");
+    DatanodeDescriptor dd3 = s3.getDatanodeDescriptor();
+
     dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
     BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
         new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP),
         3,
         BlockUCState.UNDER_CONSTRUCTION,
-        new DatanodeDescriptor[] {dd1, dd2, dd3});
+        new DatanodeStorageInfo[] {s1, s2, s3});
 
     // Recovery attempt #1.
     long currentTime = System.currentTimeMillis();
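
The constructor change above is the heart of the migration: a block's expected
locations are now individual storages, and one datanode may register several. A
sketch of that granularity, built only from helpers named in this patch
(BlockManagerTestUtil.updateStorage returns the registered DatanodeStorageInfo):

    DatanodeDescriptor dd = DFSTestUtil.getLocalDatanodeDescriptor();
    DatanodeStorageInfo s1 = BlockManagerTestUtil.updateStorage(
        dd, new DatanodeStorage(DatanodeStorage.generateUuid()));
    DatanodeStorageInfo s2 = BlockManagerTestUtil.updateStorage(
        dd, new DatanodeStorage(DatanodeStorage.generateUuid()));
    // two distinct replica locations backed by the same physical node
    DatanodeStorageInfo[] expectedLocations = { s1, s2 };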

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Thu Dec 12 07:17:51 2013
@@ -22,9 +22,14 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -39,11 +44,11 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.net.NetworkTopology;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import static org.mockito.Mockito.*;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
@@ -51,6 +56,7 @@ import com.google.common.collect.LinkedL
 import com.google.common.collect.Lists;
 
 public class TestBlockManager {
+  private DatanodeStorageInfo[] storages;
   private List<DatanodeDescriptor> nodes;
   private List<DatanodeDescriptor> rackA;
   private List<DatanodeDescriptor> rackB;
@@ -79,28 +85,29 @@ public class TestBlockManager {
     fsn = Mockito.mock(FSNamesystem.class);
     Mockito.doReturn(true).when(fsn).hasWriteLock();
     bm = new BlockManager(fsn, fsn, conf);
-    nodes = ImmutableList.of(
-        DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA"),
-        DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA"),
-        DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA"),
-        DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackB"),
-        DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackB"),
-        DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackB")
-      );
+    final String[] racks = {
+        "/rackA",
+        "/rackA",
+        "/rackA",
+        "/rackB",
+        "/rackB",
+        "/rackB"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    nodes = Arrays.asList(DFSTestUtil.toDatanodeDescriptor(storages));
     rackA = nodes.subList(0, 3);
     rackB = nodes.subList(3, 6);
   }
-  
+
   private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) {
     NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
     // construct network topology
     for (DatanodeDescriptor dn : nodesToAdd) {
       cluster.add(dn);
+      dn.getStorageInfos()[0].setUtilizationForTesting(
+          2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
       dn.updateHeartbeat(
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          0L, 0L,
-          0, 0);
+          BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0L, 0L, 0, 0);
       bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
     }
   }
@@ -125,17 +132,18 @@ public class TestBlockManager {
   }
   
   private void doBasicTest(int testIndex) {
-    List<DatanodeDescriptor> origNodes = getNodes(0, 1);
-    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+    List<DatanodeStorageInfo> origStorages = getStorages(0, 1);
+    List<DatanodeDescriptor> origNodes = getNodes(origStorages);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
 
-    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
     assertEquals(2, pipeline.length);
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origStorages.contains(pipeline[0]));
     assertTrue("Destination of replication should be on the other rack. " +
         "Was: " + pipeline[1],
-        rackB.contains(pipeline[1]));
+        rackB.contains(pipeline[1].getDatanodeDescriptor()));
   }
   
 
@@ -156,21 +164,22 @@ public class TestBlockManager {
   
   private void doTestTwoOfThreeNodesDecommissioned(int testIndex) throws Exception {
     // Block originally on A1, A2, B1
-    List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
+    List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
+    List<DatanodeDescriptor> origNodes = getNodes(origStorages);
     BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
     
     // Decommission two of the nodes (A1, A2)
     List<DatanodeDescriptor> decomNodes = startDecommission(0, 1);
     
-    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origStorages.contains(pipeline[0]));
     assertEquals("Should have three targets", 3, pipeline.length);
     
     boolean foundOneOnRackA = false;
     for (int i = 1; i < pipeline.length; i++) {
-      DatanodeDescriptor target = pipeline[i];
+      DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
       if (rackA.contains(target)) {
         foundOneOnRackA = true;
       }
@@ -199,22 +208,23 @@ public class TestBlockManager {
 
   private void doTestAllNodesHoldingReplicasDecommissioned(int testIndex) throws Exception {
     // Block originally on A1, A2, B1
-    List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
+    List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
+    List<DatanodeDescriptor> origNodes = getNodes(origStorages);
     BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
     
     // Decommission all of the nodes
     List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 3);
     
-    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origStorages.contains(pipeline[0]));
     assertEquals("Should have three targets", 4, pipeline.length);
     
     boolean foundOneOnRackA = false;
     boolean foundOneOnRackB = false;
     for (int i = 1; i < pipeline.length; i++) {
-      DatanodeDescriptor target = pipeline[i];
+      DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
       if (rackA.contains(target)) {
         foundOneOnRackA = true;
       } else if (rackB.contains(target)) {
@@ -251,21 +261,22 @@ public class TestBlockManager {
   
   private void doTestOneOfTwoRacksDecommissioned(int testIndex) throws Exception {
     // Block originally on A1, A2, B1
-    List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
+    List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
+    List<DatanodeDescriptor> origNodes = getNodes(origStorages);
     BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
     
     // Decommission all of the nodes in rack A
     List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 2);
     
-    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origStorages.contains(pipeline[0]));
     assertEquals("Should have three targets", 3, pipeline.length);
     
     boolean foundOneOnRackB = false;
     for (int i = 1; i < pipeline.length; i++) {
-      DatanodeDescriptor target = pipeline[i];
+      DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
       if (rackB.contains(target)) {
         foundOneOnRackB = true;
       }
@@ -284,11 +295,12 @@ public class TestBlockManager {
     // the third off-rack replica.
     DatanodeDescriptor rackCNode =
       DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/rackC");
+    rackCNode.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
     addNodes(ImmutableList.of(rackCNode));
     try {
-      DatanodeDescriptor[] pipeline2 = scheduleSingleReplication(blockInfo);
+      DatanodeStorageInfo[] pipeline2 = scheduleSingleReplication(blockInfo);
       assertEquals(2, pipeline2.length);
-      assertEquals(rackCNode, pipeline2[1]);
+      assertEquals(rackCNode, pipeline2[1].getDatanodeDescriptor());
     } finally {
       removeNode(rackCNode);
     }
@@ -309,30 +321,30 @@ public class TestBlockManager {
   private void doTestSufficientlyReplBlocksUsesNewRack(int testIndex) {
     // Originally on only nodes in rack A.
     List<DatanodeDescriptor> origNodes = rackA;
-    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
-    DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+    DatanodeStorageInfo pipeline[] = scheduleSingleReplication(blockInfo);
     
     assertEquals(2, pipeline.length); // single new copy
     assertTrue("Source of replication should be one of the nodes the block " +
         "was on. Was: " + pipeline[0],
-        origNodes.contains(pipeline[0]));
+        origNodes.contains(pipeline[0].getDatanodeDescriptor()));
     assertTrue("Destination of replication should be on the other rack. " +
         "Was: " + pipeline[1],
-        rackB.contains(pipeline[1]));
+        rackB.contains(pipeline[1].getDatanodeDescriptor()));
   }
   
   @Test
   public void testBlocksAreNotUnderreplicatedInSingleRack() throws Exception {
     List<DatanodeDescriptor> nodes = ImmutableList.of(
-        DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA"),
-        DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA"),
-        DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA"),
-        DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackA"),
-        DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackA"),
-        DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackA")
+        BlockManagerTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA", true),
+        BlockManagerTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA", true),
+        BlockManagerTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA", true),
+        BlockManagerTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackA", true),
+        BlockManagerTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackA", true),
+        BlockManagerTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackA", true)
       );
     addNodes(nodes);
-    List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);;
+    List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);
     for (int i = 0; i < NUM_TEST_ITERS; i++) {
       doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes);
     }
@@ -342,7 +354,7 @@ public class TestBlockManager {
       List<DatanodeDescriptor> origNodes)
       throws Exception {
     assertEquals(0, bm.numOfUnderReplicatedBlocks());
-    addBlockOnNodes((long)testIndex, origNodes);
+    addBlockOnNodes(testIndex, origNodes);
     bm.processMisReplicatedBlocks();
     assertEquals(0, bm.numOfUnderReplicatedBlocks());
   }
@@ -353,9 +365,11 @@ public class TestBlockManager {
    * pipeline.
    */
   private void fulfillPipeline(BlockInfo blockInfo,
-      DatanodeDescriptor[] pipeline) throws IOException {
+      DatanodeStorageInfo[] pipeline) throws IOException {
     for (int i = 1; i < pipeline.length; i++) {
-      bm.addBlock(pipeline[i], blockInfo, null);
+      DatanodeStorageInfo storage = pipeline[i];
+      bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null);
+      blockInfo.addStorage(storage);
     }
   }
 
@@ -364,7 +378,9 @@ public class TestBlockManager {
     BlockInfo blockInfo = new BlockInfo(block, 3);
 
     for (DatanodeDescriptor dn : nodes) {
-      blockInfo.addNode(dn);
+      for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
+        blockInfo.addStorage(storage);
+      }
     }
     return blockInfo;
   }
@@ -376,6 +392,22 @@ public class TestBlockManager {
     }
     return ret;
   }
+
+  private List<DatanodeDescriptor> getNodes(List<DatanodeStorageInfo> storages) {
+    List<DatanodeDescriptor> ret = Lists.newArrayList();
+    for (DatanodeStorageInfo s : storages) {
+      ret.add(s.getDatanodeDescriptor());
+    }
+    return ret;
+  }
+
+  private List<DatanodeStorageInfo> getStorages(int ... indexes) {
+    List<DatanodeStorageInfo> ret = Lists.newArrayList();
+    for (int idx : indexes) {
+      ret.add(storages[idx]);
+    }
+    return ret;
+  }
   
   private List<DatanodeDescriptor> startDecommission(int ... indexes) {
     List<DatanodeDescriptor> nodes = getNodes(indexes);
@@ -394,7 +426,7 @@ public class TestBlockManager {
     return blockInfo;
   }
 
-  private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
+  private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
     // list for priority 1
     List<Block> list_p1 = new ArrayList<Block>();
     list_p1.add(block);
@@ -412,27 +444,29 @@ public class TestBlockManager {
     assertTrue("replication is pending after work is computed",
         bm.pendingReplications.getNumReplicas(block) > 0);
 
-    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
+    LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
     assertEquals(1, repls.size());
-    Entry<DatanodeDescriptor, BlockTargetPair> repl =
+    Entry<DatanodeStorageInfo, BlockTargetPair> repl =
       repls.entries().iterator().next();
         
-    DatanodeDescriptor[] targets = repl.getValue().targets;
+    DatanodeStorageInfo[] targets = repl.getValue().targets;
 
-    DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
+    DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
     pipeline[0] = repl.getKey();
     System.arraycopy(targets, 0, pipeline, 1, targets.length);
 
     return pipeline;
   }
 
-  private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
-    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
+  private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
+    LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
       LinkedListMultimap.create();
     for (DatanodeDescriptor dn : nodes) {
       List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
       if (thisRepls != null) {
-        repls.putAll(dn, thisRepls);
+        for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
+          repls.putAll(storage, thisRepls);
+        }
       }
     }
     return repls;
@@ -455,7 +489,7 @@ public class TestBlockManager {
     addBlockOnNodes(blockId,origNodes.subList(0,1));
 
     List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
-    List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
+    List<DatanodeStorageInfo> liveNodes = new LinkedList<DatanodeStorageInfo>();
 
     assertNotNull("Chooses source node for a highest-priority replication"
         + " even if all available source nodes have reached their replication"
@@ -478,7 +512,7 @@ public class TestBlockManager {
             UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
 
     // Increase the replication count to test replication count > hard limit
-    DatanodeDescriptor targets[] = { origNodes.get(1) };
+    DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
     origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
 
     assertNull("Does not choose a source node for a highest-priority"
@@ -494,7 +528,11 @@ public class TestBlockManager {
   @Test
   public void testSafeModeIBR() throws Exception {
     DatanodeDescriptor node = spy(nodes.get(0));
-    node.setStorageID("dummy-storage");
+    DatanodeStorageInfo ds = node.getStorageInfos()[0];
+
+    // TODO: Needs to be fixed. DatanodeUuid is not storageID.
+    node.setDatanodeUuidForTesting(ds.getStorageID());
+
     node.isAlive = true;
 
     DatanodeRegistration nodeReg =
@@ -507,35 +545,40 @@ public class TestBlockManager {
     bm.getDatanodeManager().registerDatanode(nodeReg);
     bm.getDatanodeManager().addDatanode(node); // swap in spy    
     assertEquals(node, bm.getDatanodeManager().getDatanode(node));
-    assertTrue(node.isFirstBlockReport());
+    assertEquals(0, ds.getBlockReportCount());
     // send block report, should be processed
     reset(node);
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
-    verify(node).receivedBlockReport();
-    assertFalse(node.isFirstBlockReport());
+    
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", 
+        new BlockListAsLongs(null, null));
+    assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
-    verify(node, never()).receivedBlockReport();
-    assertFalse(node.isFirstBlockReport());
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+        new BlockListAsLongs(null, null));
+    assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
     bm.getDatanodeManager().removeDatanode(node);
     reset(node);
     bm.getDatanodeManager().registerDatanode(nodeReg);
     verify(node).updateRegInfo(nodeReg);
-    assertTrue(node.isFirstBlockReport()); // ready for report again
+    assertEquals(0, ds.getBlockReportCount()); // ready for report again
     // send block report, should be processed after restart
     reset(node);
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
-    verify(node).receivedBlockReport();
-    assertFalse(node.isFirstBlockReport());
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+        new BlockListAsLongs(null, null));
+    assertEquals(1, ds.getBlockReportCount());
   }
   
   @Test
   public void testSafeModeIBRAfterIncremental() throws Exception {
     DatanodeDescriptor node = spy(nodes.get(0));
-    node.setStorageID("dummy-storage");
+    DatanodeStorageInfo ds = node.getStorageInfos()[0];
+
+    // TODO: Needs to be fixed. DatanodeUuid is not storageID.
+    node.setDatanodeUuidForTesting(ds.getStorageID());
+
     node.isAlive = true;
 
     DatanodeRegistration nodeReg =
@@ -548,12 +591,13 @@ public class TestBlockManager {
     bm.getDatanodeManager().registerDatanode(nodeReg);
     bm.getDatanodeManager().addDatanode(node); // swap in spy    
     assertEquals(node, bm.getDatanodeManager().getDatanode(node));
-    assertTrue(node.isFirstBlockReport());
+    assertEquals(0, ds.getBlockReportCount());
     // send block report while pretending to already have blocks
     reset(node);
     doReturn(1).when(node).numBlocks();
-    bm.processReport(node, "pool", new BlockListAsLongs(null, null));
-    verify(node).receivedBlockReport();
-    assertFalse(node.isFirstBlockReport());
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+        new BlockListAsLongs(null, null));
+    assertEquals(1, ds.getBlockReportCount());
   }
 }
+
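
Throughout TestBlockManager the replication pipeline is now typed as
DatanodeStorageInfo[]: slot 0 holds the chosen source and the remaining slots
hold the targets, so rack assertions unwrap each storage to its node. A sketch
of the recurring idiom inside these tests, using the accessor the patch relies on:

    DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
    DatanodeDescriptor source = pipeline[0].getDatanodeDescriptor();
    for (int i = 1; i < pipeline.length; i++) {
      DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
      // rack membership is still checked against descriptors,
      // e.g. rackB.contains(target)
    }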

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java Thu Dec 12 07:17:51 2013
@@ -55,21 +55,24 @@ public class TestDatanodeDescriptor {
   
   @Test
   public void testBlocksCounter() throws Exception {
-    DatanodeDescriptor dd = DFSTestUtil.getLocalDatanodeDescriptor();
+    DatanodeDescriptor dd = BlockManagerTestUtil.getLocalDatanodeDescriptor(true);
     assertEquals(0, dd.numBlocks());
     BlockInfo blk = new BlockInfo(new Block(1L), 1);
     BlockInfo blk1 = new BlockInfo(new Block(2L), 2);
+    DatanodeStorageInfo[] storages = dd.getStorageInfos();
+    assertTrue(storages.length > 0);
+    final String storageID = storages[0].getStorageID();
     // add first block
-    assertTrue(dd.addBlock(blk));
+    assertTrue(dd.addBlock(storageID, blk));
     assertEquals(1, dd.numBlocks());
     // remove a non-existent block
     assertFalse(dd.removeBlock(blk1));
     assertEquals(1, dd.numBlocks());
     // add an existent block
-    assertFalse(dd.addBlock(blk));
+    assertFalse(dd.addBlock(storageID, blk));
     assertEquals(1, dd.numBlocks());
     // add second block
-    assertTrue(dd.addBlock(blk1));
+    assertTrue(dd.addBlock(storageID, blk1));
     assertEquals(2, dd.numBlocks());
     // remove first block
     assertTrue(dd.removeBlock(blk));

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java Thu Dec 12 07:17:51 2013
@@ -76,7 +76,7 @@ public class TestDatanodeManager {
           it.next();
         }
         DatanodeRegistration toRemove = it.next().getValue();
-        Log.info("Removing node " + toRemove.getStorageID() + " ip " +
+        Log.info("Removing node " + toRemove.getDatanodeUuid() + " ip " +
         toRemove.getXferAddr() + " version : " + toRemove.getSoftwareVersion());
 
         //Remove that random node
@@ -90,7 +90,7 @@ public class TestDatanodeManager {
         String storageID = "someStorageID" + rng.nextInt(5000);
 
         DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
-        Mockito.when(dr.getStorageID()).thenReturn(storageID);
+        Mockito.when(dr.getDatanodeUuid()).thenReturn(storageID);
 
         //If this storageID had already been registered before
         if(sIdToDnReg.containsKey(storageID)) {
@@ -110,7 +110,7 @@ public class TestDatanodeManager {
         Mockito.when(dr.getSoftwareVersion()).thenReturn(
           "version" + rng.nextInt(5));
 
-        Log.info("Registering node storageID: " + dr.getStorageID() +
+        Log.info("Registering node storageID: " + dr.getDatanodeUuid() +
           ", version: " + dr.getSoftwareVersion() + ", IP address: "
           + dr.getXferAddr());
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java Thu Dec 12 07:17:51 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Test;
 
 /**
@@ -63,6 +63,8 @@ public class TestHeartbeatHandling {
       final DatanodeRegistration nodeReg =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
       final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
+      final String storageID = DatanodeStorage.generateUuid();
+      dd.updateStorage(new DatanodeStorage(storageID));
 
       final int REMAINING_BLOCKS = 1;
       final int MAX_REPLICATE_LIMIT =
@@ -70,7 +72,7 @@ public class TestHeartbeatHandling {
       final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
       final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
       final int MAX_REPLICATE_BLOCKS = 2*MAX_REPLICATE_LIMIT+REMAINING_BLOCKS;
-      final DatanodeDescriptor[] ONE_TARGET = new DatanodeDescriptor[1];
+      final DatanodeStorageInfo[] ONE_TARGET = {dd.getStorageInfo(storageID)};
 
       try {
         namesystem.writeLock();
@@ -144,12 +146,15 @@ public class TestHeartbeatHandling {
       final DatanodeRegistration nodeReg1 =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
       final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
+      dd1.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
       final DatanodeRegistration nodeReg2 =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
       final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
+      dd2.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
       final DatanodeRegistration nodeReg3 = 
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
       final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
+      dd3.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
 
       try {
         namesystem.writeLock();
@@ -162,10 +167,13 @@ public class TestHeartbeatHandling {
           dd1.setLastUpdate(System.currentTimeMillis());
           dd2.setLastUpdate(System.currentTimeMillis());
           dd3.setLastUpdate(System.currentTimeMillis());
+          final DatanodeStorageInfo[] storages = {
+              dd1.getStorageInfos()[0],
+              dd2.getStorageInfos()[0],
+              dd3.getStorageInfos()[0]};
           BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
               new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
-              BlockUCState.UNDER_RECOVERY,
-              new DatanodeDescriptor[] {dd1, dd2, dd3});
+              BlockUCState.UNDER_RECOVERY, storages);
           dd1.addBlockToBeRecovered(blockInfo);
           DatanodeCommand[] cmds =
               NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
@@ -187,8 +195,7 @@ public class TestHeartbeatHandling {
           dd3.setLastUpdate(System.currentTimeMillis());
           blockInfo = new BlockInfoUnderConstruction(
               new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
-              BlockUCState.UNDER_RECOVERY,
-              new DatanodeDescriptor[] {dd1, dd2, dd3});
+              BlockUCState.UNDER_RECOVERY, storages);
           dd1.addBlockToBeRecovered(blockInfo);
           cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
           assertEquals(1, cmds.length);
@@ -209,8 +216,7 @@ public class TestHeartbeatHandling {
           dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000);
           blockInfo = new BlockInfoUnderConstruction(
               new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
-              BlockUCState.UNDER_RECOVERY,
-              new DatanodeDescriptor[] {dd1, dd2, dd3});
+              BlockUCState.UNDER_RECOVERY, storages);
           dd1.addBlockToBeRecovered(blockInfo);
           cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
           assertEquals(1, cmds.length);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java Thu Dec 12 07:17:51 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.assertTrue;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -98,12 +97,10 @@ public class TestNodeCount {
       }
       
       // find out a non-excess node
-      final Iterator<DatanodeDescriptor> iter = bm.blocksMap
-          .nodeIterator(block.getLocalBlock());
       DatanodeDescriptor nonExcessDN = null;
-      while (iter.hasNext()) {
-        DatanodeDescriptor dn = iter.next();
-        Collection<Block> blocks = bm.excessReplicateMap.get(dn.getStorageID());
+      for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
+        final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
+        Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
         if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
           nonExcessDN = dn;
           break;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java Thu Dec 12 07:17:51 2013
@@ -103,7 +103,10 @@ public class TestOverReplicatedBlocks {
           String corruptMachineName = corruptDataNode.getXferAddr();
           for (DatanodeDescriptor datanode : hm.getDatanodes()) {
             if (!corruptMachineName.equals(datanode.getXferAddr())) {
-              datanode.updateHeartbeat(100L, 100L, 0L, 100L, 0L, 0L, 0, 0);
+              datanode.getStorageInfos()[0].setUtilizationForTesting(100L, 100L, 0, 100L);
+              datanode.updateHeartbeat(
+                  BlockManagerTestUtil.getStorageReportsForDatanode(datanode),
+                  0L, 0L, 0, 0);
             }
           }
 
@@ -155,7 +158,7 @@ public class TestOverReplicatedBlocks {
       DataNode lastDN = cluster.getDataNodes().get(3);
       DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
           lastDN, namesystem.getBlockPoolId());
-      String lastDNid = dnReg.getStorageID();
+      String lastDNid = dnReg.getDatanodeUuid();
 
       final Path fileName = new Path("/foo2");
       DFSTestUtil.createFile(fs, fileName, SMALL_FILE_LENGTH, (short)4, 0L);
@@ -220,3 +223,4 @@ public class TestOverReplicatedBlocks {
     }
   }
 }
+

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java Thu Dec 12 07:17:51 2013
@@ -43,8 +43,8 @@ public class TestPendingDataNodeMessages
   @Test
   public void testQueues() {
     DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
-    msgs.enqueueReportedBlock(fakeDN, block1Gs1, ReplicaState.FINALIZED);
-    msgs.enqueueReportedBlock(fakeDN, block1Gs2, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED);
 
     assertEquals(2, msgs.count());
     

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java Thu Dec 12 07:17:51 2013
@@ -43,8 +43,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.junit.Test;
 
-import com.google.common.base.Preconditions;
-
 /**
  * This class tests the internals of PendingReplicationBlocks.java,
  * as well as how PendingReplicationBlocks acts in BlockManager
@@ -54,22 +52,7 @@ public class TestPendingReplication {
   private static final int DFS_REPLICATION_INTERVAL = 1;
   // Number of datanodes in the cluster
   private static final int DATANODE_COUNT = 5;
-  
-  private DatanodeDescriptor genDatanodeId(int seed) {
-    seed = seed % 256;
-    String ip = seed + "." + seed + "." + seed + "." + seed;
-    return DFSTestUtil.getDatanodeDescriptor(ip, null);
-  }
 
-  private DatanodeDescriptor[] genDatanodes(int number) {
-    Preconditions.checkArgument(number >= 0);
-    DatanodeDescriptor[] nodes = new DatanodeDescriptor[number];
-    for (int i = 0; i < number; i++) {
-      nodes[i] = genDatanodeId(i);
-    }
-    return nodes;
-  }
-  
   @Test
   public void testPendingReplication() {
     PendingReplicationBlocks pendingReplications;
@@ -79,9 +62,13 @@ public class TestPendingReplication {
     //
     // Add 10 blocks to pendingReplications.
     //
-    for (int i = 0; i < 10; i++) {
+    DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
+    for (int i = 0; i < storages.length; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.increment(block, genDatanodes(i));
+      DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
+      System.arraycopy(storages, 0, targets, 0, i);
+      pendingReplications.increment(block,
+          DatanodeStorageInfo.toDatanodeDescriptors(targets));
     }
     assertEquals("Size of pendingReplications ",
                  10, pendingReplications.size());
@@ -91,16 +78,18 @@ public class TestPendingReplication {
     // remove one item and reinsert it
     //
     Block blk = new Block(8, 8, 0);
-    pendingReplications.decrement(blk, genDatanodeId(7)); // removes one replica
+    pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
     assertEquals("pendingReplications.getNumReplicas ",
                  7, pendingReplications.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
       // removes all replicas
-      pendingReplications.decrement(blk, genDatanodeId(i));
+      pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor());
     }
     assertTrue(pendingReplications.size() == 9);
-    pendingReplications.increment(blk, genDatanodes(8));
+    pendingReplications.increment(blk,
+        DatanodeStorageInfo.toDatanodeDescriptors(
+            DFSTestUtil.createDatanodeStorageInfos(8)));
     assertTrue(pendingReplications.size() == 10);
 
     //
@@ -128,7 +117,9 @@ public class TestPendingReplication {
 
     for (int i = 10; i < 15; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.increment(block, genDatanodes(i));
+      pendingReplications.increment(block,
+          DatanodeStorageInfo.toDatanodeDescriptors(
+              DFSTestUtil.createDatanodeStorageInfos(i)));
     }
     assertTrue(pendingReplications.size() == 15);
 
@@ -210,7 +201,7 @@ public class TestPendingReplication {
           DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
               poolId);
           StorageReceivedDeletedBlocks[] report = { 
-              new StorageReceivedDeletedBlocks(dnR.getStorageID(),
+              new StorageReceivedDeletedBlocks("Fake-storage-ID-Ignored",
               new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
                   blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
           cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
@@ -227,7 +218,7 @@ public class TestPendingReplication {
           DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
               poolId);
           StorageReceivedDeletedBlocks[] report = 
-            { new StorageReceivedDeletedBlocks(dnR.getStorageID(),
+            { new StorageReceivedDeletedBlocks("Fake-storage-ID-Ignored",
               new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
                   blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
           cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
@@ -291,9 +282,9 @@ public class TestPendingReplication {
       cluster.getNamesystem().writeLock();
       try {
         bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
-            "TEST");
+            "STORAGE_ID", "TEST");
         bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
-            "TEST");
+            "STORAGE_ID", "TEST");
       } finally {
         cluster.getNamesystem().writeUnlock();
       }
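
The refactoring above replaces the file-local genDatanodeId/genDatanodes helpers with a shared pool of DatanodeStorageInfo objects that is sliced and converted back to descriptors on demand. A minimal sketch of that pattern, assuming the DFSTestUtil and DatanodeStorageInfo helpers behave as used in the hunks above (Arrays.copyOf is assumed equivalent to the explicit System.arraycopy into a fresh array):

    // Sketch: derive i-replica target sets from one fixed pool of storages.
    DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
    for (int i = 0; i < storages.length; i++) {
      // Prefix-copy the first i storages, then convert back to descriptors,
      // mirroring the System.arraycopy + toDatanodeDescriptors calls above.
      DatanodeStorageInfo[] targets = Arrays.copyOf(storages, i);
      pendingReplications.increment(new Block(i, i, 0),
          DatanodeStorageInfo.toDatanodeDescriptors(targets));
    }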

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1550363&r1=1550362&r2=1550363&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Thu Dec 12 07:17:51 2013
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,6 +36,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.test.PathUtils;
@@ -67,6 +71,10 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 public class TestReplicationPolicy {
+  {
+    ((Log4JLogger)BlockPlacementPolicy.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   private Random random = DFSUtil.getRandom();
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 6;
@@ -75,6 +83,7 @@ public class TestReplicationPolicy {
   private static BlockPlacementPolicy replicator;
   private static final String filename = "/dummyfile.txt";
   private static DatanodeDescriptor dataNodes[];
+  private static DatanodeStorageInfo[] storages;
   // The interval for marking a datanode as stale,
   private static long staleInterval = 
       DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
@@ -82,17 +91,28 @@ public class TestReplicationPolicy {
   @Rule
   public ExpectedException exception = ExpectedException.none();
   
+  private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+    long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+    long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) {
+    dn.getStorageInfos()[0].setUtilizationForTesting(
+        capacity, dfsUsed, remaining, blockPoolUsed);
+    dn.updateHeartbeat(
+        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
+  }
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     Configuration conf = new HdfsConfiguration();
-    dataNodes = new DatanodeDescriptor[] {
-        DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
-        DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),
-        DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2"),
-        DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2"),
-        DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d2/r3"),
-        DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3")
-      };
+    final String[] racks = {
+        "/d1/r1",
+        "/d1/r1",
+        "/d1/r2",
+        "/d1/r2",
+        "/d2/r3",
+        "/d2/r3"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
 
     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
@@ -117,12 +137,19 @@ public class TestReplicationPolicy {
           dataNodes[i]);
     }
     for (int i=0; i < NUM_OF_DATANODES; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }    
   }
 
+  private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
+    return isOnSameRack(left, right.getDatanodeDescriptor());
+  }
+
+  private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
+    return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
+  }
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on 
@@ -134,74 +161,74 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testChooseTarget1() throws Exception {
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         0L, 0L, 4, 0); // overloaded
 
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1);
     assertEquals(targets.length, 1);
-    assertEquals(targets[0], dataNodes[0]);
+    assertEquals(storages[0], targets[0]);
     
     targets = chooseTarget(2);
     assertEquals(targets.length, 2);
-    assertEquals(targets[0], dataNodes[0]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertEquals(storages[0], targets[0]);
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(3);
     assertEquals(targets.length, 3);
-    assertEquals(targets[0], dataNodes[0]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertEquals(storages[0], targets[0]);
+    assertFalse(isOnSameRack(targets[0], targets[1]));
+    assertTrue(isOnSameRack(targets[1], targets[2]));
 
     targets = chooseTarget(4);
     assertEquals(targets.length, 4);
-    assertEquals(targets[0], dataNodes[0]);
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
-               cluster.isOnSameRack(targets[2], targets[3]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    assertEquals(storages[0], targets[0]);
+    assertTrue(isOnSameRack(targets[1], targets[2]) ||
+               isOnSameRack(targets[2], targets[3]));
+    assertFalse(isOnSameRack(targets[0], targets[2]));
     
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
     return chooseTarget(numOfReplicas, dataNodes[0]);
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
       DatanodeDescriptor writer) {
     return chooseTarget(numOfReplicas, writer,
-        new ArrayList<DatanodeDescriptor>());
+        new ArrayList<DatanodeStorageInfo>());
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-      List<DatanodeDescriptor> chosenNodes) {
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      List<DatanodeStorageInfo> chosenNodes) {
     return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-      DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, null);
   }
 
-  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-      List<DatanodeDescriptor> chosenNodes, Set<Node> excludedNodes) {
+  private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
+      List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) {
     return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
   }
 
-  private static DatanodeDescriptor[] chooseTarget(
+  private static DatanodeStorageInfo[] chooseTarget(
       int numOfReplicas,
       DatanodeDescriptor writer,
-      List<DatanodeDescriptor> chosenNodes,
+      List<DatanodeStorageInfo> chosenNodes,
       Set<Node> excludedNodes) {
     return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
-        false, excludedNodes, BLOCK_SIZE);
+        false, excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
   }
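
Every overload above funnels into a single replicator.chooseTarget call whose parameter list now ends with a storage type. A direct invocation, under the same assumed signature (a sketch, not part of the commit):

    // Ask the placement policy for three targets directly; StorageType.DEFAULT
    // is the trailing parameter introduced by this change.
    DatanodeStorageInfo[] targets = replicator.chooseTarget(
        filename,                              // path of the file being written
        3,                                     // replicas still needed
        dataNodes[0],                          // writer node
        new ArrayList<DatanodeStorageInfo>(),  // replicas already chosen
        false,                                 // do not return chosen nodes
        null,                                  // no excluded nodes
        BLOCK_SIZE,                            // block size hint
        StorageType.DEFAULT);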
 
   /**
@@ -215,8 +242,8 @@ public class TestReplicationPolicy {
   @Test
   public void testChooseTarget2() throws Exception { 
     Set<Node> excludedNodes;
-    DatanodeDescriptor[] targets;
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    DatanodeStorageInfo[] targets;
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
     
     excludedNodes = new HashSet<Node>();
     excludedNodes.add(dataNodes[1]); 
@@ -228,49 +255,52 @@ public class TestReplicationPolicy {
     excludedNodes.add(dataNodes[1]); 
     targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);
-    assertEquals(targets[0], dataNodes[0]);
+    assertEquals(storages[0], targets[0]);
     
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.add(dataNodes[1]); 
     targets = chooseTarget(2, chosenNodes, excludedNodes);
     assertEquals(targets.length, 2);
-    assertEquals(targets[0], dataNodes[0]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertEquals(storages[0], targets[0]);
+
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.add(dataNodes[1]); 
     targets = chooseTarget(3, chosenNodes, excludedNodes);
     assertEquals(targets.length, 3);
-    assertEquals(targets[0], dataNodes[0]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertEquals(storages[0], targets[0]);
+
+    assertFalse(isOnSameRack(targets[0], targets[1]));
+    assertTrue(isOnSameRack(targets[1], targets[2]));
     
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.add(dataNodes[1]); 
     targets = chooseTarget(4, chosenNodes, excludedNodes);
     assertEquals(targets.length, 4);
-    assertEquals(targets[0], dataNodes[0]);
+    assertEquals(storages[0], targets[0]);
+
     for(int i=1; i<4; i++) {
-      assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+      assertFalse(isOnSameRack(targets[0], targets[i]));
     }
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
-               cluster.isOnSameRack(targets[2], targets[3]));
-    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+    assertTrue(isOnSameRack(targets[1], targets[2]) ||
+               isOnSameRack(targets[2], targets[3]));
+    assertFalse(isOnSameRack(targets[1], targets[3]));
 
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.add(dataNodes[1]); 
-    chosenNodes.add(dataNodes[2]);
+    chosenNodes.add(storages[2]);
     targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
-        excludedNodes, BLOCK_SIZE);
+        excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
     System.out.println("targets=" + Arrays.asList(targets));
     assertEquals(2, targets.length);
     //make sure that the chosen node is in the target.
     int i = 0;
-    for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
+    for (; i < targets.length && !storages[2].equals(targets[i]); i++);
     assertTrue(i < targets.length);
   }
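
The empty-body for loop above scans the returned targets for the pre-chosen storage and then asserts that the scan stopped early. An equivalent, arguably clearer formulation of the same containment check (a sketch using the same fields):

    // Verify that the already-chosen storage appears among the targets.
    boolean found = false;
    for (DatanodeStorageInfo target : targets) {
      if (storages[2].equals(target)) {
        found = true;
        break;
      }
    }
    assertTrue(found);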
 
@@ -285,41 +315,41 @@ public class TestReplicationPolicy {
   @Test
   public void testChooseTarget3() throws Exception {
     // make data node 0 to be not qualified to choose
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
         0L, 0L, 0, 0); // no space
         
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1);
     assertEquals(targets.length, 1);
-    assertEquals(targets[0], dataNodes[1]);
+    assertEquals(storages[1], targets[0]);
     
     targets = chooseTarget(2);
     assertEquals(targets.length, 2);
-    assertEquals(targets[0], dataNodes[1]);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertEquals(storages[1], targets[0]);
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(3);
     assertEquals(targets.length, 3);
-    assertEquals(targets[0], dataNodes[1]);
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertEquals(storages[1], targets[0]);
+    assertTrue(isOnSameRack(targets[1], targets[2]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(4);
     assertEquals(targets.length, 4);
-    assertEquals(targets[0], dataNodes[1]);
+    assertEquals(storages[1], targets[0]);
     for(int i=1; i<4; i++) {
-      assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+      assertFalse(isOnSameRack(targets[0], targets[i]));
     }
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
-               cluster.isOnSameRack(targets[2], targets[3]));
-    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+    assertTrue(isOnSameRack(targets[1], targets[2]) ||
+               isOnSameRack(targets[2], targets[3]));
+    assertFalse(isOnSameRack(targets[1], targets[3]));
 
-    dataNodes[0].updateHeartbeat(
+    updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
   }
@@ -336,35 +366,35 @@ public class TestReplicationPolicy {
   public void testChoooseTarget4() throws Exception {
     // make data node 0 & 1 to be not qualified to choose: not enough disk space
     for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
       
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1);
     assertEquals(targets.length, 1);
-    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
     
     targets = chooseTarget(2);
     assertEquals(targets.length, 2);
-    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(3);
     assertEquals(targets.length, 3);
     for(int i=0; i<3; i++) {
-      assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+      assertFalse(isOnSameRack(targets[i], dataNodes[0]));
     }
-    assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
-               cluster.isOnSameRack(targets[1], targets[2]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    assertTrue(isOnSameRack(targets[0], targets[1]) ||
+               isOnSameRack(targets[1], targets[2]));
+    assertFalse(isOnSameRack(targets[0], targets[2]));
     
     for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
@@ -382,7 +412,7 @@ public class TestReplicationPolicy {
     DatanodeDescriptor writerDesc =
       DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4");
 
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0, writerDesc);
     assertEquals(targets.length, 0);
 
@@ -391,12 +421,12 @@ public class TestReplicationPolicy {
 
     targets = chooseTarget(2, writerDesc);
     assertEquals(targets.length, 2);
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
 
     targets = chooseTarget(3, writerDesc);
     assertEquals(targets.length, 3);
-    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertTrue(isOnSameRack(targets[1], targets[2]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
   }
 
   /**
@@ -426,7 +456,7 @@ public class TestReplicationPolicy {
   public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
     // make data node 0 & 1 to be not qualified to choose: not enough disk space
     for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
@@ -437,7 +467,7 @@ public class TestReplicationPolicy {
     
     // try to choose NUM_OF_DATANODES targets, which is more than the number
     // of actually available nodes.
-    DatanodeDescriptor[] targets = chooseTarget(NUM_OF_DATANODES);
+    DatanodeStorageInfo[] targets = chooseTarget(NUM_OF_DATANODES);
     assertEquals(targets.length, NUM_OF_DATANODES - 2);
 
     final List<LoggingEvent> log = appender.getLog();
@@ -445,30 +475,42 @@ public class TestReplicationPolicy {
     assertFalse(log.size() == 0);
     final LoggingEvent lastLogEntry = log.get(log.size() - 1);
     
-    assertEquals(lastLogEntry.getLevel(), Level.WARN);
+    assertTrue(Level.WARN.isGreaterOrEqual(lastLogEntry.getLevel()));
     // Supposed to place replicas on each node, but two data nodes are not
     // available for placing replicas, so here we expect a shortfall of 2
     assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
     
     for(int i=0; i<2; i++) {
-      dataNodes[i].updateHeartbeat(
+      updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
   }
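
The relaxed assertion above leans on log4j's severity ordering: Level.WARN.isGreaterOrEqual(x) holds when x is WARN or any less severe level, so the check now tolerates trailing DEBUG output from the Level.ALL logging enabled in the class initializer. The assumed semantics, in brief:

    // log4j 1.x severity: ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL.
    // isGreaterOrEqual compares the receiver's numeric severity to the argument's.
    assert Level.WARN.isGreaterOrEqual(Level.DEBUG);   // DEBUG is less severe
    assert !Level.WARN.isGreaterOrEqual(Level.ERROR);  // ERROR is more severe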
 
-  private boolean containsWithinRange(DatanodeDescriptor target,
+  private boolean containsWithinRange(DatanodeStorageInfo target,
       DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
     assert startIndex >= 0 && startIndex < nodes.length;
     assert endIndex >= startIndex && endIndex < nodes.length;
     for (int i = startIndex; i <= endIndex; i++) {
-      if (nodes[i].equals(target)) {
+      if (nodes[i].equals(target.getDatanodeDescriptor())) {
         return true;
       }
     }
     return false;
   }
   
+  private boolean containsWithinRange(DatanodeDescriptor target,
+      DatanodeStorageInfo[] nodes, int startIndex, int endIndex) {
+    assert startIndex >= 0 && startIndex < nodes.length;
+    assert endIndex >= startIndex && endIndex < nodes.length;
+    for (int i = startIndex; i <= endIndex; i++) {
+      if (nodes[i].getDatanodeDescriptor().equals(target)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Test
   public void testChooseTargetWithStaleNodes() throws Exception {
     // Set dataNodes[0] as stale
@@ -477,19 +519,19 @@ public class TestReplicationPolicy {
       .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
     assertTrue(namenode.getNamesystem().getBlockManager()
         .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     // We set datanode[0] as stale, so datanode[1] should be chosen, since
     // datanode[1] is on the same rack as datanode[0] (the writer)
     targets = chooseTarget(1);
     assertEquals(targets.length, 1);
-    assertEquals(targets[0], dataNodes[1]);
+    assertEquals(storages[1], targets[0]);
 
     Set<Node> excludedNodes = new HashSet<Node>();
     excludedNodes.add(dataNodes[1]);
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
     targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);
-    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
     
     // reset
     dataNodes[0].setLastUpdate(Time.now());
@@ -514,7 +556,7 @@ public class TestReplicationPolicy {
     namenode.getNamesystem().getBlockManager()
       .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
 
-    DatanodeDescriptor[] targets = chooseTarget(0);
+    DatanodeStorageInfo[] targets = chooseTarget(0);
     assertEquals(targets.length, 0);
 
     // Since we have 6 datanodes total, stale nodes should
@@ -586,11 +628,12 @@ public class TestReplicationPolicy {
           .getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
       BlockPlacementPolicy replicator = miniCluster.getNameNode()
           .getNamesystem().getBlockManager().getBlockPlacementPolicy();
-      DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
-          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
+      DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
+          staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
+          BLOCK_SIZE, StorageType.DEFAULT);
 
       assertEquals(targets.length, 3);
-      assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+      assertFalse(isOnSameRack(targets[0], staleNodeInfo));
       
       // Step 2. Set more than half of the datanodes as stale
       for (int i = 0; i < 4; i++) {
@@ -611,10 +654,11 @@ public class TestReplicationPolicy {
       assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
           .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
       // Call chooseTarget
-      targets = replicator.chooseTarget(filename, 3,
-          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
+      targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
+          new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
+          StorageType.DEFAULT);
       assertEquals(targets.length, 3);
-      assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
+      assertTrue(isOnSameRack(targets[0], staleNodeInfo));
       
       // Step 3. Set 2 stale datanodes back to healthy nodes, 
       // still have 2 stale nodes
@@ -636,7 +680,7 @@ public class TestReplicationPolicy {
       // Call chooseTarget
       targets = chooseTarget(3, staleNodeInfo);
       assertEquals(targets.length, 3);
-      assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
+      assertFalse(isOnSameRack(targets[0], staleNodeInfo));
     } finally {
       miniCluster.shutdown();
     }
@@ -651,26 +695,26 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testRereplicate1() throws Exception {
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
-    chosenNodes.add(dataNodes[0]);    
-    DatanodeDescriptor[] targets;
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    chosenNodes.add(storages[0]);    
+    DatanodeStorageInfo[] targets;
     
     targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
     
     targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
-    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], targets[1]));
     
     targets = chooseTarget(3, chosenNodes);
     assertEquals(targets.length, 3);
-    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
-    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], targets[2]));
   }
 
   /**
@@ -682,22 +726,22 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testRereplicate2() throws Exception {
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
-    chosenNodes.add(dataNodes[0]);
-    chosenNodes.add(dataNodes[1]);
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    chosenNodes.add(storages[0]);
+    chosenNodes.add(storages[1]);
 
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
     
     targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[1], dataNodes[0]));
   }
 
   /**
@@ -709,31 +753,31 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testRereplicate3() throws Exception {
-    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
-    chosenNodes.add(dataNodes[0]);
-    chosenNodes.add(dataNodes[2]);
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    chosenNodes.add(storages[0]);
+    chosenNodes.add(storages[2]);
     
-    DatanodeDescriptor[] targets;
+    DatanodeStorageInfo[] targets;
     targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
     targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
-    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
-    assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[2]));
     
     targets = chooseTarget(1, dataNodes[2], chosenNodes);
     assertEquals(targets.length, 1);
-    assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
-    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[2]));
+    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
 
     targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
-    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
     
     targets = chooseTarget(2, dataNodes[2], chosenNodes);
     assertEquals(targets.length, 2);
-    assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
+    assertTrue(isOnSameRack(targets[0], dataNodes[2]));
   }
   
   /**
@@ -1077,7 +1121,8 @@ public class TestReplicationPolicy {
     // Adding this block will increase its current replication, and that will
     // remove it from the queue.
     bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
-        ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0]);
+              ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0],
+            "STORAGE");
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
     // from QUEUE_VERY_UNDER_REPLICATED.
@@ -1125,11 +1170,12 @@ public class TestReplicationPolicy {
     info.setBlockCollection(mbc);
     bm.addBlockCollection(info, mbc);
 
-    DatanodeDescriptor[] dnAry = {dataNodes[0]};
+    DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo(
+        dataNodes[0], new DatanodeStorage("s1"))};
     final BlockInfoUnderConstruction ucBlock =
         info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
-            dnAry);
-    when(mbc.setLastBlock((BlockInfo) any(), (DatanodeDescriptor[]) any()))
+            storageAry);
+    when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
     .thenReturn(ucBlock);
 
     bm.convertLastBlockToUnderConstruction(mbc);
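
The final hunk swaps a bare descriptor array for storage-wrapped ones when converting the last block to under-construction. Condensed, and assuming the constructors and Mockito stubbing behave exactly as shown in the hunk:

    // Wrap the descriptor in a named storage, convert the block, and stub the
    // mocked block collection to hand the UC block back from setLastBlock.
    DatanodeStorageInfo[] storageAry = { new DatanodeStorageInfo(
        dataNodes[0], new DatanodeStorage("s1")) };
    BlockInfoUnderConstruction ucBlock =
        info.convertToBlockUnderConstruction(
            BlockUCState.UNDER_CONSTRUCTION, storageAry);
    when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
        .thenReturn(ucBlock);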


