hadoop-mapreduce-commits mailing list archives

From sc...@apache.org
Subject svn commit: r1028580 - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/raid/
Date Fri, 29 Oct 2010 01:23:54 GMT
Author: schen
Date: Fri Oct 29 01:23:54 2010
New Revision: 1028580

URL: http://svn.apache.org/viewvc?rev=1028580&view=rev
Log:
RaidNode periodically fixes corrupt blocks. (Ramkumar Vadali via schen)


Added:
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1028580&r1=1028579&r2=1028580&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Oct 29 01:23:54 2010
@@ -356,6 +356,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2146.  Raid does not affect access time of a source file.
     (Ramkumar Vadali via dhruba)
 
+    MAPREDUCE-2150.  RaidNode periodically fixes corrupt blocks. (Ramkumar Vadali via
+    schen)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java?rev=1028580&r1=1028579&r2=1028580&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java Fri Oct 29 01:23:54 2010
@@ -19,11 +19,21 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.tools.DFSck;
+import org.apache.hadoop.util.ToolRunner;
 
 public abstract class RaidDFSUtil {
   /**
@@ -49,4 +59,30 @@ public abstract class RaidDFSUtil {
     throws IOException {
     return dfs.getClient().namenode.getBlockLocations(path, offset, length);
   }
+
+  public static String[] getCorruptFiles(Configuration conf)
+    throws IOException {
+    ByteArrayOutputStream baseOut = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(baseOut, true);
+    DFSck fsck = new DFSck(conf, out);
+    String[] args = new String[]{"-list-corruptfileblocks"};
+    try {
+      ToolRunner.run(fsck, args);
+    } catch (Exception e) {
+      throw new IOException("DFSck.run exception ", e);
+    }
+    byte[] output = baseOut.toByteArray();
+    BufferedReader in = new BufferedReader(new InputStreamReader(
+      new ByteArrayInputStream(output)));
+    String line;
+    Set<String> corruptFiles = new HashSet<String>();
+    while ((line = in.readLine()) != null) {
+      // The interesting lines are of the form: blkid<tab>path
+      int separatorPos = line.indexOf('\t');
+      if (separatorPos != -1) {
+        corruptFiles.add(line.substring(separatorPos + 1));
+      }
+    }
+    return corruptFiles.toArray(new String[corruptFiles.size()]);
+  }
 }
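
For reference, the new RaidDFSUtil.getCorruptFiles helper shells out to fsck (DFSck
with the -list-corruptfileblocks option) and keeps the paths from the
"blockid<tab>path" lines it prints. A minimal caller sketch, assuming the raid
contrib jar is on the classpath; the namenode address below is a placeholder, not
part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.RaidDFSUtil;

    public class ListCorruptFiles {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder namenode URI; point this at a real cluster.
        conf.set("fs.default.name", "hdfs://localhost:9000");
        for (String path : RaidDFSUtil.getCorruptFiles(conf)) {
          System.out.println("corrupt: " + path);
        }
      }
    }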

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java?rev=1028580&r1=1028579&r2=1028580&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java Fri Oct 29 01:23:54 2010
@@ -79,7 +79,7 @@ import org.apache.hadoop.raid.RaidUtils;
  * and figures out the location of the bad block by reading through
  * the corrupt file.
  */
-public class BlockFixer {
+public class BlockFixer implements Runnable {
   public static final Log LOG = LogFactory.getLog(
                                   "org.apache.hadoop.raid.BlockFixer");
   private java.util.HashMap<String, java.util.Date> history;
@@ -90,6 +90,8 @@ public class BlockFixer {
   private XOREncoder xorEncoder;
   private XORDecoder xorDecoder;
 
+  boolean running = true;
+
   public BlockFixer(Configuration conf) throws IOException {
     this.conf = conf;
     history = new java.util.HashMap<String, java.util.Date>();
@@ -101,10 +103,56 @@ public class BlockFixer {
     xorDecoder = new XORDecoder(conf, stripeLength);
   }
 
+  public void run() {
+    while (running) {
+      try {
+        LOG.info("BlockFixer continuing to run...");
+        doFix();
+      } catch (Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+      } catch (Error err) {
+        LOG.error("Exiting after encountering " +
+                    StringUtils.stringifyException(err));
+        throw err;
+      }
+    }
+  }
+
   public long filesFixed() {
     return numFilesFixed;
   }
 
+  void doFix() throws InterruptedException, IOException {
+    while (running) {
+      // Sleep before proceeding to fix files.
+      Thread.sleep(blockFixInterval);
+
+      // Purge history older than the history interval.
+      purgeHistory();
+
+      List<Path> corruptFiles = getCorruptFiles();
+      if (corruptFiles.isEmpty()) {
+        // If there are no corrupt files, retry after some time.
+        continue;
+      }
+      LOG.info("Found " + corruptFiles.size() + " corrupt files.");
+
+      sortCorruptFiles(corruptFiles);
+
+      for (Path srcPath: corruptFiles) {
+        if (!running) break;
+        try {
+          fixFile(srcPath);
+        } catch (IOException ie) {
+          LOG.error("Hit error while processing " + srcPath +
+            ": " + StringUtils.stringifyException(ie));
+          // Do nothing, move on to the next file.
+        }
+      }
+    }
+  }
+
+
   void fixFile(Path srcPath) throws IOException {
     if (RaidNode.isParityHarPartFile(srcPath)) {
       processCorruptParityHarPartFile(srcPath);
@@ -169,14 +217,11 @@ public class BlockFixer {
   List<Path> getCorruptFiles() throws IOException {
     DistributedFileSystem dfs = getDFS(new Path("/"));
 
-    // TODO: need an RPC here.
-    // FileStatus[] files =  dfs.getClient().namenode.getCorruptFiles();
-    FileStatus[] files = new FileStatus[0];
+    String[] nnCorruptFiles = RaidDFSUtil.getCorruptFiles(conf);
     List<Path> corruptFiles = new LinkedList<Path>();
-    for (FileStatus f: files) {
-      Path p = f.getPath();
-      if (!history.containsKey(p.toString())) {
-        corruptFiles.add(p);
+    for (String file: nnCorruptFiles) {
+      if (!history.containsKey(file)) {
+        corruptFiles.add(new Path(file));
       }
     }
     RaidUtils.filterTrash(conf, corruptFiles);
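
BlockFixer now implements Runnable so that RaidNode can wrap it in a Hadoop Daemon
thread and stop it through the package-private "running" flag (see the RaidNode
changes below). A minimal lifecycle sketch, assuming the class is compiled into the
same org.apache.hadoop.raid package so the flag is accessible, with a made-up class
name and an arbitrary sleep:

    package org.apache.hadoop.raid;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Daemon;

    public class BlockFixerLifecycleSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();  // needs the usual raid keys set
        BlockFixer fixer = new BlockFixer(conf);
        Daemon fixerThread = new Daemon(fixer);    // daemon thread running fixer.run()
        fixerThread.start();                       // loop: sleep, list corrupt files, fix
        Thread.sleep(60 * 1000);                   // let it scan for a minute
        fixer.running = false;                     // run()/doFix() fall out of their loops
        fixerThread.interrupt();                   // wakes the sleep inside doFix()
        fixerThread.join();
      }
    }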

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java?rev=1028580&r1=1028579&r2=1028580&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java Fri Oct 29 01:23:54 2010
@@ -102,7 +102,7 @@ public class HarIndex {
       long startIndex = Long.parseLong(splits[3]);
       long length = Long.parseLong(splits[4]);
       String[] newsplits = URLDecoder.decode(splits[5],"UTF-8").split(" ");
-      if (newsplits != null && newsplits.length >= 5) {
+      if (newsplits != null && newsplits.length >= 4) {
         long mtime = Long.parseLong(newsplits[0]);
         IndexEntry entry = new IndexEntry(
           name, startIndex, length, mtime, partName);
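
The HarIndex change relaxes the token count on the decoded property field from five
to four. Only the first token (the modification time) is consumed here; assuming the
decoded field is of the form "<mtime> <permission> <owner> <group>", four tokens
already make a complete entry. A standalone sketch of just that parse, using a
made-up property string:

    public class HarPropertyParseSketch {
      public static void main(String[] args) {
        // Hypothetical decoded property field: mtime, permission, owner, group.
        String decoded = "1288315434000 420 hadoop supergroup";
        String[] newsplits = decoded.split(" ");
        if (newsplits != null && newsplits.length >= 4) { // four tokens suffice
          long mtime = Long.parseLong(newsplits[0]);      // only the mtime is used
          System.out.println("mtime = " + mtime);
        }
      }
    }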

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1028580&r1=1028579&r2=1028580&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Fri Oct 29 01:23:54 2010
@@ -110,6 +110,10 @@ public class RaidNode implements RaidPro
   /** Deamon thread to har raid directories */
   Daemon harThread = null;
 
+  /** Daemon thread to fix corrupt files */
+  BlockFixer blockFixer = null;
+  Daemon blockFixerThread = null;
+
   /** Daemon thread to monitor distributed raid job progress */
   JobMonitor jobMonitor = null;
   Daemon jobMonitorThread = null;
@@ -207,6 +211,7 @@ public class RaidNode implements RaidPro
     try {
       if (server != null) server.join();
       if (triggerThread != null) triggerThread.join();
+      if (blockFixerThread != null) blockFixerThread.join();
       if (jobMonitorThread != null) jobMonitorThread.join();
       if (purgeThread != null) purgeThread.join();
     } catch (InterruptedException ie) {
@@ -225,6 +230,8 @@ public class RaidNode implements RaidPro
     running = false;
     if (server != null) server.stop();
     if (triggerThread != null) triggerThread.interrupt();
+    if (blockFixer != null) blockFixer.running = false;
+    if (blockFixerThread != null) blockFixerThread.interrupt();
     if (jobMonitor != null) jobMonitor.running = false;
     if (jobMonitorThread != null) jobMonitorThread.interrupt();
     if (purgeThread != null) purgeThread.interrupt();
@@ -269,6 +276,10 @@ public class RaidNode implements RaidPro
     running = true;
     this.server.start(); // start RPC server
 
+    this.blockFixer = new BlockFixer(conf);
+    this.blockFixerThread = new Daemon(this.blockFixer);
+    this.blockFixerThread.start();
+
     this.jobMonitor = new JobMonitor(conf);
     this.jobMonitorThread = new Daemon(this.jobMonitor);
     this.jobMonitorThread.start();
@@ -1074,29 +1085,33 @@ public class RaidNode implements RaidPro
 
     if ( shouldHar ) {
       LOG.info("Archiving " + dest.getPath() + " to " + tmpHarPath );
-      singleHar(destFs, dest, tmpHarPath);
+      singleHar(info, destFs, dest, tmpHarPath);
     }
   } 
 
   
-  private void singleHar(FileSystem destFs, FileStatus dest, String tmpHarPath) throws IOException {
-    
+  private void singleHar(PolicyInfo info, FileSystem destFs, FileStatus dest,
+    String tmpHarPath) throws IOException {
+
     Random rand = new Random();
     Path root = new Path("/");
     Path qualifiedPath = dest.getPath().makeQualified(destFs);
     String harFileDst = qualifiedPath.getName() + HAR_SUFFIX;
     String harFileSrc = qualifiedPath.getName() + "-" + 
                                 rand.nextLong() + "-" + HAR_SUFFIX;
+    short metaReplication =
+      (short) Integer.parseInt(info.getProperty("metaReplication"));
     // HadoopArchives.HAR_PARTFILE_LABEL is private, so hard-coding the label.
     conf.setLong("har.partfile.size", configMgr.getHarPartfileSize());
     HadoopArchives har = new HadoopArchives(conf);
-    String[] args = new String[6];
-    args[0] = "-archiveName";
-    args[1] = harFileSrc;
-    args[2] = "-p"; 
-    args[3] = root.makeQualified(destFs).toString();
-    args[4] = qualifiedPath.toUri().getPath().substring(1);
-    args[5] = tmpHarPath.toString();
+    String[] args = new String[7];
+    args[0] = "-Ddfs.replication=" + metaReplication;
+    args[1] = "-archiveName";
+    args[2] = harFileSrc;
+    args[3] = "-p"; 
+    args[4] = root.makeQualified(destFs).toString();
+    args[5] = qualifiedPath.toUri().getPath().substring(1);
+    args[6] = tmpHarPath.toString();
     int ret = 0;
     try {
       ret = ToolRunner.run(har, args);
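
The singleHar() change prepends "-Ddfs.replication=<metaReplication>" to the
HadoopArchives arguments. This works because ToolRunner passes the argument array
through GenericOptionsParser, which folds -D key=value pairs into the tool's
Configuration before the remaining arguments reach the tool, so the archive output
is created at the policy's metaReplication. A small sketch of that generic-option
handling with a throwaway Tool (class name and values are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class GenericOptionSketch extends Configured implements Tool {
      public int run(String[] args) throws Exception {
        // The -D option has already been folded into getConf() by ToolRunner.
        System.out.println("dfs.replication = " + getConf().get("dfs.replication"));
        System.out.println("args left for the tool: " + args.length);
        return 0;
      }

      public static void main(String[] args) throws Exception {
        String[] toolArgs = { "-Ddfs.replication=2", "-archiveName", "x.har" };
        ToolRunner.run(new Configuration(), new GenericOptionSketch(), toolArgs);
      }
    }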

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java?rev=1028580&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java Fri Oct 29 01:23:54 2010
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.RaidUtils;
+
+
+public class TestBlockFixer extends TestCase {
+  final static Log LOG = LogFactory.getLog(
+                            "org.apache.hadoop.raid.TestBlockFixer");
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/raid/test/data")).getAbsolutePath();
+  final static String CONFIG_FILE = new File(TEST_DIR,
+      "test-raid.xml").getAbsolutePath();
+  final static long RELOAD_INTERVAL = 1000;
+  final static int NUM_DATANODES = 3;
+  Configuration conf;
+  String namenode = null;
+  MiniDFSCluster dfs = null;
+  String hftp = null;
+  MiniMRCluster mr = null;
+  FileSystem fileSys = null;
+  RaidNode cnode = null;
+  String jobTrackerName = null;
+  Random rand = new Random();
+
+  /**
+   * Test the filtering of trash files from the list of corrupt files.
+   */
+  public void testTrashFilter() {
+    List<Path> files = new LinkedList<Path>();
+    // Paths that do not match the trash pattern.
+    Path p1 = new Path("/user/raid/raidtest/f1");
+    Path p2 = new Path("/user/.Trash/");
+    // Paths that match the trash pattern.
+    Path p3 = new Path("/user/raid/.Trash/raidtest/f1");
+    Path p4 = new Path("/user/raid/.Trash/");
+    files.add(p1);
+    files.add(p3);
+    files.add(p4);
+    files.add(p2);
+
+    Configuration conf = new Configuration();
+    RaidUtils.filterTrash(conf, files);
+
+    assertEquals(2, files.size());
+    for (Path p: files) {
+      assertTrue(p == p1 || p == p2);
+    }
+  }
+
+  /**
+   * Create a file with three stripes, corrupt one block in each stripe,
+   * and wait for the file to be fixed.
+   */
+  public void testBlockFix() throws Exception {
+    LOG.info("Test testBlockFix started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup(stripeLength, -1);
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 7, blockSize);
+    long file1Len = fileSys.getFileStatus(file1).getLen();
+    LOG.info("Test testBlockFix created test files");
+
+    // create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    localConf.setInt("raid.blockfix.interval", 1000);
+    localConf.setLong("raid.blockfix.filespertask", 2L);
+
+    try {
+      cnode = RaidNode.createRaidNode(null, localConf);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+      cnode.stop(); cnode.join();
+
+      FileStatus srcStat = fileSys.getFileStatus(file1);
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+      LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
+        dfs, file1.toUri().getPath(), 0, srcStat.getLen());
+
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 0);
+      assertEquals(0, cnode.blockFixer.filesFixed());
+
+      // Corrupt blocks in three different stripes. We can fix them.
+      int[] corruptBlockIdxs = new int[]{0, 4, 6};
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(locs.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
+
+      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 1);
+      assertEquals(corruptFiles[0], file1.toUri().getPath());
+      assertEquals(3,
+        RaidDFSUtil.corruptBlocksInFile(dfs, file1.toUri().getPath(), 0,
+          srcStat.getLen()).size());
+
+      cnode = RaidNode.createRaidNode(null, localConf);
+      long start = System.currentTimeMillis();
+      while (cnode.blockFixer.filesFixed() < 1 &&
+             System.currentTimeMillis() - start < 120000) {
+        LOG.info("Test testBlockFix waiting for files to be fixed.");
+        Thread.sleep(1000);
+      }
+      assertEquals(1, cnode.blockFixer.filesFixed());
+
+      dfs = getDFS(conf, dfs);
+      assertTrue(TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+
+    } catch (Exception e) {
+      LOG.info("Test testBlockFix Exception " + e + StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+    LOG.info("Test testBlockFix completed.");
+  }
+
+  /**
+   * Tests integrity of generated block.
+   * Create a file and delete a block entirely. Wait for the block to be
+   * regenerated. Now stop RaidNode and corrupt the generated block.
+   * Test that corruption in the generated block can be detected by clients.
+   */
+  public void testGeneratedBlock() throws Exception {
+    LOG.info("Test testGeneratedBlock started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup(stripeLength, -1);
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    long crc1 = TestRaidDfs.createTestFile(fileSys, file1, 1, 7, blockSize);
+    long file1Len = fileSys.getFileStatus(file1).getLen();
+    LOG.info("Test testGeneratedBlock created test files");
+
+    // create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    localConf.setInt("raid.blockfix.interval", 1000);
+    localConf.setLong("raid.blockfix.filespertask", 2L);
+    try {
+      cnode = RaidNode.createRaidNode(null, localConf);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+      cnode.stop(); cnode.join();
+
+      FileStatus srcStat = fileSys.getFileStatus(file1);
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+      LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
+        dfs, file1.toUri().getPath(), 0, srcStat.getLen());
+
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 0);
+      assertEquals(0, cnode.blockFixer.filesFixed());
+
+      corruptBlock(locs.get(0).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file1, new int[]{0}, blockSize);
+
+      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 1);
+      assertEquals(corruptFiles[0], file1.toUri().getPath());
+
+      cnode = RaidNode.createRaidNode(null, localConf);
+      long start = System.currentTimeMillis();
+      while (cnode.blockFixer.filesFixed() < 1 &&
+             System.currentTimeMillis() - start < 120000) {
+        LOG.info("Test testGeneratedBlock waiting for files to be fixed.");
+        Thread.sleep(1000);
+      }
+      assertEquals(1, cnode.blockFixer.filesFixed());
+
+      // Stop RaidNode
+      cnode.stop(); cnode.join(); cnode = null;
+
+      // The block has successfully been reconstructed.
+      dfs = getDFS(conf, dfs);
+      assertTrue(TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+
+      // Now corrupt the generated block.
+      locs = RaidDFSUtil.getBlockLocations(
+        dfs, file1.toUri().getPath(), 0, srcStat.getLen());
+      corruptBlock(locs.get(0).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, file1, new int[]{0}, blockSize);
+
+      // This should fail.
+      boolean caughtChecksumException = false;
+      try {
+        Thread.sleep(5*1000);
+      } catch (InterruptedException ignore) {
+      }
+      try {
+        TestRaidDfs.validateFile(dfs, file1, file1Len, crc1);
+      } catch (org.apache.hadoop.fs.ChecksumException ce) {
+        caughtChecksumException = true;
+      }
+      assertTrue(caughtChecksumException);
+    } catch (Exception e) {
+      LOG.info("Test testGeneratedBlock Exception " + e + StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+    LOG.info("Test testGeneratedBlock completed.");
+  }
+
+  /**
+   * Corrupt a parity file and wait for it to get fixed.
+   */
+  public void testParityBlockFix() throws Exception {
+    LOG.info("Test testParityBlockFix started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup(stripeLength, -1);
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    Path parityFile = new Path("/destraid/user/dhruba/raidtest/file1");
+    TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 7, blockSize);
+    long file1Len = fileSys.getFileStatus(file1).getLen();
+    LOG.info("Test testParityBlockFix created test files");
+
+    // create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    localConf.setInt("raid.blockfix.interval", 1000);
+    localConf.setLong("raid.blockfix.filespertask", 2L);
+
+    try {
+      cnode = RaidNode.createRaidNode(null, localConf);
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+      cnode.stop(); cnode.join();
+
+      long parityCRC = getCRC(fileSys, parityFile);
+
+      FileStatus parityStat = fileSys.getFileStatus(parityFile);
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+      LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
+        dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
+
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 0);
+      assertEquals(0, cnode.blockFixer.filesFixed());
+
+      // Corrupt parity blocks for different stripes.
+      int[] corruptBlockIdxs = new int[]{0, 1, 2};
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(locs.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, parityFile, corruptBlockIdxs, blockSize);
+
+      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 1);
+      assertEquals(corruptFiles[0], parityFile.toUri().getPath());
+
+      cnode = RaidNode.createRaidNode(null, localConf);
+      long start = System.currentTimeMillis();
+      while (cnode.blockFixer.filesFixed() < 1 &&
+             System.currentTimeMillis() - start < 120000) {
+        LOG.info("Test testParityBlockFix waiting for files to be fixed.");
+        Thread.sleep(1000);
+      }
+      assertEquals(1, cnode.blockFixer.filesFixed());
+
+      long checkCRC = getCRC(fileSys, parityFile);
+
+      assertEquals(parityCRC, checkCRC);
+
+    } catch (Exception e) {
+      LOG.info("Test testParityBlockFix Exception " + e + StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+    LOG.info("Test testParityBlockFix completed.");
+  }
+
+  public void testParityHarBlockFix() throws Exception {
+    LOG.info("Test testParityHarBlockFix started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup(stripeLength, 0); // Time before har = 0 days.
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    // Parity file will have 7 blocks.
+    TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 20, blockSize);
+    long file1Len = fileSys.getFileStatus(file1).getLen();
+    LOG.info("Test testParityHarBlockFix created test files");
+
+    // create an instance of the RaidNode
+    // HAR block size = 2 * src block size = 2 * parity block size.
+    Configuration localConf = new Configuration(conf);
+    localConf.setLong("har.block.size", blockSize * 2);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    localConf.setInt("raid.blockfix.interval", 1000);
+    localConf.setLong("raid.blockfix.filespertask", 2L);
+
+    try {
+      cnode = RaidNode.createRaidNode(null, localConf);
+      Path harDirectory =
+        new Path("/destraid/user/dhruba/raidtest/raidtest" + RaidNode.HAR_SUFFIX);
+      long start = System.currentTimeMillis();
+      while (System.currentTimeMillis() - start < 1000 * 120) {
+        if (fileSys.exists(harDirectory)) {
+          break;
+        }
+        LOG.info("Test testParityHarBlockFix waiting for har");
+        Thread.sleep(1000);
+      }
+
+      Path partFile = new Path(harDirectory, "part-0");
+      long partCRC = getCRC(fileSys, partFile);
+      FileStatus partStat = fileSys.getFileStatus(partFile);
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+      LocatedBlocks locs = RaidDFSUtil.getBlockLocations(
+        dfs, partFile.toUri().getPath(), 0, partStat.getLen());
+      // 7 parity blocks => 4 har blocks.
+      assertEquals(4, locs.getLocatedBlocks().size());
+      cnode.stop(); cnode.join();
+
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 0);
+      assertEquals(0, cnode.blockFixer.filesFixed());
+
+      // Corrupt parity blocks for different stripes.
+      int[] corruptBlockIdxs = new int[]{0, 3};
+      for (int idx: corruptBlockIdxs)
+        corruptBlock(locs.get(idx).getBlock().getBlockName());
+      reportCorruptBlocks(dfs, partFile, corruptBlockIdxs,
+        partStat.getBlockSize());
+
+      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 1);
+      assertEquals(corruptFiles[0], partFile.toUri().getPath());
+
+      cnode = RaidNode.createRaidNode(null, localConf);
+      start = System.currentTimeMillis();
+      while (cnode.blockFixer.filesFixed() < 1 &&
+             System.currentTimeMillis() - start < 120000) {
+        LOG.info("Test testParityHarBlockFix waiting for files to be fixed.");
+        Thread.sleep(1000);
+      }
+      assertEquals(1, cnode.blockFixer.filesFixed());
+
+      long checkCRC = getCRC(fileSys, partFile);
+
+      assertEquals(partCRC, checkCRC);
+    } catch (Exception e) {
+      LOG.info("Test testParityHarBlockFix Exception " + e + StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+    LOG.info("Test testParityHarBlockFix completed.");
+  }
+
+  private static DistributedFileSystem getDFS(
+        Configuration conf, FileSystem dfs) throws IOException {
+    Configuration clientConf = new Configuration(conf);
+    clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+    clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+    URI dfsUri = dfs.getUri();
+    FileSystem.closeAll();
+    return (DistributedFileSystem) FileSystem.get(dfsUri, clientConf);
+  }
+
+  private void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
+
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    conf = new Configuration();
+
+    conf.set("raid.config.file", CONFIG_FILE);
+    conf.setBoolean("raid.config.reload", true);
+    conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+
+    // scan all policies once every 5 second
+    conf.setLong("raid.policy.rescan.interval", 5000);
+
+    // make all deletions not go through Trash
+    conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+    // do not use map-reduce cluster for Raiding
+    conf.setBoolean("fs.raidnode.local", true);
+    conf.set("raid.server.address", "localhost:0");
+    conf.setInt("hdfs.raid.stripeLength", stripeLength);
+    conf.set("hdfs.raid.locs", "/destraid");
+
+    dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+    dfs.waitActive();
+    fileSys = dfs.getFileSystem();
+    namenode = fileSys.getUri().toString();
+
+    FileSystem.setDefaultUri(conf, namenode);
+    mr = new MiniMRCluster(4, namenode, 3);
+    jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+    hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
+
+    FileSystem.setDefaultUri(conf, namenode);
+    conf.set("mapred.job.tracker", jobTrackerName);
+
+    FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+    fileWriter.write("<?xml version=\"1.0\"?>\n");
+    String str = "<configuration> " +
+                   "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
+                     "<policy name = \"RaidTest1\"> " +
+                        "<erasureCode>xor</erasureCode> " +
+                        "<destPath> /destraid</destPath> " +
+                        "<property> " +
+                          "<name>targetReplication</name> " +
+                          "<value>1</value> " +
+                          "<description>after RAIDing, decrease the replication factor of a file to this value." +
+                          "</description> " +
+                        "</property> " +
+                        "<property> " +
+                          "<name>metaReplication</name> " +
+                          "<value>1</value> " +
+                          "<description> replication factor of parity file" +
+                          "</description> " +
+                        "</property> " +
+                        "<property> " +
+                          "<name>modTimePeriod</name> " +
+                          "<value>2000</value> " +
+                          "<description> time (milliseconds) after a file is modified to make it " +
+                                         "a candidate for RAIDing " +
+                          "</description> " +
+                        "</property> ";
+    if (timeBeforeHar >= 0) {
+      str +=
+                        "<property> " +
+                          "<name>time_before_har</name> " +
+                          "<value>" + timeBeforeHar + "</value> " +
+                          "<description> amount of time waited before har'ing parity files" +
+                          "</description> " +
+                        "</property> ";
+    }
+
+    str +=
+                     "</policy>" +
+                   "</srcPath>" +
+                 "</configuration>";
+    fileWriter.write(str);
+    fileWriter.close();
+  }
+
+  private void myTearDown() throws Exception {
+    if (cnode != null) { cnode.stop(); cnode.join(); }
+    if (mr != null) { mr.shutdown(); }
+    if (dfs != null) { dfs.shutdown(); }
+  }
+
+  private long getCRC(FileSystem fs, Path p) throws IOException {
+    CRC32 crc = new CRC32();
+    FSDataInputStream stm = fs.open(p);
+    for (int b = stm.read(); b >= 0; b = stm.read()) {
+      crc.update(b);
+    }
+    stm.close();
+    return crc.getValue();
+  }
+
+  void corruptBlock(String blockName) throws IOException {
+    boolean corrupted = false;
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      corrupted |= TestDatanodeBlockScanner.corruptReplica(blockName, i);
+    }
+    assertTrue(corrupted);
+  }
+
+  void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs,
+    long blockSize) throws IOException {
+
+    FSDataInputStream in = fs.open(file);
+    for (int idx: idxs) {
+      long offset = idx * blockSize;
+      LOG.info("Reporting corrupt block " + file + ":" + offset);
+      in.seek(offset);
+      try {
+        in.readFully(new byte[(int)blockSize]);
+        fail("Expected exception not thrown for " + file + ":" + offset);
+      } catch (org.apache.hadoop.fs.ChecksumException e) {
+      }
+    }
+  }
+}
+


