hadoop-mapreduce-commits mailing list archives

From dhr...@apache.org
Subject svn commit: r1040417 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/raid/
Date Tue, 30 Nov 2010 06:23:56 GMT
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java?rev=1040417&r1=1040416&r2=1040417&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java Tue Nov 30 06:23:55 2010
@@ -33,7 +33,9 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.zip.CRC32;
 
-import junit.framework.TestCase;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -62,12 +64,12 @@ import org.apache.hadoop.raid.RaidNode;
 import org.apache.hadoop.raid.RaidUtils;
 
 
-public class TestBlockFixer extends TestCase {
+public class TestBlockFixer {
   final static Log LOG = LogFactory.getLog(
                             "org.apache.hadoop.raid.TestBlockFixer");
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
       "build/contrib/raid/test/data")).getAbsolutePath();
-  final static String CONFIG_FILE = new File(TEST_DIR,
+  final static String CONFIG_FILE = new File(TEST_DIR, 
       "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
   final static int NUM_DATANODES = 3;
@@ -82,13 +84,37 @@ public class TestBlockFixer extends Test
   Random rand = new Random();
 
   /**
+   * Tests isXorParityFile and isRsParityFile
+   */
+  @Test
+  public void testIsParityFile() throws IOException {
+    Configuration testConf = new Configuration();
+    testConf.set("hdfs.raid.locations", "/raid");
+    testConf.set("hdfs.raidrs.locations", "/raidrs");
+
+    BlockFixer.BlockFixerHelper helper =
+      new BlockFixer.BlockFixerHelper(testConf);
+
+    assertFalse("incorrectly identified rs parity file as xor parity file",
+                helper.isXorParityFile(new Path("/raidrs/test/test")));
+    assertTrue("could not identify rs parity file",
+               helper.isRsParityFile(new Path("/raidrs/test/test")));
+    assertTrue("could not identify xor parity file",
+               helper.isXorParityFile(new Path("/raid/test/test")));
+    assertFalse("incorrectly identified xor parity file as rs parity file",
+                helper.isRsParityFile(new Path("/raid/test/test")));
+  }
+
+
+  /**
    * Test the filtering of trash files from the list of corrupt files.
    */
+  @Test
   public void testTrashFilter() {
     List<Path> files = new LinkedList<Path>();
     // Paths that do not match the trash pattern.
     Path p1 = new Path("/user/raid/raidtest/f1");
-    Path p2 = new Path("/user/.Trash/");
+    Path p2 = new Path("/user/.Trash/"); 
     // Paths that match the trash pattern.
     Path p3 = new Path("/user/raid/.Trash/raidtest/f1");
     Path p4 = new Path("/user/raid/.Trash/");
@@ -100,21 +126,33 @@ public class TestBlockFixer extends Test
     Configuration conf = new Configuration();
     RaidUtils.filterTrash(conf, files);
 
-    assertEquals(2, files.size());
+    assertEquals("expected 2 non-trash files but got " + files.size(),
+                 2, files.size());
     for (Path p: files) {
-      assertTrue(p == p1 || p == p2);
+      assertTrue("wrong file returned by filterTrash",
+                 p == p1 || p == p2);
     }
   }
 
+  @Test
+  public void testBlockFixDist() throws Exception {
+    implBlockFix(false);
+  }
+
+  @Test
+  public void testBlockFixLocal() throws Exception {
+    implBlockFix(true);
+  }
+
   /**
    * Create a file with three stripes, corrupt a block each in two stripes,
   * and wait for the file to be fixed.
    */
-  public void testBlockFix() throws Exception {
+  private void implBlockFix(boolean local) throws Exception {
     LOG.info("Test testBlockFix started.");
     long blockSize = 8192L;
     int stripeLength = 3;
-    mySetup(stripeLength, -1);
+    mySetup(stripeLength, -1); // never har
     Path file1 = new Path("/user/dhruba/raidtest/file1");
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
     long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
@@ -126,32 +164,41 @@ public class TestBlockFixer extends Test
     Configuration localConf = new Configuration(conf);
     localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
     localConf.setInt("raid.blockfix.interval", 1000);
+    if (local) {
+      localConf.set("raid.blockfix.classname",
+                    "org.apache.hadoop.raid.LocalBlockFixer");
+    } else {
+      localConf.set("raid.blockfix.classname",
+                    "org.apache.hadoop.raid.DistBlockFixer");
+    }
     localConf.setLong("raid.blockfix.filespertask", 2L);
 
     try {
       cnode = RaidNode.createRaidNode(null, localConf);
       TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
       cnode.stop(); cnode.join();
-
+      
       FileStatus srcStat = fileSys.getFileStatus(file1);
       DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
       LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
         dfs, file1.toUri().getPath(), 0, srcStat.getLen());
 
-      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
-      assertEquals(corruptFiles.length, 0);
-      assertEquals(0, cnode.blockFixer.filesFixed());
-
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("no corrupt files expected", 0, corruptFiles.length);
+      assertEquals("filesFixed() should return 0 before fixing files",
+                   0, cnode.blockFixer.filesFixed());
+      
       // Corrupt blocks in two different stripes. We can fix them.
       int[] corruptBlockIdxs = new int[]{0, 4, 6};
       for (int idx: corruptBlockIdxs)
         corruptBlock(locs.get(idx).getBlock().getBlockName());
       reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
-
-      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
-      assertEquals(corruptFiles.length, 1);
-      assertEquals(corruptFiles[0], file1.toUri().getPath());
-      assertEquals(3,
+      
+      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("file not corrupted", 1, corruptFiles.length);
+      assertEquals("wrong file corrupted",
+                   corruptFiles[0], file1.toUri().getPath());
+      assertEquals("wrong number of corrupt blocks", 3,
         RaidDFSUtil.corruptBlocksInFile(dfs, file1.toUri().getPath(), 0,
           srcStat.getLen()).size());
 
@@ -162,13 +209,15 @@ public class TestBlockFixer extends Test
         LOG.info("Test testBlockFix waiting for files to be fixed.");
         Thread.sleep(1000);
       }
-      assertEquals(1, cnode.blockFixer.filesFixed());
-
+      assertEquals("file not fixed", 1, cnode.blockFixer.filesFixed());
+      
       dfs = getDFS(conf, dfs);
-      assertTrue(TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
 
     } catch (Exception e) {
-      LOG.info("Test testBlockFix Exception " + e + StringUtils.stringifyException(e));
+      LOG.info("Test testBlockFix Exception " + e +
+               StringUtils.stringifyException(e));
       throw e;
     } finally {
       myTearDown();
@@ -182,58 +231,71 @@ public class TestBlockFixer extends Test
    * regenerated. Now stop RaidNode and corrupt the generated block.
    * Test that corruption in the generated block can be detected by clients.
    */
-  public void testGeneratedBlock() throws Exception {
-    LOG.info("Test testGeneratedBlock started.");
+  private void generatedBlockTestCommon(String testName, int blockToCorrupt,
+                                        boolean local) throws Exception {
+    LOG.info("Test " + testName + " started.");
     long blockSize = 8192L;
     int stripeLength = 3;
-    mySetup(stripeLength, -1);
+    mySetup(stripeLength, -1); // never har
     Path file1 = new Path("/user/dhruba/raidtest/file1");
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
     long crc1 = TestRaidDfs.createTestFile(fileSys, file1, 1, 7, blockSize);
     long file1Len = fileSys.getFileStatus(file1).getLen();
-    LOG.info("Test testGeneratedBlock created test files");
+    LOG.info("Test " + testName + " created test files");
 
     // create an instance of the RaidNode
     Configuration localConf = new Configuration(conf);
     localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
     localConf.setInt("raid.blockfix.interval", 1000);
+    if (local) {
+      localConf.set("raid.blockfix.classname",
+                    "org.apache.hadoop.raid.LocalBlockFixer");
+    } else {
+      localConf.set("raid.blockfix.classname",
+                    "org.apache.hadoop.raid.DistBlockFixer");
+    }
     localConf.setLong("raid.blockfix.filespertask", 2L);
     try {
       cnode = RaidNode.createRaidNode(null, localConf);
       TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
       cnode.stop(); cnode.join();
-
+      
       FileStatus srcStat = fileSys.getFileStatus(file1);
       DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
       LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
         dfs, file1.toUri().getPath(), 0, srcStat.getLen());
 
-      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
-      assertEquals(corruptFiles.length, 0);
-      assertEquals(0, cnode.blockFixer.filesFixed());
-
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("no corrupt files expected", 0, corruptFiles.length);
+      assertEquals("filesFixed() should return 0 before fixing files",
+                   0, cnode.blockFixer.filesFixed());
+      
       corruptBlock(locs.get(0).getBlock().getBlockName());
       reportCorruptBlocks(dfs, file1, new int[]{0}, blockSize);
-
-      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
-      assertEquals(corruptFiles.length, 1);
-      assertEquals(corruptFiles[0], file1.toUri().getPath());
-
+      
+      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("file not corrupted",
+                   1, corruptFiles.length);
+      assertEquals("wrong file corrupted",
+                   corruptFiles[0], file1.toUri().getPath());
+      
       cnode = RaidNode.createRaidNode(null, localConf);
       long start = System.currentTimeMillis();
       while (cnode.blockFixer.filesFixed() < 1 &&
              System.currentTimeMillis() - start < 120000) {
-        LOG.info("Test testGeneratedBlock waiting for files to be fixed.");
+        LOG.info("Test " + testName + " waiting for files to be fixed.");
         Thread.sleep(1000);
       }
-      assertEquals(1, cnode.blockFixer.filesFixed());
-
+      assertEquals("file not fixed",
+                   1, cnode.blockFixer.filesFixed());
+      
       // Stop RaidNode
       cnode.stop(); cnode.join(); cnode = null;
 
       // The block has successfully been reconstructed.
       dfs = getDFS(conf, dfs);
-      assertTrue(TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
 
       // Now corrupt the generated block.
       locs = RaidDFSUtil.getBlockLocations(
@@ -252,34 +314,97 @@ public class TestBlockFixer extends Test
       } catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
       }
     } catch (Exception e) {
-      LOG.info("Test testGeneratedBlock Exception " + e + StringUtils.stringifyException(e));
+      LOG.info("Test " + testName + " Exception " + e +
+               StringUtils.stringifyException(e));
       throw e;
     } finally {
       myTearDown();
     }
-    LOG.info("Test testGeneratedBlock completed.");
+    LOG.info("Test " + testName + " completed.");
+  }
+
+  /**
+   * Tests integrity of generated block.
+   * Create a file and delete a block entirely. Wait for the block to be
+   * regenerated. Now stop RaidNode and corrupt the generated block.
+   * Test that corruption in the generated block can be detected by clients.
+   */
+  @Test
+  public void testGeneratedBlockDist() throws Exception {
+    generatedBlockTestCommon("testGeneratedBlock", 3, false);
+  }
+
+  /**
+   * Tests integrity of generated block.
+   * Create a file and delete a block entirely. Wait for the block to be
+   * regenerated. Now stop RaidNode and corrupt the generated block.
+   * Test that corruption in the generated block can be detected by clients.
+   */
+  @Test
+  public void testGeneratedBlockLocal() throws Exception {
+    generatedBlockTestCommon("testGeneratedBlock", 3, true);
+  }
+
+  /**
+   * Tests integrity of generated last block.
+   * Create a file and delete a block entirely. Wait for the block to be
+   * regenerated. Now stop RaidNode and corrupt the generated block.
+   * Test that corruption in the generated block can be detected by clients.
+   */
+  @Test
+  public void testGeneratedLastBlockDist() throws Exception {
+    generatedBlockTestCommon("testGeneratedLastBlock", 6, false);
+  }
+
+  /**
+   * Tests integrity of generated last block.
+   * Create a file and delete a block entirely. Wait for the block to be
+   * regenerated. Now stop RaidNode and corrupt the generated block.
+   * Test that corruption in the generated block can be detected by clients.
+   */
+  @Test
+  public void testGeneratedLastBlockLocal() throws Exception {
+    generatedBlockTestCommon("testGeneratedLastBlock", 6, true);
+  }
+
+  @Test
+  public void testParityBlockFixDist() throws Exception {
+    implParityBlockFix("testParityBlockFixDist", false);
+  }
+
+  @Test
+  public void testParityBlockFixLocal() throws Exception {
+    implParityBlockFix("testParityBlockFixLocal", true);
   }
 
   /**
    * Corrupt a parity file and wait for it to get fixed.
    */
-  public void testParityBlockFix() throws Exception {
-    LOG.info("Test testParityBlockFix started.");
+  private void implParityBlockFix(String testName, boolean local)
+    throws Exception {
+    LOG.info("Test " + testName + " started.");
     long blockSize = 8192L;
     int stripeLength = 3;
-    mySetup(stripeLength, -1);
+    mySetup(stripeLength, -1); // never har
     Path file1 = new Path("/user/dhruba/raidtest/file1");
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
     Path parityFile = new Path("/destraid/user/dhruba/raidtest/file1");
     TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
                                                           1, 7, blockSize);
     long file1Len = fileSys.getFileStatus(file1).getLen();
-    LOG.info("Test testParityBlockFix created test files");
+    LOG.info("Test " + testName + " created test files");
 
     // create an instance of the RaidNode
     Configuration localConf = new Configuration(conf);
     localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
     localConf.setInt("raid.blockfix.interval", 1000);
+    if (local) {
+      localConf.set("raid.blockfix.classname",
+                    "org.apache.hadoop.raid.LocalBlockFixer");
+    } else {
+      localConf.set("raid.blockfix.classname",
+                    "org.apache.hadoop.raid.DistBlockFixer");
+    }
     localConf.setLong("raid.blockfix.filespertask", 2L);
 
     try {
@@ -294,9 +419,10 @@ public class TestBlockFixer extends Test
       LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
         dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
 
-      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
-      assertEquals(corruptFiles.length, 0);
-      assertEquals(0, cnode.blockFixer.filesFixed());
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("no corrupt files expected", 0, corruptFiles.length);
+      assertEquals("filesFixed() should return 0 before fixing files",
+                   0, cnode.blockFixer.filesFixed());
 
       // Corrupt parity blocks for different stripes.
       int[] corruptBlockIdxs = new int[]{0, 1, 2};
@@ -304,34 +430,50 @@ public class TestBlockFixer extends Test
         corruptBlock(locs.get(idx).getBlock().getBlockName());
       reportCorruptBlocks(dfs, parityFile, corruptBlockIdxs, blockSize);
 
-      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
-      assertEquals(corruptFiles.length, 1);
-      assertEquals(corruptFiles[0], parityFile.toUri().getPath());
+      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("file not corrupted",
+                   1, corruptFiles.length);
+      assertEquals("wrong file corrupted",
+                   corruptFiles[0], parityFile.toUri().getPath());
 
       cnode = RaidNode.createRaidNode(null, localConf);
       long start = System.currentTimeMillis();
       while (cnode.blockFixer.filesFixed() < 1 &&
              System.currentTimeMillis() - start < 120000) {
-        LOG.info("Test testParityBlockFix waiting for files to be fixed.");
+        LOG.info("Test " + testName + " waiting for files to be fixed.");
         Thread.sleep(1000);
       }
-      assertEquals(1, cnode.blockFixer.filesFixed());
+      assertEquals("file not fixed",
+                   1, cnode.blockFixer.filesFixed());
 
       long checkCRC = getCRC(fileSys, parityFile);
 
-      assertEquals(parityCRC, checkCRC);
+      assertEquals("file not fixed",
+                   parityCRC, checkCRC);
 
     } catch (Exception e) {
-      LOG.info("Test testParityBlockFix Exception " + e + StringUtils.stringifyException(e));
+      LOG.info("Test " + testName + " Exception " + e +
+               StringUtils.stringifyException(e));
       throw e;
     } finally {
       myTearDown();
     }
-    LOG.info("Test testParityBlockFix completed.");
+    LOG.info("Test " + testName + " completed.");
+  }
+
+  @Test
+  public void testParityHarBlockFixDist() throws Exception {
+    implParityHarBlockFix("testParityHarBlockFixDist", false);
   }
 
-  public void testParityHarBlockFix() throws Exception {
-    LOG.info("Test testParityHarBlockFix started.");
+  @Test
+  public void testParityHarBlockFixLocal() throws Exception {
+    implParityHarBlockFix("testParityHarBlockFixLocal", true);
+  }
+
+  private void implParityHarBlockFix(String testName, boolean local)
+    throws Exception {
+    LOG.info("Test " + testName + " started.");
     long blockSize = 8192L;
     int stripeLength = 3;
     mySetup(stripeLength, 0); // Time before har = 0 days.
@@ -339,9 +481,9 @@ public class TestBlockFixer extends Test
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
     // Parity file will have 7 blocks.
     TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
-                                                          1, 20, blockSize);
+                                               1, 20, blockSize);
     long file1Len = fileSys.getFileStatus(file1).getLen();
-    LOG.info("Test testParityHarBlockFix created test files");
+    LOG.info("Test " + testName + " created test files");
 
     // create an instance of the RaidNode
     // HAR block size = 2 * src block size = 2 * parity block size.
@@ -349,18 +491,26 @@ public class TestBlockFixer extends Test
     localConf.setLong("har.block.size", blockSize * 2);
     localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
     localConf.setInt("raid.blockfix.interval", 1000);
+    if (local) {
+      localConf.set("raid.blockfix.classname",
+                    "org.apache.hadoop.raid.LocalBlockFixer");
+    } else {
+      localConf.set("raid.blockfix.classname",
+                    "org.apache.hadoop.raid.DistBlockFixer");
+    }
     localConf.setLong("raid.blockfix.filespertask", 2L);
 
     try {
       cnode = RaidNode.createRaidNode(null, localConf);
       Path harDirectory =
-        new Path("/destraid/user/dhruba/raidtest/raidtest" + RaidNode.HAR_SUFFIX);
+        new Path("/destraid/user/dhruba/raidtest/raidtest" +
+                 RaidNode.HAR_SUFFIX);
       long start = System.currentTimeMillis();
       while (System.currentTimeMillis() - start < 1000 * 120) {
         if (fileSys.exists(harDirectory)) {
           break;
         }
-        LOG.info("Test testParityHarBlockFix waiting for har");
+        LOG.info("Test " + testName + " waiting for har");
         Thread.sleep(1000);
       }
 
@@ -371,12 +521,14 @@ public class TestBlockFixer extends Test
       LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
         dfs, partFile.toUri().getPath(), 0, partStat.getLen());
       // 7 parity blocks => 4 har blocks.
-      assertEquals(4, locs.getLocatedBlocks().size());
+      assertEquals("wrong number of har blocks",
+                   4, locs.getLocatedBlocks().size());
       cnode.stop(); cnode.join();
 
-      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
-      assertEquals(corruptFiles.length, 0);
-      assertEquals(0, cnode.blockFixer.filesFixed());
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("no corrupt files expected", 0, corruptFiles.length);
+      assertEquals("filesFixed() should return 0 before fixing files",
+                   0, cnode.blockFixer.filesFixed());
 
       // Corrupt parity blocks for different stripes.
       int[] corruptBlockIdxs = new int[]{0, 3};
@@ -385,35 +537,252 @@ public class TestBlockFixer extends Test
       reportCorruptBlocks(dfs, partFile, corruptBlockIdxs,
         partStat.getBlockSize());
 
-      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
-      assertEquals(corruptFiles.length, 1);
-      assertEquals(corruptFiles[0], partFile.toUri().getPath());
+      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("file not corrupted", 1, corruptFiles.length);
+      assertEquals("wrong file corrupted",
+                   corruptFiles[0], partFile.toUri().getPath());
 
       cnode = RaidNode.createRaidNode(null, localConf);
       start = System.currentTimeMillis();
       while (cnode.blockFixer.filesFixed() < 1 &&
              System.currentTimeMillis() - start < 120000) {
-        LOG.info("Test testParityHarBlockFix waiting for files to be fixed.");
+        LOG.info("Test " + testName + " waiting for files to be fixed.");
         Thread.sleep(1000);
       }
-      assertEquals(1, cnode.blockFixer.filesFixed());
+      assertEquals("file not fixed",
+                   1, cnode.blockFixer.filesFixed());
 
       long checkCRC = getCRC(fileSys, partFile);
 
-      assertEquals(partCRC, checkCRC);
+      assertEquals("file not fixed",
+                   partCRC, checkCRC);
     } catch (Exception e) {
-      LOG.info("Test testParityHarBlockFix Exception " + e + StringUtils.stringifyException(e));
+      LOG.info("Test " + testName + " Exception " + e +
+               StringUtils.stringifyException(e));
       throw e;
     } finally {
       myTearDown();
     }
-    LOG.info("Test testParityHarBlockFix completed.");
+    LOG.info("Test " + testName + " completed.");
+  }
+
+
+  /**
+   * tests that we can have 2 concurrent jobs fixing files 
+   * (dist block fixer)
+   */
+  @Test
+  public void testConcurrentJobs() throws Exception {
+    LOG.info("Test testConcurrentJobs started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup(stripeLength, -1); // never har
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path file2 = new Path("/user/dhruba/raidtest/file2");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 20, blockSize);
+    long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
+                                                          1, 20, blockSize);
+    long file1Len = fileSys.getFileStatus(file1).getLen();
+    long file2Len = fileSys.getFileStatus(file2).getLen();
+    LOG.info("Test testConcurrentJobs created test files");
+
+    // create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    localConf.setInt("raid.blockfix.interval", 1000);
+    localConf.set("raid.blockfix.classname", 
+                  "org.apache.hadoop.raid.DistBlockFixer");
+    localConf.setLong("raid.blockfix.filespertask", 2L);
+
+    try {
+      cnode = RaidNode.createRaidNode(null, localConf);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
+      cnode.stop(); cnode.join();
+
+      FileStatus file1Stat = fileSys.getFileStatus(file1);
+      FileStatus file2Stat = fileSys.getFileStatus(file2);
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+      LocatedBlocks file1Loc =
+        RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
+                                      0, file1Stat.getLen());
+      LocatedBlocks file2Loc =
+        RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
+                                      0, file2Stat.getLen());
+      
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("no corrupt files expected", 0, corruptFiles.length);
+      assertEquals("filesFixed() should return 0 before fixing files",
+                   0, cnode.blockFixer.filesFixed());
+
+      // corrupt file1
+      int[] corruptBlockIdxs = new int[]{0, 4, 6};
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
+
+      cnode = RaidNode.createRaidNode(null, localConf);
+      DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
+      long start = System.currentTimeMillis();
+
+      while (blockFixer.jobsRunning() < 1 &&
+             System.currentTimeMillis() - start < 240000) {
+        LOG.info("Test testBlockFix waiting for fixing job 1 to start");
+        Thread.sleep(10);
+      }
+      assertEquals("job 1 not running", 1, blockFixer.jobsRunning());
+
+      // corrupt file2
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
+      
+      while (blockFixer.jobsRunning() < 2 &&
+             System.currentTimeMillis() - start < 240000) {
+        LOG.info("Test testBlockFix waiting for fixing job 2 to start");
+        Thread.sleep(10);
+      }
+      assertEquals("2 jobs not running", 2, blockFixer.jobsRunning());
+
+      while (blockFixer.filesFixed() < 2 &&
+             System.currentTimeMillis() - start < 240000) {
+        LOG.info("Test testBlockFix waiting for files to be fixed.");
+        Thread.sleep(10);
+      }
+      assertEquals("files not fixed", 2, blockFixer.filesFixed());
+
+      dfs = getDFS(conf, dfs);
+      
+      try {
+        Thread.sleep(5*1000);
+      } catch (InterruptedException ignore) {
+      }
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
+    } catch (Exception e) {
+      LOG.info("Test testConcurrentJobs exception " + e +
+               StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+
+  }
+
+  /**
+   * tests that the distributed block fixer obeys
+   * the limit on how many files to fix simultaneously
+   */
+  @Test
+  public void testMaxPendingFiles() throws Exception {
+    LOG.info("Test testMaxPendingFiles started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup(stripeLength, -1); // never har
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path file2 = new Path("/user/dhruba/raidtest/file2");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 20, blockSize);
+    long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
+                                                          1, 20, blockSize);
+    long file1Len = fileSys.getFileStatus(file1).getLen();
+    long file2Len = fileSys.getFileStatus(file2).getLen();
+    LOG.info("Test testMaxPendingFiles created test files");
+
+    // create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    localConf.setInt("raid.blockfix.interval", 1000);
+    localConf.set("raid.blockfix.classname", 
+                  "org.apache.hadoop.raid.DistBlockFixer");
+    localConf.setLong("raid.blockfix.filespertask", 2L);
+    localConf.setLong("raid.blockfix.maxpendingfiles", 1L);
+
+    try {
+      cnode = RaidNode.createRaidNode(null, localConf);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
+      cnode.stop(); cnode.join();
+
+      FileStatus file1Stat = fileSys.getFileStatus(file1);
+      FileStatus file2Stat = fileSys.getFileStatus(file2);
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+      LocatedBlocks file1Loc =
+        RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
+                                      0, file1Stat.getLen());
+      LocatedBlocks file2Loc =
+        RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
+                                      0, file2Stat.getLen());
+
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      assertEquals("no corrupt files expected", 0, corruptFiles.length);
+      assertEquals("filesFixed() should return 0 before fixing files",
+                   0, cnode.blockFixer.filesFixed());
+
+      // corrupt file1
+      int[] corruptBlockIdxs = new int[]{0, 4, 6};
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
+      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+
+      cnode = RaidNode.createRaidNode(null, localConf);
+      DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
+      long start = System.currentTimeMillis();
+
+      while (blockFixer.jobsRunning() < 1 &&
+             System.currentTimeMillis() - start < 240000) {
+        LOG.info("Test testBlockFix waiting for fixing job 1 to start");
+        Thread.sleep(10);
+      }
+      assertEquals("job not running", 1, blockFixer.jobsRunning());
+
+      // corrupt file2
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
+      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+      
+      // wait until both files are fixed
+      while (blockFixer.filesFixed() < 2 &&
+             System.currentTimeMillis() - start < 240000) {
+        // make sure the block fixer does not start a second job while
+        // the first one is still running
+        assertTrue("too many jobs running", blockFixer.jobsRunning() <= 1);
+        Thread.sleep(10);
+      }
+      assertEquals("files not fixed", 2, blockFixer.filesFixed());
+
+      dfs = getDFS(conf, dfs);
+      
+      try {
+        Thread.sleep(5*1000);
+      } catch (InterruptedException ignore) {
+      }
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+      assertTrue("file not fixed",
+                 TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
+    } catch (Exception e) {
+      LOG.info("Test testMaxPendingFiles exception " + e +
+               StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+
   }
 
   private static DistributedFileSystem getDFS(
         Configuration conf, FileSystem dfs) throws IOException {
     Configuration clientConf = new Configuration(conf);
-    clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+    clientConf.set("fs.hdfs.impl",
+                   "org.apache.hadoop.hdfs.DistributedFileSystem");
     clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
     URI dfsUri = dfs.getUri();
     FileSystem.closeAll();
@@ -439,7 +808,9 @@ public class TestBlockFixer extends Test
     conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
     conf.set("raid.server.address", "localhost:0");
     conf.setInt("hdfs.raid.stripeLength", stripeLength);
-    conf.set("hdfs.raid.locs", "/destraid");
+    conf.set("hdfs.raid.locations", "/destraid");
+
+    conf.setBoolean("dfs.permissions", false);
 
     dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
     dfs.waitActive();
@@ -453,7 +824,7 @@ public class TestBlockFixer extends Test
 
     FileSystem.setDefaultUri(conf, namenode);
     conf.set("mapred.job.tracker", jobTrackerName);
-
+    
     FileWriter fileWriter = new FileWriter(CONFIG_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     String str = "<configuration> " +
@@ -463,22 +834,22 @@ public class TestBlockFixer extends Test
                         "<destPath> /destraid</destPath> " +
                         "<property> " +
                           "<name>targetReplication</name> " +
-                          "<value>1</value> " +
+                          "<value>1</value> " + 
                           "<description>after RAIDing, decrease the replication factor
of a file to this value." +
-                          "</description> " +
+                          "</description> " + 
                         "</property> " +
                         "<property> " +
                           "<name>metaReplication</name> " +
-                          "<value>1</value> " +
+                          "<value>1</value> " + 
                           "<description> replication factor of parity file" +
-                          "</description> " +
+                          "</description> " + 
                         "</property> " +
                         "<property> " +
                           "<name>modTimePeriod</name> " +
-                          "<value>2000</value> " +
+                          "<value>2000</value> " + 
                           "<description> time (milliseconds) after a file is modified
to make it " +
                                          "a candidate for RAIDing " +
-                          "</description> " +
+                          "</description> " + 
                         "</property> ";
     if (timeBeforeHar >= 0) {
       str +=
@@ -486,7 +857,7 @@ public class TestBlockFixer extends Test
                           "<name>time_before_har</name> " +
                           "<value>" + timeBeforeHar + "</value> " +
                           "<description> amount of time waited before har'ing parity
files" +
-                          "</description> " +
+                          "</description> " + 
                         "</property> ";
     }
 
@@ -514,14 +885,14 @@ public class TestBlockFixer extends Test
     return crc.getValue();
   }
 
-  void corruptBlock(String blockName) throws IOException {
+  static void corruptBlock(String blockName) throws IOException {
     boolean corrupted = false;
     for (int i = 0; i < NUM_DATANODES; i++) {
       corrupted |= TestDatanodeBlockScanner.corruptReplica(blockName, i);
     }
-    assertTrue(corrupted);
+    assertTrue("could not corrupt block", corrupted);
   }
-
+  
   static void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs,
     long blockSize) throws IOException {
 

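The bulk of the TestBlockFixer changes above apply a single refactoring: the class no longer extends junit.framework.TestCase, each test method carries a JUnit 4 @Test annotation, assertions come from a static import of org.junit.Assert, and every assertion gains a failure message. A rough sketch of that migration pattern (the class and test body here are illustrative, not part of this commit):

    // Before: JUnit 3 discovers public void test*() methods on a
    // TestCase subclass, and the assert* methods are inherited.
    //
    //   import junit.framework.TestCase;
    //
    //   public class ExampleTest extends TestCase {
    //     public void testSum() {
    //       assertEquals(2, 1 + 1);
    //     }
    //   }

    // After: any plain class works; tests are found by annotation,
    // assertions are statically imported, and a message as the first
    // argument makes failures self-describing.
    import org.junit.Test;
    import static org.junit.Assert.*;

    public class ExampleTest {
      @Test
      public void testSum() {
        assertEquals("1 + 1 should be 2", 2, 1 + 1);
      }
    }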
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java?rev=1040417&r1=1040416&r2=1040417&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java Tue Nov 30 06:23:55 2010
@@ -102,6 +102,8 @@ public class TestRaidShell extends TestC
     localConf.setInt("raid.blockfix.interval", 1000);
     // the RaidNode does the raiding inline (instead of submitting to map/reduce)
     conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
+    conf.set("raid.blockfix.classname",
+             "org.apache.hadoop.raid.LocalBlockFixer");
     cnode = RaidNode.createRaidNode(null, localConf);
 
     try {

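TestRaidShell pins raid.blockfix.classname to LocalBlockFixer so the shell test keeps fixing blocks inline, mirroring the per-test local/dist switch added to TestBlockFixer above. A plausible way for the RaidNode to resolve that key is Hadoop's standard reflection helpers; this is a sketch under that assumption, not the committed BlockFixer code (the factory class and the default are guesses, while the key and implementation names come from the diff):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ReflectionUtils;

    public class BlockFixerFactory {
      /** Sketch: pick the block fixer implementation from the config. */
      public static BlockFixer createBlockFixer(Configuration conf) {
        // Key set by the tests in this commit; the map/reduce-based
        // fixer as default is an assumption.
        Class<?> clazz = conf.getClass("raid.blockfix.classname",
                                       DistBlockFixer.class);
        // ReflectionUtils hands conf to the new instance if the class
        // implements Configurable.
        return (BlockFixer) ReflectionUtils.newInstance(clazz, conf);
      }
    }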
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java?rev=1040417&r1=1040416&r2=1040417&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java Tue Nov 30 06:23:55 2010
@@ -85,7 +85,7 @@ public class TestReedSolomonDecoder exte
 
       // Ensure there are no corrupt files yet.
       DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
-      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
       assertEquals(corruptFiles.length, 0);
 
       // Now corrupt the file.
@@ -99,7 +99,7 @@ public class TestReedSolomonDecoder exte
           srcStat.getBlockSize());
 
       // Ensure file is corrupted.
-      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
       assertEquals(corruptFiles.length, 1);
       assertEquals(corruptFiles[0], file1.toString());
 
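The TestReedSolomonDecoder hunks make the same API change seen throughout this commit: RaidDFSUtil.getCorruptFiles now takes the live DistributedFileSystem handle instead of a Configuration, presumably so the corrupt-file query goes through the caller's filesystem instance rather than a freshly constructed one. A minimal illustration of the new call-site shape (the helper method and its placement alongside RaidDFSUtil are assumptions, not part of the commit):

    import static org.junit.Assert.assertEquals;

    import java.io.IOException;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class CorruptFileQueryExample {
      // Assumes RaidDFSUtil is importable from this package; the dfs
      // handle would come from a running MiniDFSCluster, as in the
      // tests above.
      static void expectNoCorruptFiles(DistributedFileSystem dfs)
          throws IOException {
        String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
        assertEquals("no corrupt files expected", 0, corruptFiles.length);
      }
    }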


