hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1097671 [3/3] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/co...
Date Fri, 29 Apr 2011 03:03:27 GMT
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
(original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
Fri Apr 29 03:03:25 2011
@@ -18,141 +18,458 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.ArrayList;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.log4j.Level;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
-import junit.framework.TestCase;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
 
-public class TestBlocksWithNotEnoughRacks extends TestCase {
+import static org.junit.Assert.*;
+import org.junit.Test;
 
+public class TestBlocksWithNotEnoughRacks {
+  public static final Log LOG = LogFactory.getLog(TestBlocksWithNotEnoughRacks.class);
   static {
-    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL) ;
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
   }
 
-  private static final Log LOG =
-    LogFactory.getLog(TestBlocksWithNotEnoughRacks.class.getName());
-  //Creates a block with all datanodes on same rack
-  //Adds additional datanode on a different rack
-  //The block should be replicated to the new rack
-  public void testSufficientlyReplicatedBlocksWithNotEnoughRacks() throws Exception {
+  /*
+   * Return a configuration object with low timeouts for testing and 
+   * a topology script set (which enables rack awareness).  
+   */
+  private Configuration getConf() {
     Configuration conf = new HdfsConfiguration();
+
+    // Lower the heartbeat interval so the NN quickly learns of dead
+    // or decommissioned DNs and the NN issues replication and invalidation
+    // commands quickly (as replies to heartbeats)
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+    // Have the NN ReplicationMonitor compute the replication and
+    // invalidation commands to send DNs every second.
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+
+    // Have the NN check for pending replications every second so it
+    // quickly schedules additional replicas as they are identified.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
+
+    // The DNs report blocks every second.
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
+
+    // Indicates we have multiple racks
     conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
+    return conf;
+  }
+
+  /*
+   * Creates a block that is sufficiently replicated but has all of its
+   * replicas on the same rack. Adds an additional datanode on a new rack.
+   * The block should be replicated to the new rack.
+   */
+  @Test
+  public void testSufficientlyReplBlocksUsesNewRack() throws Exception {
+    Configuration conf = getConf();
     final short REPLICATION_FACTOR = 3;
-    final String FILE_NAME = "/testFile";
-    final Path FILE_PATH = new Path(FILE_NAME);
-    //All datanodes are on the same rack
-    String racks[] = {"/rack1","/rack1","/rack1",} ;
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).racks(racks).build();
+    final Path filePath = new Path("/testFile");
+    // All datanodes are on the same rack
+    String racks[] = {"/rack1", "/rack1", "/rack1"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+
+    try {
+      // Create a file with one block with a replication factor of 3
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+
+      // Add a new datanode on a different rack
+      String newRacks[] = {"/rack2"};
+      cluster.startDataNodes(conf, 1, true, null, newRacks);
+      cluster.waitActive();
+
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Like the previous test, but the block starts with a single replica
+   * and therefore, unlike the previous test, does not start off
+   * needing additional replicas.
+   */
+  @Test
+  public void testSufficientlySingleReplBlockUsesNewRack() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 1;
+    final Path filePath = new Path("/testFile");
+
+    String racks[] = {"/rack1", "/rack1", "/rack1", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      // Create a file with one block with a replication factor of 1
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
+
+      REPLICATION_FACTOR = 2;
+      ns.setReplication("/testFile", REPLICATION_FACTOR);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Creates a block with all datanodes on the same rack. Adds additional
+   * datanodes on a different rack and increases the replication factor,
+   * making sure there are enough replicas across racks. If the previous
+   * test passes this one should too; however, this test may pass when
+   * the previous one fails because the replication code is explicitly
+   * triggered by setting the replication factor.
+   */
+  @Test
+  public void testUnderReplicatedUsesNewRacks() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 3;
+    final Path filePath = new Path("/testFile");
+    // All datanodes are on the same rack
+    String racks[] = {"/rack1", "/rack1", "/rack1", "/rack1", "/rack1"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
     try {
-      // create a file with one block with a replication factor of 3
+      // Create a file with one block
       final FileSystem fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
-      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
       
-      Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
-      final FSNamesystem namesystem = cluster.getNamesystem();
-      int numRacks = namesystem.blockManager.getNumberOfRacks(b);
-      NumberReplicas number = namesystem.blockManager.countNodes(b);
-      int curReplicas = number.liveReplicas();
-      int neededReplicationSize = 
-                           namesystem.blockManager.neededReplications.size();
+      // Add new datanodes on a different rack and increase the
+      // replication factor so the block is under-replicated; make
+      // sure at least one of the hosts on the new rack is used.
+      String newRacks[] = {"/rack2", "/rack2"};
+      cluster.startDataNodes(conf, 2, true, null, newRacks);
+      REPLICATION_FACTOR = 5;
+      ns.setReplication("/testFile", REPLICATION_FACTOR);
+
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Test that when one replica of a block is found to be corrupt,
+   * the block is re-replicated across racks.
+   */
+  @Test
+  public void testCorruptBlockRereplicatedAcrossRacks() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 2;
+    int fileLen = 512;
+    final Path filePath = new Path("/testFile");
+    // Datanodes are spread across two racks
+    String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      // Create a file with one block with a replication factor of 2
+      final FileSystem fs = cluster.getFileSystem();
       
-      //Add a new datanode on a different rack
-      String newRacks[] = {"/rack2"} ;
-      cluster.startDataNodes(conf, 1, true, null, newRacks);
+      DFSTestUtil.createFile(fs, filePath, fileLen, REPLICATION_FACTOR, 1L);
+      final String fileContent = DFSTestUtil.readFile(fs, filePath);
+
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
 
-      while ( (numRacks < 2) || (curReplicas != REPLICATION_FACTOR) ||
-              (neededReplicationSize > 0) ) {
-        LOG.info("Waiting for replication");
-        Thread.sleep(600);
-        numRacks = namesystem.blockManager.getNumberOfRacks(b);
-        number = namesystem.blockManager.countNodes(b);
-        curReplicas = number.liveReplicas();
-        neededReplicationSize = 
-                           namesystem.blockManager.neededReplications.size();
+      // Corrupt a replica of the block
+      int dnToCorrupt = DFSTestUtil.firstDnWithBlock(cluster, b);
+      assertTrue(cluster.corruptReplica(b.getBlockName(), dnToCorrupt));
+
+      // Restart the datanode so blocks are re-scanned, and the corrupt
+      // block is detected.
+      cluster.restartDataNode(dnToCorrupt);
+
+      // Wait for the namenode to notice the corrupt replica
+      DFSTestUtil.waitCorruptReplicas(fs, ns, filePath, b, 1);
+
+      // The rack policy is still respected
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Ensure all replicas are valid (the corrupt replica may not
+      // have been cleaned up yet).
+      for (int i = 0; i < racks.length; i++) {
+        String blockContent = cluster.readBlockOnDataNode(i, b.getBlockName());
+        if (blockContent != null && i != dnToCorrupt) {
+          assertEquals("Corrupt replica", fileContent, blockContent);
+        }
       }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Reduce the replication factor of a file, making sure that the only
+   * cross rack replica is not removed when deleting replicas.
+   */
+  @Test
+  public void testReduceReplFactorRespectsRackPolicy() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 3;
+    final Path filePath = new Path("/testFile");
+    String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
 
-      LOG.info("curReplicas = " + curReplicas);
-      LOG.info("numRacks = " + numRacks);
-      LOG.info("Size = " + namesystem.blockManager.neededReplications.size());
+    try {
+      // Create a file with one block
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Decrease the replication factor and make sure the deleted replica
+      // was not the one that lived on the rack with only one replica,
+      // ie we should still have 2 racks after reducing the repl factor.
+      REPLICATION_FACTOR = 2;
+      ns.setReplication("/testFile", REPLICATION_FACTOR); 
 
-      assertEquals(2,numRacks);
-      assertTrue(curReplicas == REPLICATION_FACTOR);
-      assertEquals(0,namesystem.blockManager.neededReplications.size());
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
     } finally {
       cluster.shutdown();
     }
-    
   }
 
-  public void testUnderReplicatedNotEnoughRacks() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
-    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
+  /*
+   * Test that when a block is replicated because a replica is lost due
+   * to host failure, the rack policy is preserved.
+   */
+  @Test
+  public void testReplDueToNodeFailRespectsRackPolicy() throws Exception {
+    Configuration conf = getConf();
     short REPLICATION_FACTOR = 3;
-    final String FILE_NAME = "/testFile";
-    final Path FILE_PATH = new Path(FILE_NAME);
-    //All datanodes are on the same rack
-    String racks[] = {"/rack1","/rack1","/rack1",} ;
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).racks(racks).build();
+    final Path filePath = new Path("/testFile");
+    // The last two datanodes are on a different rack
+    String racks[] = {"/rack1", "/rack1", "/rack1", "/rack2", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
     try {
-      // create a file with one block with a replication factor of 3
+      // Create a file with one block with a replication factor of 3
       final FileSystem fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
-      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Make the last datanode look like it failed to heartbeat by 
+      // calling removeDatanode and stopping it.
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      int idx = datanodes.size() - 1;
+      DataNode dataNode = datanodes.get(idx);
+      cluster.stopDataNode(idx);
+      ns.removeDatanode(dataNode.dnRegistration);
+
+      // The block should still have sufficient # replicas, across racks.
+      // The last node may not have contained a replica, but if it did,
+      // the lost replica should have been re-created within the same rack.
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
       
-      Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
-      final FSNamesystem namesystem = cluster.getNamesystem();
-      int numRacks = namesystem.blockManager.getNumberOfRacks(b);
-      NumberReplicas number = namesystem.blockManager.countNodes(b);
-      int curReplicas = number.liveReplicas();
-      int neededReplicationSize = 
-                           namesystem.blockManager.neededReplications.size();
+      // Fail the last datanode again, it's also on rack2 so there is
+      // only 1 rack for all the replicas
+      datanodes = cluster.getDataNodes();
+      idx = datanodes.size() - 1;
+      dataNode = datanodes.get(idx);
+      cluster.stopDataNode(idx);
+      ns.removeDatanode(dataNode.dnRegistration);
+
+      // Make sure we have enough live replicas even though we are
+      // short one rack; the block is still marked as needing one replica.
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /*
+   * Test that when the excess replicas of a block are reduced due to
+   * a node re-joining the cluster, the rack policy is not violated.
+   */
+  @Test
+  public void testReduceReplFactorDueToRejoinRespectsRackPolicy() 
+      throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 2;
+    final Path filePath = new Path("/testFile");
+    // Last datanode is on a different rack
+    String racks[] = {"/rack1", "/rack1", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      // Create a file with one block
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Make the last (cross rack) datanode look like it failed
+      // to heartbeat by stopping it and calling removeDatanode.
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(3, datanodes.size());
+      DataNode dataNode = datanodes.get(2);
+      cluster.stopDataNode(2);
+      ns.removeDatanode(dataNode.dnRegistration);
+
+      // The block gets re-replicated to another datanode so it has a 
+      // sufficient # replicas, but not across racks, so there should
+      // be 1 rack, and 1 needed replica (even though there are 2 hosts 
+      // available and only 2 replicas required).
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+
+      // Start the "failed" datanode, which has a replica so the block is
+      // now over-replicated and therefore a replica should be removed but
+      // not on the restarted datanode as that would violate the rack policy.
+      String rack2[] = {"/rack2"};
+      cluster.startDataNodes(conf, 1, true, null, rack2);
+      cluster.waitActive();      
       
-      //Add a new datanode on a different rack
-      String newRacks[] = {"/rack2","/rack2","/rack2"} ;
-      cluster.startDataNodes(conf, 3, true, null, newRacks);
-      REPLICATION_FACTOR = 5;
-      namesystem.setReplication(FILE_NAME, REPLICATION_FACTOR); 
+      // The block now has sufficient # replicas, across racks
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 
-      while ( (numRacks < 2) || (curReplicas < REPLICATION_FACTOR) ||
-              (neededReplicationSize > 0) ) {
-        LOG.info("Waiting for replication");
-        Thread.sleep(600);
-        numRacks = namesystem.blockManager.getNumberOfRacks(b);
-        number = namesystem.blockManager.countNodes(b);
-        curReplicas = number.liveReplicas();
-        neededReplicationSize = 
-                           namesystem.blockManager.neededReplications.size();
-      }
+  /*
+   * Test that rack policy is still respected when blocks are replicated
+   * due to node decommissioning.
+   */
+  @Test
+  public void testNodeDecomissionRespectsRackPolicy() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 2;
+    final Path filePath = new Path("/testFile");
+
+    // Configure an excludes file
+    FileSystem localFileSys = FileSystem.getLocal(conf);
+    Path workingDir = localFileSys.getWorkingDirectory();
+    Path dir = new Path(workingDir, "build/test/data/temp/decommission");
+    Path excludeFile = new Path(dir, "exclude");
+    assertTrue(localFileSys.mkdirs(dir));
+    DFSTestUtil.writeFile(localFileSys, excludeFile, "");
+    conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
+
+    // Four datanodes spread across two racks
+    String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      // Create a file with one block
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Decommission one of the hosts with the block; this should cause
+      // the block to get replicated to another host on the same rack,
+      // otherwise the rack policy is violated.
+      BlockLocation locs[] = fs.getFileBlockLocations(
+          fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
+      String name = locs[0].getNames()[0];
+      DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+      ns.refreshNodes(conf);
+      DFSTestUtil.waitForDecommission(fs, name);
 
-      LOG.info("curReplicas = " + curReplicas);
-      LOG.info("numRacks = " + numRacks);
-      LOG.info("Size = " + namesystem.blockManager.neededReplications.size());
+      // Check the block still has sufficient # replicas across racks
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Test that rack policy is still respected when blocks are replicated
+   * due to node decommissioning, even when the blocks are over-replicated.
+   */
+  @Test
+  public void testNodeDecomissionWithOverreplicationRespectsRackPolicy() 
+      throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 5;
+    final Path filePath = new Path("/testFile");
+
+    // Configure an excludes file
+    FileSystem localFileSys = FileSystem.getLocal(conf);
+    Path workingDir = localFileSys.getWorkingDirectory();
+    Path dir = new Path(workingDir, "build/test/data/temp/decommission");
+    Path excludeFile = new Path(dir, "exclude");
+    assertTrue(localFileSys.mkdirs(dir));
+    DFSTestUtil.writeFile(localFileSys, excludeFile, "");
+    conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
+
+    // All hosts are on two racks, only one host on /rack2
+    String racks[] = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Lower the replication factor so the blocks are over replicated
+      REPLICATION_FACTOR = 2;
+      fs.setReplication(filePath, REPLICATION_FACTOR);
+
+      // Decommission one of the hosts holding the block, other than
+      // the lone host on rack2 (if we decommission that host it would
+      // be impossible to respect the rack policy).
+      BlockLocation locs[] = fs.getFileBlockLocations(
+          fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
+      for (String top : locs[0].getTopologyPaths()) {
+        if (!top.startsWith("/rack2")) {
+          String name = top.substring("/rack1".length()+1);
+          DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+          ns.refreshNodes(conf);
+          DFSTestUtil.waitForDecommission(fs, name);
+          break;
+        }
+      }
 
-      assertEquals(2,numRacks);
-      assertTrue(curReplicas == REPLICATION_FACTOR);
-      assertEquals(0,namesystem.blockManager.neededReplications.size());
+      // Check the block still has sufficient # replicas across racks,
+      // ie we didn't remove the replica on the lone host on /rack2.
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
     } finally {
       cluster.shutdown();
     }
-    
   }
-}
+}
\ No newline at end of file

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
(original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
Fri Apr 29 03:03:25 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.tools.DFSA
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
@@ -736,11 +737,13 @@ public class TestCheckpoint extends Test
   public void testSaveNamespace() throws IOException {
     MiniDFSCluster cluster = null;
     DistributedFileSystem fs = null;
+    FileContext fc;
     try {
       Configuration conf = new HdfsConfiguration();
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
       cluster.waitActive();
       fs = (DistributedFileSystem)(cluster.getFileSystem());
+      fc = FileContext.getFileContext(cluster.getURI());
 
       // Saving image without safe mode should fail
       DFSAdmin admin = new DFSAdmin(conf);
@@ -756,6 +759,12 @@ public class TestCheckpoint extends Test
       Path file = new Path("namespace.dat");
       writeFile(fs, file, replication);
       checkFile(fs, file, replication);
+
+      // create new link
+      Path symlink = new Path("file.link");
+      fc.createSymlink(file, symlink, false);
+      assertTrue(fc.getFileLinkStatus(symlink).isSymlink());
+
       // verify that the edits file is NOT empty
       Collection<URI> editsDirs = cluster.getNameEditsDirs();
       for(URI uri : editsDirs) {
@@ -784,6 +793,8 @@ public class TestCheckpoint extends Test
       cluster.waitActive();
       fs = (DistributedFileSystem)(cluster.getFileSystem());
       checkFile(fs, file, replication);
+      fc = FileContext.getFileContext(cluster.getURI());
+      assertTrue(fc.getFileLinkStatus(symlink).isSymlink());
     } finally {
       if(fs != null) fs.close();
       if(cluster!= null) cluster.shutdown();

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
(original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
Fri Apr 29 03:03:25 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.net.URL;
 import java.util.Collection;
@@ -31,7 +32,6 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
 import org.junit.Test;
 
 /** A JUnit test for corrupt_files.jsp */
@@ -82,7 +82,7 @@ public class TestCorruptFilesJsp  {
       for (int idx = 0; idx < filepaths.length - 1; idx++) {
         String blockName = DFSTestUtil.getFirstBlock(fs, filepaths[idx])
             .getBlockName();
-        TestDatanodeBlockScanner.corruptReplica(blockName, 0);
+        assertTrue(cluster.corruptReplica(blockName, 0));
 
         // read the file so that the corrupt block is reported to NN
         FSDataInputStream in = fs.open(filepaths[idx]);

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
(original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
Fri Apr 29 03:03:25 2011
@@ -270,7 +270,7 @@ public class TestEditLogRace {
 
 
         LOG.info("Save " + i + ": entering safe mode");
-        namesystem.enterSafeMode();
+        namesystem.enterSafeMode(false);
 
         // Verify edit logs before the save
         // They should start with the first edit after the checkpoint
@@ -289,7 +289,6 @@ public class TestEditLogRace {
 
         namesystem.leaveSafeMode(false);
         LOG.info("Save " + i + ": complete");
-
       }
     } finally {
       stopTransactionWorkers();

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
(original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNodeCount.java
Fri Apr 29 03:03:25 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,12 +36,21 @@ import junit.framework.TestCase;
 /**
  * Test if live nodes count per node is correct 
  * so NN makes right decision for under/over-replicated blocks
+ * 
+ * Two of the "while" loops below use "busy wait"
+ * because they are detecting transient states.
  */
 public class TestNodeCount extends TestCase {
+  final short REPLICATION_FACTOR = (short)2;
+  final long TIMEOUT = 20000L;
+  long timeout = 0;
+  long failtime = 0;
+  Block lastBlock = null;
+  NumberReplicas lastNum = null;
+
   public void testNodeCount() throws Exception {
     // start a mini dfs cluster of 2 nodes
     final Configuration conf = new HdfsConfiguration();
-    final short REPLICATION_FACTOR = (short)2;
     final MiniDFSCluster cluster = 
       new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).build();
     try {
@@ -83,13 +93,11 @@ public class TestNodeCount extends TestC
       cluster.restartDataNode(dnprop);
       cluster.waitActive();
       
-      // check if excessive replica is detected
-      NumberReplicas num = null;
-      do {
-       synchronized (namesystem) {
-         num = namesystem.blockManager.countNodes(block);
-       }
-      } while (num.excessReplicas() == 0);
+      // check if excessive replica is detected (transient)
+      initializeTimeout(TIMEOUT);
+      while (countNodes(block, namesystem).excessReplicas() == 0) {
+        checkTimeout("excess replicas not detected");
+      }
       
       // find out a non-excess node
       Iterator<DatanodeDescriptor> iter = namesystem.blockManager.blocksMap.nodeIterator(block);
@@ -119,20 +127,65 @@ public class TestNodeCount extends TestC
       }
 
       // The block should be replicated
-      do {
-        num = namesystem.blockManager.countNodes(block);
-      } while (num.liveReplicas() != REPLICATION_FACTOR);
+      initializeTimeout(TIMEOUT);
+      while (countNodes(block, namesystem).liveReplicas() != REPLICATION_FACTOR) {
+        checkTimeout("live replica count not correct", 1000);
+      }
 
       // restart the first datanode
       cluster.restartDataNode(dnprop);
       cluster.waitActive();
 
-      // check if excessive replica is detected
-      do {
-        num = namesystem.blockManager.countNodes(block);
-      } while (num.excessReplicas() != 2);
+      // check if excessive replica is detected (transient)
+      initializeTimeout(TIMEOUT);
+      while (countNodes(block, namesystem).excessReplicas() != 2) {
+        checkTimeout("excess replica count not equal to 2");
+      }
+
     } finally {
       cluster.shutdown();
     }
   }
+  
+  void initializeTimeout(long timeout) {
+    this.timeout = timeout;
+    this.failtime = System.currentTimeMillis()
+        + ((timeout <= 0) ? Long.MAX_VALUE : timeout);
+  }
+  
+  /* busy wait on transient conditions */
+  void checkTimeout(String testLabel) throws TimeoutException {
+    checkTimeout(testLabel, 0);
+  }
+  
+  /* check for timeout, then wait for cycleTime msec */
+  void checkTimeout(String testLabel, long cycleTime) throws TimeoutException {
+    if (System.currentTimeMillis() > failtime) {
+      throw new TimeoutException("Timeout: "
+          + testLabel + " for block " + lastBlock + " after " + timeout 
+          + " msec.  Last counts: live = " + lastNum.liveReplicas()
+          + ", excess = " + lastNum.excessReplicas()
+          + ", corrupt = " + lastNum.corruptReplicas());   
+    }
+    if (cycleTime > 0) {
+      try {
+        Thread.sleep(cycleTime);
+      } catch (InterruptedException ie) {
+        //ignore
+      }
+    }
+  }
+
+  /* threadsafe read of the replication counts for this block */
+  NumberReplicas countNodes(Block block, FSNamesystem namesystem) {
+    namesystem.readLock();
+    try {
+      lastBlock = block;
+      lastNum = namesystem.blockManager.countNodes(block);
+      return lastNum;
+    }
+    finally {
+      namesystem.readUnlock();
+    }
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
(original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
Fri Apr 29 03:03:25 2011
@@ -54,7 +54,7 @@ public class TestOverReplicatedBlocks ex
       
       // corrupt the block on datanode 0
       Block block = DFSTestUtil.getFirstBlock(fs, fileName);
-      TestDatanodeBlockScanner.corruptReplica(block.getBlockName(), 0);
+      assertTrue(cluster.corruptReplica(block.getBlockName(), 0));
       DataNodeProperties dnProps = cluster.stopDataNode(0);
       // remove block scanner log to trigger block scanning
       File scanLog = new File(System.getProperty("test.build.data"),

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
(original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
Fri Apr 29 03:03:25 2011
@@ -238,11 +238,11 @@ public class TestOfflineEditsViewer {
     // compares position to limit
     if(!small.equals(large)) { return false; }
 
-    // everything after limit should be zero 
+    // everything after limit should be 0xFF
     int i = large.limit();
     large.clear();
     for(; i < large.capacity(); i++) {
-      if(large.get(i) != 0) {
+      if(large.get(i) != FSEditLogOpCodes.OP_INVALID.getOpCode()) {
         return false;
       }
     }

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/editsStored
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/editsStored?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
Binary files - no diff available.

Propchange: hadoop/hdfs/branches/HDFS-1073/src/webapps/datanode/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/webapps/datanode:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/datanode:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/webapps/datanode:820487
-/hadoop/hdfs/trunk/src/webapps/datanode:1086482-1095244
+/hadoop/hdfs/trunk/src/webapps/datanode:1086482-1097628

Propchange: hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/webapps/hdfs:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/webapps/hdfs:820487
-/hadoop/hdfs/trunk/src/webapps/hdfs:1086482-1095244
+/hadoop/hdfs/trunk/src/webapps/hdfs:1086482-1097628

Propchange: hadoop/hdfs/branches/HDFS-1073/src/webapps/secondary/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/webapps/secondary:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/webapps/secondary:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/webapps/secondary:820487
-/hadoop/hdfs/trunk/src/webapps/secondary:1086482-1095244
+/hadoop/hdfs/trunk/src/webapps/secondary:1086482-1097628


