hadoop-mapreduce-commits mailing list archives

From dhr...@apache.org
Subject svn commit: r1040417 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/raid/
Date Tue, 30 Nov 2010 06:23:56 GMT
Author: dhruba
Date: Tue Nov 30 06:23:55 2010
New Revision: 1040417

URL: http://svn.apache.org/viewvc?rev=1040417&view=rev
Log:
MAPREDUCE-2155. RaidNode should optionally use the mapreduce jobs to 
fix missing blocks.  (Patrick Kling via dhruba)
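
For context, the knob this patch introduces is raid.blockfix.classname, which selects the BlockFixer implementation the RaidNode runs. The following is a minimal sketch of selecting the in-process fixer; BLOCKFIX_CLASSNAME, createBlockFixer() and the DistBlockFixer default are taken from the diff below, while the example class and its main() are illustrative, and treating LocalBlockFixer (added by this revision, source in part 2 of this message) as the in-process alternative is an assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.raid.BlockFixer;

public class BlockFixerSelectionExample {
  public static void main(String[] args) throws ClassNotFoundException {
    Configuration conf = new Configuration();
    // DistBlockFixer (MapReduce based) is the default; point the key at
    // LocalBlockFixer to keep block fixing inside the RaidNode process.
    conf.set(BlockFixer.BLOCKFIX_CLASSNAME,
             "org.apache.hadoop.raid.LocalBlockFixer");
    BlockFixer fixer = BlockFixer.createBlockFixer(conf);
    new Thread(fixer).start();  // BlockFixer implements Runnable
  }
}

Inside the RaidNode the fixer is of course created from the daemon's own configuration; the standalone main() above only shows the new configuration point.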


Added:
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1040417&r1=1040416&r2=1040417&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Nov 30 06:23:55 2010
@@ -22,6 +22,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1752. Implement getFileBlockLocations in HarFilesystem.
     (Patrick Kling via dhruba)
 
+    MAPREDUCE-2155. RaidNode should optionally use the mapreduce jobs to 
+    fix missing blocks.  (Patrick Kling via dhruba)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java?rev=1040417&r1=1040416&r2=1040417&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java Tue Nov 30 06:23:55 2010
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.tools.DFSck;
@@ -60,29 +61,24 @@ public abstract class RaidDFSUtil {
     return dfs.getClient().namenode.getBlockLocations(path, offset, length);
   }
 
-  public static String[] getCorruptFiles(Configuration conf)
+  /**
+   * Make successive calls to listCorruptFileBlocks to obtain all 
+   * corrupt files.
+   */ 
+  public static String[] getCorruptFiles(DistributedFileSystem dfs)
     throws IOException {
-    ByteArrayOutputStream baseOut = new ByteArrayOutputStream();
-    PrintStream out = new PrintStream(baseOut, true);
-    DFSck fsck = new DFSck(conf, out);
-    String[] args = new String[]{"-list-corruptfileblocks"};
-    try {
-      ToolRunner.run(fsck, args);
-    } catch (Exception e) {
-      throw new IOException("DFSck.run exception ", e);
-    }
-    byte[] output = baseOut.toByteArray();
-    BufferedReader in = new BufferedReader(new InputStreamReader(
-      new ByteArrayInputStream(output)));
-    String line;
     Set<String> corruptFiles = new HashSet<String>();
-    while ((line = in.readLine()) != null) {
-      // The interesting lines are of the form: blkid<tab>path
-      int separatorPos = line.indexOf('\t');
-      if (separatorPos != -1) {
-        corruptFiles.add(line.substring(separatorPos + 1));
+    
+    String cookie = null;
+    for (CorruptFileBlocks fbs = dfs.listCorruptFileBlocks("/", cookie);
+         fbs.getFiles().length > 0;
+         fbs = dfs.listCorruptFileBlocks("/", cookie)) {
+      for (String path : fbs.getFiles()) {
+        corruptFiles.add(path);
       }
+      cookie = fbs.getCookie();
     }
+
     return corruptFiles.toArray(new String[corruptFiles.size()]);
   }
 }
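
The rewritten getCorruptFiles() above no longer drives DFSck and parses its text output; it pages through the namenode's corrupt-file report using the cookie returned by listCorruptFileBlocks. Below is a self-contained sketch of the same pagination pattern, assuming an already-open DistributedFileSystem; the listCorruptFileBlocks/CorruptFileBlocks API is the one used in the hunk above, everything else (class and method names) is illustrative.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.fs.CorruptFileBlocks;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class CorruptFileLister {
  /** Collect every corrupt file under root by following the pagination cookie. */
  public static Set<String> listAllCorrupt(DistributedFileSystem dfs, String root)
      throws IOException {
    Set<String> corrupt = new HashSet<String>();
    String cookie = null;
    while (true) {
      CorruptFileBlocks batch = dfs.listCorruptFileBlocks(root, cookie);
      if (batch.getFiles().length == 0) {
        break;                       // namenode reported no further corrupt files
      }
      for (String path : batch.getFiles()) {
        corrupt.add(path);
      }
      cookie = batch.getCookie();    // resume where the previous call stopped
    }
    return corrupt;
  }
}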

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java?rev=1040417&r1=1040416&r2=1040417&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java Tue Nov 30 06:23:55 2010
@@ -42,6 +42,8 @@ import java.util.Random;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.SocketChannel;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -64,9 +66,9 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.BlockMissingException;
 import org.apache.hadoop.hdfs.RaidDFSUtil;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.net.NetUtils;
 
 import org.apache.hadoop.raid.RaidNode;
@@ -77,6 +79,10 @@ import org.apache.hadoop.raid.protocol.P
 /**
  * contains the core functionality of the block fixer
  *
+ * configuration options:
+ * raid.blockfix.classname         - the class name of the block fixer 
+ *                                   implementation to use
+ *
  * raid.blockfix.interval          - interval between checks for corrupt files
  *
  * raid.blockfix.history.interval  - interval before fixing same file again
@@ -85,10 +91,9 @@ import org.apache.hadoop.raid.protocol.P
  *
  * raid.blockfix.write.timeout     - write time out
  */
-public class BlockFixer extends Configured implements Runnable {
-  public static final Log LOG = LogFactory.getLog(
-                                  "org.apache.hadoop.raid.BlockFixer");
+public abstract class BlockFixer extends Configured implements Runnable {
 
+  public static final String BLOCKFIX_CLASSNAME = "raid.blockfix.classname";
   public static final String BLOCKFIX_INTERVAL = "raid.blockfix.interval";
   public static final String BLOCKFIX_HISTORY_INTERVAL =
     "raid.blockfix.history.interval";
@@ -101,661 +106,737 @@ public class BlockFixer extends Configur
   public static final long DEFAULT_BLOCKFIX_HISTORY_INTERVAL =
     60 * 60 * 1000; // 60 mins
 
-  private java.util.HashMap<String, java.util.Date> history;
+  public static BlockFixer createBlockFixer(Configuration conf)
+    throws ClassNotFoundException {
+    try {
+      // default to distributed block fixer
+      Class<?> blockFixerClass =
+        conf.getClass(BLOCKFIX_CLASSNAME, DistBlockFixer.class);
+      if (!BlockFixer.class.isAssignableFrom(blockFixerClass)) {
+        throw new ClassNotFoundException("not an implementation of blockfixer");
+      }
+      Constructor<?> constructor =
+        blockFixerClass.getConstructor(new Class[] {Configuration.class} );
+      return (BlockFixer) constructor.newInstance(conf);
+    } catch (NoSuchMethodException e) {
+      throw new ClassNotFoundException("cannot construct blockfixer", e);
+    } catch (InstantiationException e) {
+      throw new ClassNotFoundException("cannot construct blockfixer", e);
+    } catch (IllegalAccessException e) {
+      throw new ClassNotFoundException("cannot construct blockfixer", e);
+    } catch (InvocationTargetException e) {
+      throw new ClassNotFoundException("cannot construct blockfixer", e);
+    }
+  }
+
   private long numFilesFixed = 0;
-  private String xorPrefix;
-  private String rsPrefix;
-  private Encoder xorEncoder;
-  private Decoder xorDecoder;
-  private Encoder rsEncoder;
-  private Decoder rsDecoder;
+
+  public volatile boolean running = true;
 
   // interval between checks for corrupt files
-  protected long blockFixInterval = DEFAULT_BLOCKFIX_INTERVAL;
+  protected long blockFixInterval;
 
   // interval before fixing same file again
-  protected long historyInterval = DEFAULT_BLOCKFIX_HISTORY_INTERVAL;
-
-  public volatile boolean running = true;
+  protected long historyInterval;
 
-
-  public BlockFixer(Configuration conf) throws IOException {
+  public BlockFixer(Configuration conf) {
     super(conf);
-    history = new java.util.HashMap<String, java.util.Date>();
-    blockFixInterval = getConf().getInt(BLOCKFIX_INTERVAL,
-                                   (int) blockFixInterval);
-    xorPrefix = RaidNode.xorDestinationPath(getConf()).toUri().getPath();
-    if (!xorPrefix.endsWith(Path.SEPARATOR)) {
-      xorPrefix += Path.SEPARATOR;
-    }
-    int stripeLength = RaidNode.getStripeLength(getConf());
-    xorEncoder = new XOREncoder(getConf(), stripeLength);
-    xorDecoder = new XORDecoder(getConf(), stripeLength);
-    rsPrefix = RaidNode.rsDestinationPath(getConf()).toUri().getPath();
-    if (!rsPrefix.endsWith(Path.SEPARATOR)) {
-      rsPrefix += Path.SEPARATOR;
-    }
-    int parityLength = RaidNode.rsParityLength(getConf());
-    rsEncoder = new ReedSolomonEncoder(getConf(), stripeLength, parityLength);
-    rsDecoder = new ReedSolomonDecoder(getConf(), stripeLength, parityLength);
-  }
+    blockFixInterval =
+      getConf().getLong(BLOCKFIX_INTERVAL, DEFAULT_BLOCKFIX_INTERVAL);
+    historyInterval =
+      getConf().getLong(BLOCKFIX_HISTORY_INTERVAL,
+                        DEFAULT_BLOCKFIX_HISTORY_INTERVAL);
 
-  public void run() {
-    while (running) {
-      try {
-        LOG.info("BlockFixer continuing to run...");
-        doFix();
-      } catch (Exception e) {
-        LOG.error(StringUtils.stringifyException(e));
-      } catch (Error err) {
-        LOG.error("Exiting after encountering " +
-                    StringUtils.stringifyException(err));
-        throw err;
-      }
-    }
   }
 
-  public long filesFixed() {
+  @Override
+  public abstract void run();
+
+  /**
+   * returns the number of files that have been fixed by this block fixer
+   */
+  public synchronized long filesFixed() {
     return numFilesFixed;
   }
 
-  void doFix() throws InterruptedException, IOException {
-    while (running) {
-      // Sleep before proceeding to fix files.
-      Thread.sleep(blockFixInterval);
+  /**
+   * increments the number of files that have been fixed by this block fixer
+   */
+  protected synchronized void incrFilesFixed() {
+    numFilesFixed++;
+  }
 
-      // Purge history older than the history interval.
-      purgeHistory();
+  /**
+   * increments, by the given amount, the number of files that have been fixed
+   * by this block fixer
+   */
+  protected synchronized void incrFilesFixed(long incr) {
+    if (incr < 0) {
+      throw new IllegalArgumentException("cannot increment by negative value " +
+                                         incr);
+    }
+    
+    numFilesFixed += incr;
+  }
 
-      List<Path> corruptFiles = getCorruptFiles();
-      if (corruptFiles.isEmpty()) {
-        // If there are no corrupt files, retry after some time.
-        continue;
+  static boolean isSourceFile(Path p, String[] destPrefixes) {
+    String pathStr = p.toUri().getPath();
+    for (String destPrefix: destPrefixes) {
+      if (pathStr.startsWith(destPrefix)) {
+        return false;
       }
-      LOG.info("Found " + corruptFiles.size() + " corrupt files.");
-
-      sortCorruptFiles(corruptFiles);
+    }
+    return true;
+  }
 
-      for (Path srcPath: corruptFiles) {
-        if (!running) break;
-        try {
-          fixFile(srcPath);
-        } catch (IOException ie) {
-          LOG.error("Hit error while processing " + srcPath +
-            ": " + StringUtils.stringifyException(ie));
-          // Do nothing, move on to the next file.
-        }
+  void filterUnfixableSourceFiles(Iterator<Path> it) throws IOException {
+    String xorPrefix = RaidNode.xorDestinationPath(getConf()).toUri().getPath();
+    if (!xorPrefix.endsWith(Path.SEPARATOR)) {
+      xorPrefix += Path.SEPARATOR;
+    }
+    String rsPrefix = RaidNode.rsDestinationPath(getConf()).toUri().getPath();
+    if (!rsPrefix.endsWith(Path.SEPARATOR)) {
+      rsPrefix += Path.SEPARATOR;
+    }
+    String[] destPrefixes = new String[]{xorPrefix, rsPrefix};
+    while (it.hasNext()) {
+      Path p = it.next();
+      if (isSourceFile(p, destPrefixes) &&
+          RaidNode.xorParityForSource(p, getConf()) == null &&
+          RaidNode.rsParityForSource(p, getConf()) == null) {
+        it.remove();
       }
     }
   }
 
+  /**
+   * this class implements the actual fixing functionality
+   * we keep this in a separate class so that 
+   * the distributed block fixer can use it
+   */ 
+  static class BlockFixerHelper extends Configured {
 
-  void fixFile(Path srcPath) throws IOException {
+    public static final Log LOG = LogFactory.getLog(BlockFixer.
+                                                    BlockFixerHelper.class);
 
-    if (RaidNode.isParityHarPartFile(srcPath)) {
-      processCorruptParityHarPartFile(srcPath);
-      return;
-    }
+    private String xorPrefix;
+    private String rsPrefix;
+    private XOREncoder xorEncoder;
+    private XORDecoder xorDecoder;
+    private ReedSolomonEncoder rsEncoder;
+    private ReedSolomonDecoder rsDecoder;
 
-    // The corrupted file is a XOR parity file
-    if (isXorParityFile(srcPath)) {
-      processCorruptParityFile(srcPath, xorEncoder);
-      return;
-    }
+    public BlockFixerHelper(Configuration conf) throws IOException {
+      super(conf);
+
+      xorPrefix = RaidNode.xorDestinationPath(getConf()).toUri().getPath();
+      if (!xorPrefix.endsWith(Path.SEPARATOR)) {
+        xorPrefix += Path.SEPARATOR;
+      }
+      rsPrefix = RaidNode.rsDestinationPath(getConf()).toUri().getPath();
+      if (!rsPrefix.endsWith(Path.SEPARATOR)) {
+        rsPrefix += Path.SEPARATOR;
+      }
+      int stripeLength = RaidNode.getStripeLength(getConf());
+      xorEncoder = new XOREncoder(getConf(), stripeLength);
+      xorDecoder = new XORDecoder(getConf(), stripeLength);
+      int parityLength = RaidNode.rsParityLength(getConf());
+      rsEncoder = new ReedSolomonEncoder(getConf(), stripeLength, parityLength);
+      rsDecoder = new ReedSolomonDecoder(getConf(), stripeLength, parityLength);
 
-    // The corrupted file is a ReedSolomon parity file
-    if (isRsParityFile(srcPath)) {
-      processCorruptParityFile(srcPath, rsEncoder);
-      return;
     }
 
-    // The corrupted file is a source file
-    RaidNode.ParityFilePair ppair =
-      RaidNode.xorParityForSource(srcPath, getConf());
-    Decoder decoder = null;
-    if (ppair != null) {
-      decoder = xorDecoder;
-    } else  {
-      ppair = RaidNode.rsParityForSource(srcPath, getConf());
-      if (ppair != null) {
-        decoder = rsDecoder;
+    /**
+     * checks whether file is xor parity file
+     */
+    boolean isXorParityFile(Path p) {
+      String pathStr = p.toUri().getPath();
+      if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
+        return false;
       }
+      return pathStr.startsWith(xorPrefix);
     }
 
-    // If we have a parity file, process the file and fix it.
-    if (ppair != null) {
-      processCorruptFile(srcPath, ppair, decoder);
+    /**
+     * checks whether file is rs parity file
+     */
+    boolean isRsParityFile(Path p) {
+      String pathStr = p.toUri().getPath();
+      if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
+        return false;
+      }
+      return pathStr.startsWith(rsPrefix);
     }
 
-  }
-
-  /**
-   * We maintain history of fixed files because a fixed file may appear in
-   * the list of corrupt files if we loop around too quickly.
-   * This function removes the old items in the history so that we can
-   * recognize files that have actually become corrupt since being fixed.
-   */
-  void purgeHistory() {
-    // Default history interval is 1 hour.
-    long historyInterval = getConf().getLong(
-                             BLOCKFIX_HISTORY_INTERVAL, 3600*1000);
-    java.util.Date cutOff = new java.util.Date(
-                                   System.currentTimeMillis()-historyInterval);
-    List<String> toRemove = new java.util.ArrayList<String>();
-
-    for (String key: history.keySet()) {
-      java.util.Date item = history.get(key);
-      if (item.before(cutOff)) {
-        toRemove.add(key);
-      }
-    }
-    for (String key: toRemove) {
-      LOG.info("Removing " + key + " from history");
-      history.remove(key);
+    /**
+     * Fix a file, do not report progress.
+     *
+     * @return true if file has been fixed, false if no fixing 
+     * was necessary or possible.
+     */
+    boolean fixFile(Path srcPath) throws IOException {
+      return fixFile(srcPath, new RaidUtils.DummyProgressable());
     }
-  }
 
-  /**
-   * @return A list of corrupt files as obtained from the namenode
-   */
-  List<Path> getCorruptFiles() throws IOException {
-    DistributedFileSystem dfs = getDFS(new Path("/"));
+    /**
+     * Fix a file, report progress.
+     *
+     * @return true if file has been fixed, false if no fixing 
+     * was necessary or possible.
+     */
+    boolean fixFile(Path srcPath, Progressable progress) throws IOException {
 
-    String[] nnCorruptFiles = RaidDFSUtil.getCorruptFiles(getConf());
-    List<Path> corruptFiles = new LinkedList<Path>();
-    for (String file: nnCorruptFiles) {
-      if (!history.containsKey(file)) {
-        corruptFiles.add(new Path(file));
+      if (RaidNode.isParityHarPartFile(srcPath)) {
+        return processCorruptParityHarPartFile(srcPath, progress);
       }
-    }
-    RaidUtils.filterTrash(getConf(), corruptFiles);
-    return corruptFiles;
-  }
 
-  /**
-   * Sorts source files ahead of parity files.
-   */
-  void sortCorruptFiles(List<Path> files) {
-    // TODO: We should first fix the files that lose more blocks
-    Comparator<Path> comp = new Comparator<Path>() {
-      public int compare(Path p1, Path p2) {
-        if (isXorParityFile(p2) || isRsParityFile(p2)) {
-          // If p2 is a parity file, p1 is smaller.
-          return -1;
-        }
-        if (isXorParityFile(p1) || isRsParityFile(p1)) {
-          // If p1 is a parity file, p2 is smaller.
-          return 1;
-        }
-        // If both are source files, they are equal.
-        return 0;
+      // The corrupted file is a XOR parity file
+      if (isXorParityFile(srcPath)) {
+        return processCorruptParityFile(srcPath, xorEncoder, progress);
       }
-    };
-    Collections.sort(files, comp);
-  }
 
-  /**
-   * Reads through a corrupt source file fixing corrupt blocks on the way.
-   * @param srcPath Path identifying the corrupt file.
-   * @throws IOException
-   */
-  void processCorruptFile(Path srcPath, RaidNode.ParityFilePair parityPair,
-      Decoder decoder) throws IOException {
-    LOG.info("Processing corrupt file " + srcPath);
-
-    DistributedFileSystem srcFs = getDFS(srcPath);
-    FileStatus srcStat = srcFs.getFileStatus(srcPath);
-    long blockSize = srcStat.getBlockSize();
-    long srcFileSize = srcStat.getLen();
-    String uriPath = srcPath.toUri().getPath();
-
-    int numBlocksFixed = 0;
-    List<LocatedBlock> corrupt =
-      RaidDFSUtil.corruptBlocksInFile(srcFs, uriPath, 0, srcFileSize);
-    for (LocatedBlock lb: corrupt) {
-      Block corruptBlock = lb.getBlock();
-      long corruptOffset = lb.getStartOffset();
-
-      LOG.info("Found corrupt block " + corruptBlock +
-          ", offset " + corruptOffset);
-
-      final long blockContentsSize =
-        Math.min(blockSize, srcFileSize - corruptOffset);
-      File localBlockFile =
-        File.createTempFile(corruptBlock.getBlockName(), ".tmp");
-      localBlockFile.deleteOnExit();
-
-      try {
-        decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(),
-          parityPair.getPath(), blockSize, corruptOffset, localBlockFile,
-          blockContentsSize);
-
-        // We have a the contents of the block, send them.
-        DatanodeInfo datanode = chooseDatanode(lb.getLocations());
-        computeMetdataAndSendFixedBlock(
-          datanode, localBlockFile, lb, blockContentsSize);
-        numBlocksFixed++;
+      // The corrupted file is a ReedSolomon parity file
+      if (isRsParityFile(srcPath)) {
+        return processCorruptParityFile(srcPath, rsEncoder, progress);
+      }
 
-        LOG.info("Adding " + srcPath + " to history");
-        history.put(srcPath.toString(), new java.util.Date());
-      } finally {
-        localBlockFile.delete();
+      // The corrupted file is a source file
+      RaidNode.ParityFilePair ppair =
+        RaidNode.xorParityForSource(srcPath, getConf());
+      Decoder decoder = null;
+      if (ppair != null) {
+        decoder = xorDecoder;
+      } else  {
+        ppair = RaidNode.rsParityForSource(srcPath, getConf());
+        if (ppair != null) {
+          decoder = rsDecoder;
+        }
       }
-    }
-    LOG.info("Fixed " + numBlocksFixed + " blocks in " + srcPath);
-    numFilesFixed++;
-  }
 
-  /**
-   * checks whether file is xor parity file
-   */
-  boolean isXorParityFile(Path p) {
-    String pathStr = p.toUri().getPath();
-    if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
+      // If we have a parity file, process the file and fix it.
+      if (ppair != null) {
+        return processCorruptFile(srcPath, ppair, decoder, progress);
+      }
+      
+      // there was nothing to do
       return false;
     }
-    return pathStr.startsWith(xorPrefix);
-  }
 
-  /**
-   * checks whether file is rs parity file
-   */
-  boolean isRsParityFile(Path p) {
-    String pathStr = p.toUri().getPath();
-    if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
-      return false;
+    /**
+     * Sorts source files ahead of parity files.
+     */
+    void sortCorruptFiles(List<Path> files) {
+      // TODO: We should first fix the files that lose more blocks
+      Comparator<Path> comp = new Comparator<Path>() {
+        public int compare(Path p1, Path p2) {
+          if (isXorParityFile(p2) || isRsParityFile(p2)) {
+            // If p2 is a parity file, p1 is smaller.
+            return -1;
+          }
+          if (isXorParityFile(p1) || isRsParityFile(p1)) {
+            // If p1 is a parity file, p2 is smaller.
+            return 1;
+          }
+          // If both are source files, they are equal.
+          return 0;
+        }
+      };
+      Collections.sort(files, comp);
     }
-    return pathStr.startsWith(rsPrefix);
-  }
 
-  /**
-   * Returns a DistributedFileSystem hosting the path supplied.
-   */
-  protected DistributedFileSystem getDFS(Path p) throws IOException {
-    return (DistributedFileSystem) p.getFileSystem(getConf());
-  }
-
-  /**
-   * Fixes corrupt blocks in a parity file.
-   * This function uses the corresponding source file to regenerate parity
-   * file blocks.
-   */
-  void processCorruptParityFile(Path parityPath, Encoder encoder)
+    /**
+     * Returns a DistributedFileSystem hosting the path supplied.
+     */
+    protected DistributedFileSystem getDFS(Path p) throws IOException {
+      return (DistributedFileSystem) p.getFileSystem(getConf());
+    }
+
+    /**
+     * Reads through a corrupt source file fixing corrupt blocks on the way.
+     * @param srcPath Path identifying the corrupt file.
+     * @throws IOException
+     * @return true if file has been fixed, false if no fixing 
+     * was necessary or possible.
+     */
+    boolean processCorruptFile(Path srcPath, RaidNode.ParityFilePair parityPair,
+                               Decoder decoder, Progressable progress)
       throws IOException {
-    LOG.info("Processing corrupt file " + parityPath);
-    Path srcPath = sourcePathFromParityPath(parityPath);
-    if (srcPath == null) {
-      LOG.warn("Unusable parity file " + parityPath);
-      return;
-    }
-
-    DistributedFileSystem parityFs = getDFS(parityPath);
-    FileStatus parityStat = parityFs.getFileStatus(parityPath);
-    long blockSize = parityStat.getBlockSize();
-    long parityFileSize = parityStat.getLen();
-    FileStatus srcStat = getDFS(srcPath).getFileStatus(srcPath);
-    long srcFileSize = srcStat.getLen();
-
-    // Check timestamp.
-    if (srcStat.getModificationTime() != parityStat.getModificationTime()) {
-      LOG.info("Mismatching timestamp for " + srcPath + " and " + parityPath +
-               ", moving on...");
-      return;
-    }
-
-    String uriPath = parityPath.toUri().getPath();
-    int numBlocksFixed = 0;
-    List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
-      parityFs, uriPath, 0, parityFileSize);
-    for (LocatedBlock lb: corrupt) {
-      Block corruptBlock = lb.getBlock();
-      long corruptOffset = lb.getStartOffset();
-
-      LOG.info("Found corrupt block " + corruptBlock +
-          ", offset " + corruptOffset);
-
-      File localBlockFile =
-        File.createTempFile(corruptBlock.getBlockName(), ".tmp");
-      localBlockFile.deleteOnExit();
-
-      try {
-        encoder.recoverParityBlockToFile(parityFs, srcPath, srcFileSize,
-            blockSize, parityPath, corruptOffset, localBlockFile);
-        // We have a the contents of the block, send them.
-        DatanodeInfo datanode = chooseDatanode(lb.getLocations());
-        computeMetdataAndSendFixedBlock(
-          datanode, localBlockFile, lb, blockSize);
-
-        numBlocksFixed++;
-        LOG.info("Adding " + parityPath + " to history");
-        history.put(parityPath.toString(), new java.util.Date());
-      } finally {
-        localBlockFile.delete();
+      LOG.info("Processing corrupt file " + srcPath);
+      
+      DistributedFileSystem srcFs = getDFS(srcPath);
+      FileStatus srcStat = srcFs.getFileStatus(srcPath);
+      long blockSize = srcStat.getBlockSize();
+      long srcFileSize = srcStat.getLen();
+      String uriPath = srcPath.toUri().getPath();
+      
+      int numBlocksFixed = 0;
+      List<LocatedBlock> corrupt =
+        RaidDFSUtil.corruptBlocksInFile(srcFs, uriPath, 0, srcFileSize);
+      if (corrupt.size() == 0) {
+        return false;
+      }
+      for (LocatedBlock lb: corrupt) {
+        Block corruptBlock = lb.getBlock();
+        long corruptOffset = lb.getStartOffset();
+        
+        LOG.info("Found corrupt block " + corruptBlock +
+                 ", offset " + corruptOffset);
+        
+        final long blockContentsSize =
+          Math.min(blockSize, srcFileSize - corruptOffset);
+        File localBlockFile =
+          File.createTempFile(corruptBlock.getBlockName(), ".tmp");
+        localBlockFile.deleteOnExit();
+        
+        try {
+          decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(),
+                                     parityPair.getPath(), blockSize,
+                                     corruptOffset, localBlockFile,
+                                     blockContentsSize);
+          
+          // We have the contents of the block, send them.
+          DatanodeInfo datanode = chooseDatanode(lb.getLocations());
+          computeMetadataAndSendFixedBlock(datanode, localBlockFile,
+                                          lb, blockContentsSize);
+          numBlocksFixed++;
+        } finally {
+          localBlockFile.delete();
+        }
+        progress.progress();
       }
+      LOG.info("Fixed " + numBlocksFixed + " blocks in " + srcPath);
+      return true;
     }
-    LOG.info("Fixed " + numBlocksFixed + " blocks in " + parityPath);
-    numFilesFixed++;
-  }
 
-  /**
-   * Reads through a parity HAR part file, fixing corrupt blocks on the way.
-   * A HAR block can contain many file blocks, as long as the HAR part file
-   * block size is a multiple of the file block size.
-   */
-  void processCorruptParityHarPartFile(Path partFile) throws IOException {
-    LOG.info("Processing corrupt file " + partFile);
-    // Get some basic information.
-    DistributedFileSystem dfs = getDFS(partFile);
-    FileStatus partFileStat = dfs.getFileStatus(partFile);
-    long partFileSize = partFileStat.getLen();
-    long partFileBlockSize = partFileStat.getBlockSize();
-    LOG.info(partFile + " has block size " + partFileBlockSize);
-
-    // Find the path to the index file.
-    // Parity file HARs are only one level deep, so the index files is at the
-    // same level as the part file.
-    String harDirectory = partFile.toUri().getPath(); // Temporarily.
-    harDirectory =
-      harDirectory.substring(0, harDirectory.lastIndexOf(Path.SEPARATOR));
-    Path indexFile = new Path(harDirectory + "/" + HarIndex.indexFileName);
-    FileStatus indexStat = dfs.getFileStatus(indexFile);
-    // Parses through the HAR index file.
-    HarIndex harIndex = new HarIndex(dfs.open(indexFile), indexStat.getLen());
-
-    String uriPath = partFile.toUri().getPath();
-    int numBlocksFixed = 0;
-    List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
-      dfs, uriPath, 0, partFileSize);
-    for (LocatedBlock lb: corrupt) {
-      Block corruptBlock = lb.getBlock();
-      long corruptOffset = lb.getStartOffset();
-
-      File localBlockFile =
-        File.createTempFile(corruptBlock.getBlockName(), ".tmp");
-      localBlockFile.deleteOnExit();
-      processCorruptParityHarPartBlock(
-        dfs, partFile, corruptBlock, corruptOffset, partFileStat, harIndex,
-        localBlockFile);
-      // Now we have recovered the part file block locally, send it.
-      try {
-        DatanodeInfo datanode = chooseDatanode(lb.getLocations());
-        computeMetdataAndSendFixedBlock(datanode, localBlockFile,
-          lb, localBlockFile.length());
-        numBlocksFixed++;
+    /**
+     * Fixes corrupt blocks in a parity file.
+     * This function uses the corresponding source file to regenerate parity
+     * file blocks.
+     * @return true if file has been fixed, false if no fixing 
+     * was necessary or possible.
+     */
+    boolean processCorruptParityFile(Path parityPath, Encoder encoder,
+                                     Progressable progress)
+      throws IOException {
+      LOG.info("Processing corrupt file " + parityPath);
+      Path srcPath = sourcePathFromParityPath(parityPath);
+      if (srcPath == null) {
+        LOG.warn("Unusable parity file " + parityPath);
+        return false;
+      }
+
+      DistributedFileSystem parityFs = getDFS(parityPath);
+      FileStatus parityStat = parityFs.getFileStatus(parityPath);
+      long blockSize = parityStat.getBlockSize();
+      long parityFileSize = parityStat.getLen();
+      FileStatus srcStat = getDFS(srcPath).getFileStatus(srcPath);
+      long srcFileSize = srcStat.getLen();
+
+      // Check timestamp.
+      if (srcStat.getModificationTime() != parityStat.getModificationTime()) {
+        LOG.info("Mismatching timestamp for " + srcPath + " and " + parityPath +
+                 ", moving on...");
+        return false;
+      }
+
+      String uriPath = parityPath.toUri().getPath();
+      int numBlocksFixed = 0;
+      List<LocatedBlock> corrupt =
+        RaidDFSUtil.corruptBlocksInFile(parityFs, uriPath, 0, parityFileSize);
+      if (corrupt.size() == 0) {
+        return false;
+      }
+      for (LocatedBlock lb: corrupt) {
+        Block corruptBlock = lb.getBlock();
+        long corruptOffset = lb.getStartOffset();
+        
+        LOG.info("Found corrupt block " + corruptBlock +
+                 ", offset " + corruptOffset);
+        
+        File localBlockFile =
+          File.createTempFile(corruptBlock.getBlockName(), ".tmp");
+        localBlockFile.deleteOnExit();
+        
+        try {
+          encoder.recoverParityBlockToFile(parityFs, srcPath, srcFileSize,
+                                           blockSize, parityPath,
+                                           corruptOffset, localBlockFile);
+          // We have the contents of the block, send them.
+          DatanodeInfo datanode = chooseDatanode(lb.getLocations());
+          computeMetadataAndSendFixedBlock(datanode, localBlockFile, lb,
+                                          blockSize);
+          
+          numBlocksFixed++;
+        } finally {
+          localBlockFile.delete();
+        }
+        progress.progress();
+      }
+      LOG.info("Fixed " + numBlocksFixed + " blocks in " + parityPath);
+      return true;
+    }
 
-        LOG.info("Adding " + partFile + " to history");
-        history.put(partFile.toString(), new java.util.Date());
-      } finally {
-        localBlockFile.delete();
+    /**
+     * Reads through a parity HAR part file, fixing corrupt blocks on the way.
+     * A HAR block can contain many file blocks, as long as the HAR part file
+     * block size is a multiple of the file block size.
+     * @return true if file has been fixed, false if no fixing 
+     * was necessary or possible.
+     */
+    boolean processCorruptParityHarPartFile(Path partFile,
+                                            Progressable progress)
+      throws IOException {
+      LOG.info("Processing corrupt file " + partFile);
+      // Get some basic information.
+      DistributedFileSystem dfs = getDFS(partFile);
+      FileStatus partFileStat = dfs.getFileStatus(partFile);
+      long partFileSize = partFileStat.getLen();
+      long partFileBlockSize = partFileStat.getBlockSize();
+      LOG.info(partFile + " has block size " + partFileBlockSize);
+
+      // Find the path to the index file.
+      // Parity file HARs are only one level deep, so the index file is at the
+      // same level as the part file.
+      String harDirectory = partFile.toUri().getPath(); // Temporarily.
+      harDirectory =
+        harDirectory.substring(0, harDirectory.lastIndexOf(Path.SEPARATOR));
+      Path indexFile = new Path(harDirectory + "/" + HarIndex.indexFileName);
+      FileStatus indexStat = dfs.getFileStatus(indexFile);
+      // Parses through the HAR index file.
+      HarIndex harIndex = new HarIndex(dfs.open(indexFile), indexStat.getLen());
+
+      String uriPath = partFile.toUri().getPath();
+      int numBlocksFixed = 0;
+      List<LocatedBlock> corrupt =
+        RaidDFSUtil.corruptBlocksInFile(dfs, uriPath, 0, partFileSize);
+      if (corrupt.size() == 0) {
+        return false;
+      }
+      for (LocatedBlock lb: corrupt) {
+        Block corruptBlock = lb.getBlock();
+        long corruptOffset = lb.getStartOffset();
+
+        File localBlockFile =
+          File.createTempFile(corruptBlock.getBlockName(), ".tmp");
+        localBlockFile.deleteOnExit();
+        processCorruptParityHarPartBlock(dfs, partFile, corruptBlock,
+                                         corruptOffset, partFileStat, harIndex,
+                                         localBlockFile, progress);
+        // Now we have recovered the part file block locally, send it.
+        try {
+          DatanodeInfo datanode = chooseDatanode(lb.getLocations());
+          computeMetadataAndSendFixedBlock(datanode, localBlockFile,
+                                          lb, localBlockFile.length());
+          numBlocksFixed++;
+        } finally {
+          localBlockFile.delete();
+        }
+        progress.progress();
       }
+      LOG.info("Fixed " + numBlocksFixed + " blocks in " + partFile);
+      return true;
     }
-    LOG.info("Fixed " + numBlocksFixed + " blocks in " + partFile);
-    numFilesFixed++;
-  }
 
-  /**
-   * This fixes a single part file block by recovering in sequence each
-   * parity block in the part file block.
-   */
-  private void processCorruptParityHarPartBlock(
-    FileSystem dfs, Path partFile, Block corruptBlock, long corruptOffset,
-    FileStatus partFileStat, HarIndex harIndex, File localBlockFile)
-    throws IOException {
-    String partName = partFile.toUri().getPath(); // Temporarily.
-    partName = partName.substring(1 + partName.lastIndexOf(Path.SEPARATOR));
+    /**
+     * This fixes a single part file block by recovering in sequence each
+     * parity block in the part file block.
+     */
+    private void processCorruptParityHarPartBlock(FileSystem dfs, Path partFile,
+                                                  Block corruptBlock,
+                                                  long corruptOffset,
+                                                  FileStatus partFileStat,
+                                                  HarIndex harIndex,
+                                                  File localBlockFile,
+                                                  Progressable progress)
+      throws IOException {
+      String partName = partFile.toUri().getPath(); // Temporarily.
+      partName = partName.substring(1 + partName.lastIndexOf(Path.SEPARATOR));
 
-    OutputStream out = new FileOutputStream(localBlockFile);
+      OutputStream out = new FileOutputStream(localBlockFile);
 
-    try {
-      // A HAR part file block could map to several parity files. We need to
-      // use all of them to recover this block.
-      final long corruptEnd = Math.min(corruptOffset + partFileStat.getBlockSize(),
-                                      partFileStat.getLen());
-      for (long offset = corruptOffset; offset < corruptEnd; ) {
-        HarIndex.IndexEntry entry = harIndex.findEntry(partName, offset);
-        if (entry == null) {
-          String msg = "Corrupt index file has no matching index entry for " +
-            partName + ":" + offset;
-          LOG.warn(msg);
-          throw new IOException(msg);
-        }
-        Path parityFile = new Path(entry.fileName);
-        Encoder encoder;
-        if (isXorParityFile(parityFile)) {
-          encoder = xorEncoder;
-        } else if (isRsParityFile(parityFile)) {
-          encoder = rsEncoder;
-        } else {
-          String msg = "Could not figure out parity file correctly";
-          LOG.warn(msg);
-          throw new IOException(msg);
-        }
-        Path srcFile = sourcePathFromParityPath(parityFile);
-        FileStatus srcStat = dfs.getFileStatus(srcFile);
-        if (srcStat.getModificationTime() != entry.mtime) {
-          String msg = "Modification times of " + parityFile + " and " + srcFile +
-            " do not match.";
-          LOG.warn(msg);
-          throw new IOException(msg);
-        }
-        long corruptOffsetInParity = offset - entry.startOffset;
-        LOG.info(partFile + ":" + offset + " maps to " +
-                 parityFile + ":" + corruptOffsetInParity +
-                 " and will be recovered from " + srcFile);
-        encoder.recoverParityBlockToStream(dfs, srcFile, srcStat.getLen(),
-          srcStat.getBlockSize(), parityFile, corruptOffsetInParity, out);
-        // Finished recovery of one parity block. Since a parity block has the
-        // same size as a source block, we can move offset by source block size.
-        offset += srcStat.getBlockSize();
-        LOG.info("Recovered " + srcStat.getBlockSize() + " part file bytes ");
-        if (offset > corruptEnd) {
-          String msg =
-            "Recovered block spills across part file blocks. Cannot continue...";
-          throw new IOException(msg);
+      try {
+        // A HAR part file block could map to several parity files. We need to
+        // use all of them to recover this block.
+        final long corruptEnd = Math.min(corruptOffset +
+                                         partFileStat.getBlockSize(),
+                                         partFileStat.getLen());
+        for (long offset = corruptOffset; offset < corruptEnd; ) {
+          HarIndex.IndexEntry entry = harIndex.findEntry(partName, offset);
+          if (entry == null) {
+            String msg = "Corrupt index file has no matching index entry for " +
+              partName + ":" + offset;
+            LOG.warn(msg);
+            throw new IOException(msg);
+          }
+          Path parityFile = new Path(entry.fileName);
+          Encoder encoder;
+          if (isXorParityFile(parityFile)) {
+            encoder = xorEncoder;
+          } else if (isRsParityFile(parityFile)) {
+            encoder = rsEncoder;
+          } else {
+            String msg = "Could not figure out parity file correctly";
+            LOG.warn(msg);
+            throw new IOException(msg);
+          }
+          Path srcFile = sourcePathFromParityPath(parityFile);
+          FileStatus srcStat = dfs.getFileStatus(srcFile);
+          if (srcStat.getModificationTime() != entry.mtime) {
+            String msg = "Modification times of " + parityFile + " and " +
+              srcFile + " do not match.";
+            LOG.warn(msg);
+            throw new IOException(msg);
+          }
+          long corruptOffsetInParity = offset - entry.startOffset;
+          LOG.info(partFile + ":" + offset + " maps to " +
+                   parityFile + ":" + corruptOffsetInParity +
+                   " and will be recovered from " + srcFile);
+          encoder.recoverParityBlockToStream(dfs, srcFile, srcStat.getLen(),
+                                             srcStat.getBlockSize(), parityFile,
+                                             corruptOffsetInParity, out);
+          // Finished recovery of one parity block. Since a parity block has the
+          // same size as a source block, we can move offset by source block size.
+          offset += srcStat.getBlockSize();
+          LOG.info("Recovered " + srcStat.getBlockSize() + " part file bytes ");
+          if (offset > corruptEnd) {
+            String msg =
+              "Recovered block spills across part file blocks. Cannot continue.";
+            throw new IOException(msg);
+          }
+          progress.progress();
         }
+      } finally {
+        out.close();
       }
-    } finally {
-      out.close();
     }
-  }
 
-  /**
-   * Choose a datanode (hostname:portnumber). The datanode is chosen at
-   * random from the live datanodes.
-   * @param locationsToAvoid locations to avoid.
-   * @return A string in the format name:port.
-   * @throws IOException
-   */
-  private DatanodeInfo chooseDatanode(DatanodeInfo[] locationsToAvoid)
-    throws IOException {
-    DistributedFileSystem dfs = getDFS(new Path("/"));
-    DatanodeInfo[] live = dfs.getClient().datanodeReport(
-                                                 DatanodeReportType.LIVE);
-    LOG.info("Choosing a datanode from " + live.length +
-      " live nodes while avoiding " + locationsToAvoid.length);
-    Random rand = new Random();
-    DatanodeInfo chosen = null;
-    int maxAttempts = 1000;
-    for (int i = 0; i < maxAttempts && chosen == null; i++) {
-      int idx = rand.nextInt(live.length);
-      chosen = live[idx];
-      for (DatanodeInfo avoid: locationsToAvoid) {
-        if (chosen.name.equals(avoid.name)) {
-          LOG.info("Avoiding " + avoid.name);
-          chosen = null;
-          break;
+    /**
+     * Choose a datanode (hostname:portnumber). The datanode is chosen at
+     * random from the live datanodes.
+     * @param locationsToAvoid locations to avoid.
+     * @return A datanode
+     * @throws IOException
+     */
+    private DatanodeInfo chooseDatanode(DatanodeInfo[] locationsToAvoid)
+      throws IOException {
+      DistributedFileSystem dfs = getDFS(new Path("/"));
+      DatanodeInfo[] live =
+        dfs.getClient().datanodeReport(DatanodeReportType.LIVE);
+      LOG.info("Choosing a datanode from " + live.length +
+               " live nodes while avoiding " + locationsToAvoid.length);
+      Random rand = new Random();
+      DatanodeInfo chosen = null;
+      int maxAttempts = 1000;
+      for (int i = 0; i < maxAttempts && chosen == null; i++) {
+        int idx = rand.nextInt(live.length);
+        chosen = live[idx];
+        for (DatanodeInfo avoid: locationsToAvoid) {
+          if (chosen.name.equals(avoid.name)) {
+            LOG.info("Avoiding " + avoid.name);
+            chosen = null;
+            break;
+          }
         }
       }
+      if (chosen == null) {
+        throw new IOException("Could not choose datanode");
+      }
+      LOG.info("Choosing datanode " + chosen.name);
+      return chosen;
     }
-    if (chosen == null) {
-      throw new IOException("Could not choose datanode");
-    }
-    LOG.info("Choosing datanode " + chosen.name);
-    return chosen;
-  }
 
-  /**
-   * Reads data from the data stream provided and computes metadata.
-   */
-  static DataInputStream computeMetadata(
-    Configuration conf, InputStream dataStream) throws IOException {
-    ByteArrayOutputStream mdOutBase = new ByteArrayOutputStream(1024*1024);
-    DataOutputStream mdOut = new DataOutputStream(mdOutBase);
-
-    // First, write out the version.
-    mdOut.writeShort(FSDataset.METADATA_VERSION);
-
-    // Create a summer and write out its header.
-    int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
-    DataChecksum sum = DataChecksum.newDataChecksum(
-                        DataChecksum.CHECKSUM_CRC32,
-                        bytesPerChecksum);
-    sum.writeHeader(mdOut);
-
-    // Buffer to read in a chunk of data.
-    byte[] buf = new byte[bytesPerChecksum];
-    // Buffer to store the checksum bytes.
-    byte[] chk = new byte[sum.getChecksumSize()];
-
-    // Read data till we reach the end of the input stream.
-    int bytesSinceFlush = 0;
-    while (true) {
-      // Read some bytes.
-      int bytesRead = dataStream.read(
-        buf, bytesSinceFlush, bytesPerChecksum-bytesSinceFlush);
-      if (bytesRead == -1) {
-        if (bytesSinceFlush > 0) {
+    /**
+     * Reads data from the data stream provided and computes metadata.
+     */
+    static DataInputStream computeMetadata(Configuration conf,
+                                           InputStream dataStream)
+      throws IOException {
+      ByteArrayOutputStream mdOutBase = new ByteArrayOutputStream(1024*1024);
+      DataOutputStream mdOut = new DataOutputStream(mdOutBase);
+      
+      // First, write out the version.
+      mdOut.writeShort(FSDataset.METADATA_VERSION);
+      
+      // Create a summer and write out its header.
+      int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
+      DataChecksum sum =
+        DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+                                     bytesPerChecksum);
+      sum.writeHeader(mdOut);
+      
+      // Buffer to read in a chunk of data.
+      byte[] buf = new byte[bytesPerChecksum];
+      // Buffer to store the checksum bytes.
+      byte[] chk = new byte[sum.getChecksumSize()];
+      
+      // Read data till we reach the end of the input stream.
+      int bytesSinceFlush = 0;
+      while (true) {
+        // Read some bytes.
+        int bytesRead = dataStream.read(buf, bytesSinceFlush,
+                                        bytesPerChecksum-bytesSinceFlush);
+        if (bytesRead == -1) {
+          if (bytesSinceFlush > 0) {
+            boolean reset = true;
+            sum.writeValue(chk, 0, reset); // This also resets the sum.
+            // Write the checksum to the stream.
+            mdOut.write(chk, 0, chk.length);
+            bytesSinceFlush = 0;
+          }
+          break;
+        }
+        // Update the checksum.
+        sum.update(buf, bytesSinceFlush, bytesRead);
+        bytesSinceFlush += bytesRead;
+        
+        // Flush the checksum if necessary.
+        if (bytesSinceFlush == bytesPerChecksum) {
           boolean reset = true;
           sum.writeValue(chk, 0, reset); // This also resets the sum.
           // Write the checksum to the stream.
           mdOut.write(chk, 0, chk.length);
           bytesSinceFlush = 0;
         }
-        break;
-      }
-      // Update the checksum.
-      sum.update(buf, bytesSinceFlush, bytesRead);
-      bytesSinceFlush += bytesRead;
-
-      // Flush the checksum if necessary.
-      if (bytesSinceFlush == bytesPerChecksum) {
-        boolean reset = true;
-        sum.writeValue(chk, 0, reset); // This also resets the sum.
-        // Write the checksum to the stream.
-        mdOut.write(chk, 0, chk.length);
-        bytesSinceFlush = 0;
       }
+      
+      byte[] mdBytes = mdOutBase.toByteArray();
+      return new DataInputStream(new ByteArrayInputStream(mdBytes));
     }
 
-    byte[] mdBytes = mdOutBase.toByteArray();
-    return new DataInputStream(new ByteArrayInputStream(mdBytes));
-  }
+    private void computeMetadataAndSendFixedBlock(DatanodeInfo datanode,
+                                                  File localBlockFile,
+                                                  LocatedBlock block,
+                                                  long blockSize)
+      throws IOException {
 
-  private void computeMetdataAndSendFixedBlock(
-    DatanodeInfo datanode,
-    File localBlockFile, LocatedBlock block, long blockSize
-    ) throws IOException {
-
-    LOG.info("Computing metdata");
-    InputStream blockContents = null;
-    DataInputStream blockMetadata = null;
-    try {
-      blockContents = new FileInputStream(localBlockFile);
-      blockMetadata = computeMetadata(getConf(), blockContents);
-      blockContents.close();
-      // Reopen
-      blockContents = new FileInputStream(localBlockFile);
-      sendFixedBlock(datanode, blockContents, blockMetadata, block, blockSize);
-    } finally {
-      if (blockContents != null) {
+      LOG.info("Computing metadata");
+      InputStream blockContents = null;
+      DataInputStream blockMetadata = null;
+      try {
+        blockContents = new FileInputStream(localBlockFile);
+        blockMetadata = computeMetadata(getConf(), blockContents);
         blockContents.close();
-        blockContents = null;
-      }
-      if (blockMetadata != null) {
-        blockMetadata.close();
-        blockMetadata = null;
+        // Reopen
+        blockContents = new FileInputStream(localBlockFile);
+        sendFixedBlock(datanode, blockContents, blockMetadata, block,
+                       blockSize);
+      } finally {
+        if (blockContents != null) {
+          blockContents.close();
+          blockContents = null;
+        }
+        if (blockMetadata != null) {
+          blockMetadata.close();
+          blockMetadata = null;
+        }
       }
     }
-  }
 
-  /**
-   * Send a generated block to a datanode.
-   * @param datanode Chosen datanode name in host:port form.
-   * @param blockContents Stream with the block contents.
-   * @param corruptBlock Block identifying the block to be sent.
-   * @param blockSize size of the block.
-   * @throws IOException
-   */
-  private void sendFixedBlock(
-    DatanodeInfo datanode,
-    final InputStream blockContents, DataInputStream metadataIn,
-    LocatedBlock block, long blockSize
-    ) throws IOException {
-    InetSocketAddress target = NetUtils.createSocketAddr(datanode.name);
-    Socket sock = SocketChannel.open().socket();
-
-    int readTimeout = getConf().getInt(BLOCKFIX_READ_TIMEOUT,
-      HdfsConstants.READ_TIMEOUT);
-    NetUtils.connect(sock, target, readTimeout);
-    sock.setSoTimeout(readTimeout);
-
-    int writeTimeout = getConf().getInt(BLOCKFIX_WRITE_TIMEOUT,
-      HdfsConstants.WRITE_TIMEOUT);
-
-    OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
-    DataOutputStream out = new DataOutputStream(
-        new BufferedOutputStream(baseStream, FSConstants.SMALL_BUFFER_SIZE));
-
-    boolean corruptChecksumOk = false;
-    boolean chunkOffsetOK = false;
-    boolean verifyChecksum = true;
-    boolean transferToAllowed = false;
-
-    try {
-      LOG.info("Sending block " + block.getBlock() +
-          " from " + sock.getLocalSocketAddress().toString() +
-          " to " + sock.getRemoteSocketAddress().toString() +
-          " " + blockSize + " bytes");
-      RaidBlockSender blockSender = new RaidBlockSender(
-          block.getBlock(), blockSize, 0, blockSize,
-          corruptChecksumOk, chunkOffsetOK, verifyChecksum, transferToAllowed,
-          metadataIn, new RaidBlockSender.InputStreamFactory() {
-          @Override
-          public InputStream createStream(long offset) throws IOException {
-            // we are passing 0 as the offset above, so we can safely ignore
-            // the offset passed
-            return blockContents;
-          }
-        });
-
-      DatanodeInfo[] nodes = new DatanodeInfo[]{datanode};
-      DataTransferProtocol.Sender.opWriteBlock(
-        out, block.getBlock(), 1,
-        DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE,
-        0, blockSize, 0, "", null, nodes, block.getBlockToken());
-      blockSender.sendBlock(out, baseStream);
-
-      LOG.info("Sent block " + block.getBlock() + " to " + datanode.name);
-    } finally {
-      out.close();
+    /**
+     * Send a generated block to a datanode.
+     * @param datanode Chosen datanode name in host:port form.
+     * @param blockContents Stream with the block contents.
+     * @param corruptBlock Block identifying the block to be sent.
+     * @param blockSize size of the block.
+     * @throws IOException
+     */
+    private void sendFixedBlock(DatanodeInfo datanode,
+                                final InputStream blockContents,
+                                DataInputStream metadataIn,
+                                LocatedBlock block, long blockSize)
+      throws IOException {
+      InetSocketAddress target = NetUtils.createSocketAddr(datanode.name);
+      Socket sock = SocketChannel.open().socket();
+      
+      int readTimeout =
+        getConf().getInt(BLOCKFIX_READ_TIMEOUT,
+                         HdfsConstants.READ_TIMEOUT);
+      NetUtils.connect(sock, target, readTimeout);
+      sock.setSoTimeout(readTimeout);
+      
+      int writeTimeout = getConf().getInt(BLOCKFIX_WRITE_TIMEOUT,
+                                          HdfsConstants.WRITE_TIMEOUT);
+      
+      OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
+      DataOutputStream out = 
+        new DataOutputStream(new BufferedOutputStream(baseStream,
+                                                      FSConstants.
+                                                      SMALL_BUFFER_SIZE));
+      
+      boolean corruptChecksumOk = false;
+      boolean chunkOffsetOK = false;
+      boolean verifyChecksum = true;
+      boolean transferToAllowed = false;
+      
+      try {
+        LOG.info("Sending block " + block.getBlock() +
+                 " from " + sock.getLocalSocketAddress().toString() +
+                 " to " + sock.getRemoteSocketAddress().toString() +
+                 " " + blockSize + " bytes");
+        RaidBlockSender blockSender =
+          new RaidBlockSender(block.getBlock(), blockSize, 0, blockSize,
+                              corruptChecksumOk, chunkOffsetOK, verifyChecksum,
+                              transferToAllowed, metadataIn,
+                              new RaidBlockSender.InputStreamFactory() {
+                                @Override
+                                public InputStream
+                                  createStream(long offset) throws IOException {
+                                  // we are passing 0 as the offset above,
+                                  // so we can safely ignore
+                                  // the offset passed
+                                  return blockContents;
+                                }
+                              });
+        
+        DatanodeInfo[] nodes = new DatanodeInfo[]{datanode};
+        DataTransferProtocol.Sender.opWriteBlock(out, block.getBlock(), 1,
+                                                 DataTransferProtocol.
+                                                 BlockConstructionStage.
+                                                 PIPELINE_SETUP_CREATE,
+                                                 0, blockSize, 0, "", null,
+                                                 nodes, block.getBlockToken());
+        blockSender.sendBlock(out, baseStream);
+        
+        LOG.info("Sent block " + block.getBlock() + " to " + datanode.name);
+      } finally {
+        out.close();
+      }
     }
-  }
 
-  /**
-   * returns the source file corresponding to a parity file
-   */
-  Path sourcePathFromParityPath(Path parityPath) {
-    String parityPathStr = parityPath.toUri().getPath();
-    if (parityPathStr.startsWith(xorPrefix)) {
-      // Remove the prefix to get the source file.
-      String src = parityPathStr.replaceFirst(xorPrefix, "/");
-      return new Path(src);
-    } else if (parityPathStr.startsWith(rsPrefix)) {
-      // Remove the prefix to get the source file.
-      String src = parityPathStr.replaceFirst(rsPrefix, "/");
-      return new Path(src);
+    /**
+     * returns the source file corresponding to a parity file
+     */
+    Path sourcePathFromParityPath(Path parityPath) {
+      String parityPathStr = parityPath.toUri().getPath();
+      if (parityPathStr.startsWith(xorPrefix)) {
+        // Remove the prefix to get the source file.
+        String src = parityPathStr.replaceFirst(xorPrefix, "/");
+        return new Path(src);
+      } else if (parityPathStr.startsWith(rsPrefix)) {
+        // Remove the prefix to get the source file.
+        String src = parityPathStr.replaceFirst(rsPrefix, "/");
+        return new Path(src);
+      }
+      return null;
+    }
+
+    /**
+     * Returns the corrupt blocks in a file.
+     */
+    List<LocatedBlock> corruptBlocksInFile(DistributedFileSystem fs,
+                                           String uriPath, FileStatus stat)
+      throws IOException {
+      List<LocatedBlock> corrupt = new LinkedList<LocatedBlock>();
+      LocatedBlocks locatedBlocks =
+        RaidDFSUtil.getBlockLocations(fs, uriPath, 0, stat.getLen());
+      for (LocatedBlock b: locatedBlocks.getLocatedBlocks()) {
+        if (b.isCorrupt() ||
+            (b.getLocations().length == 0 && b.getBlockSize() > 0)) {
+          corrupt.add(b);
+        }
+      }
+      return corrupt;
     }
-    return null;
   }
+
 }
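
For reference, the check in corruptBlocksInFile above boils down to a single
predicate per block. The sketch below restates it in isolation (the helper
name needsFix is illustrative, not part of the patch): a block needs fixing
if the namenode has flagged it corrupt, or if it still carries data but has
no remaining replica locations.

    // Sketch only: mirrors the test used by corruptBlocksInFile for each
    // org.apache.hadoop.hdfs.protocol.LocatedBlock of the file.
    private static boolean needsFix(LocatedBlock b) {
      return b.isCorrupt()                       // namenode marked the block corrupt
          || (b.getLocations().length == 0       // no live replicas left
              && b.getBlockSize() > 0);          // but the block holds data
    }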
 

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java?rev=1040417&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java Tue Nov 30 06:23:55 2010
@@ -0,0 +1,671 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+/**
+ * Distributed block fixer that uses MapReduce jobs to fix corrupt files.
+ *
+ * Configuration options:
+ * raid.blockfix.filespertask       - number of corrupt files to fix in a single
+ *                                    map reduce task (i.e., at one mapper node)
+ *
+ * raid.blockfix.fairscheduler.pool - the pool to use for block fixer jobs
+ *
+ * raid.blockfix.maxpendingfiles    - maximum number of files to fix 
+ *                                    simultaneously
+ */
+public class DistBlockFixer extends BlockFixer {
+  // volatile should be sufficient since only the block fixer thread
+  // updates numJobsRunning (other threads may read)
+  private volatile int numJobsRunning = 0;
+
+  private static final String WORK_DIR_PREFIX = "blockfixer";
+  private static final String IN_FILE_SUFFIX = ".in";
+  private static final String PART_PREFIX = "part-";
+  
+  private static final String BLOCKFIX_FILES_PER_TASK = 
+    "raid.blockfix.filespertask";
+  private static final String BLOCKFIX_MAX_PENDING_FILES =
+    "raid.blockfix.maxpendingfiles";
+  private static final String BLOCKFIX_POOL = 
+    "raid.blockfix.fairscheduler.pool";
+  // mapred.fairscheduler.pool is only used in the local configuration
+  // passed to a block fixing job
+  private static final String MAPRED_POOL = 
+    "mapred.fairscheduler.pool";
+
+  // default number of files to fix in a task
+  private static final long DEFAULT_BLOCKFIX_FILES_PER_TASK = 10L;
+
+  // default number of files to fix simultaneously
+  private static final long DEFAULT_BLOCKFIX_MAX_PENDING_FILES = 1000L;
+ 
+  protected static final Log LOG = LogFactory.getLog(DistBlockFixer.class);
+
+  // number of files to fix in a task
+  private long filesPerTask;
+
+  // number of files to fix simultaneously
+  final private long maxPendingFiles;
+
+  // number of files being fixed right now
+  private long pendingFiles;
+
+  // pool name to use (may be null, in which case no special pool is used)
+  private String poolName;
+
+  private long lastCheckTime;
+
+  private final SimpleDateFormat dateFormat = 
+    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  private Map<String, CorruptFileInfo> fileIndex = 
+    new HashMap<String, CorruptFileInfo>();
+  private Map<Job, List<CorruptFileInfo>> jobIndex =
+    new HashMap<Job, List<CorruptFileInfo>>();
+
+  static enum Counter {
+    FILES_SUCCEEDED, FILES_FAILED, FILES_NOACTION
+  }
+
+  public DistBlockFixer(Configuration conf) {
+    super(conf);
+    filesPerTask = DistBlockFixer.filesPerTask(getConf());
+    maxPendingFiles = DistBlockFixer.maxPendingFiles(getConf());
+    pendingFiles = 0L;
+    poolName = conf.get(BLOCKFIX_POOL);
+
+    // start off due for the first iteration
+    lastCheckTime = System.currentTimeMillis() - blockFixInterval;
+  }
+
+  /**
+   * determines how many files to fix in a single task
+   */ 
+  protected static long filesPerTask(Configuration conf) {
+    return conf.getLong(BLOCKFIX_FILES_PER_TASK, 
+                        DEFAULT_BLOCKFIX_FILES_PER_TASK);
+  }
+
+  /**
+   * determines how many files to fix simultaneously
+   */ 
+  protected static long maxPendingFiles(Configuration conf) {
+    return conf.getLong(BLOCKFIX_MAX_PENDING_FILES, 
+                        DEFAULT_BLOCKFIX_MAX_PENDING_FILES);
+  }
+
+  /**
+   * runs the block fixer periodically
+   */
+  public void run() {
+    while (running) {
+      // check if it is time to run the block fixer
+      long now = System.currentTimeMillis();
+      if (now >= lastCheckTime + blockFixInterval) {
+        lastCheckTime = now;
+        try {
+          checkAndFixBlocks(now);
+        } catch (InterruptedException ignore) {
+          LOG.info("interrupted");
+        } catch (Exception e) {
+          // log exceptions and keep running
+          LOG.error(StringUtils.stringifyException(e));
+        } catch (Error e) {
+          LOG.error(StringUtils.stringifyException(e));
+          throw e;
+        }
+      }
+      
+      // try to sleep for the remainder of the interval
+      long sleepPeriod = (lastCheckTime - System.currentTimeMillis()) + 
+        blockFixInterval;
+      
+      if ((sleepPeriod > 0L) && running) {
+        try {
+          Thread.sleep(sleepPeriod);
+        } catch (InterruptedException ignore) {
+          LOG.info("interrupted");
+        }
+      }
+    }
+  }
+
+  /**
+   * checks for corrupt blocks and fixes them (if any)
+   */
+  private void checkAndFixBlocks(long startTime)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    checkJobs();
+
+    if (pendingFiles >= maxPendingFiles) {
+      return;
+    }
+
+    List<Path> corruptFiles = getCorruptFiles();
+    filterUnfixableSourceFiles(corruptFiles.iterator());
+
+    String startTimeStr = dateFormat.format(new Date(startTime));
+
+    LOG.info("found " + corruptFiles.size() + " corrupt files");
+
+    if (corruptFiles.size() > 0) {
+      String jobName = "blockfixer." + startTime;
+      startJob(jobName, corruptFiles);
+    }
+  }
+
+  /**
+   * Handle a failed job.
+   */
+  private void failJob(Job job) throws IOException {
+    // assume no files have been fixed
+    LOG.info("job " + job.getJobID() + "(" + job.getJobName() +
+      ") finished (failed)");
+    for (CorruptFileInfo fileInfo: jobIndex.get(job)) {
+      fileInfo.fail();
+    }
+    numJobsRunning--;
+  }
+
+  /**
+   * Handle a successful job.
+   */ 
+  private void succeedJob(Job job, long filesSucceeded, long filesFailed)
+    throws IOException {
+    LOG.info("job " + job.getJobID() + "(" + job.getJobName() +
+      ") finished (succeeded)");
+
+    if (filesFailed == 0) {
+      // no files have failed
+      for (CorruptFileInfo fileInfo: jobIndex.get(job)) {
+        fileInfo.succeed();
+      }
+    } else {
+      // we have to look at the output to check which files have failed
+      Set<String> failedFiles = getFailedFiles(job);
+      
+      for (CorruptFileInfo fileInfo: jobIndex.get(job)) {
+        if (failedFiles.contains(fileInfo.getFile().toString())) {
+          fileInfo.fail();
+        } else {
+          // call succeed for files that have succeeded or for which no action
+          // was taken
+          fileInfo.succeed();
+        }
+      }
+    }
+    // report succeeded files to metrics
+    incrFilesFixed(filesSucceeded);
+    numJobsRunning--;
+  }
+
+  /**
+   * checks whether jobs have completed and updates the job and file index
+   * accordingly; failed files are removed from the file index so that they
+   * can be retried right away
+   */
+  private void checkJobs() throws IOException {
+    Iterator<Job> jobIter = jobIndex.keySet().iterator();
+    while(jobIter.hasNext()) {
+      Job job = jobIter.next();
+
+      try {
+        if (job.isComplete()) {
+          long filesSucceeded =
+            job.getCounters().findCounter(Counter.FILES_SUCCEEDED).getValue();
+          long filesFailed =
+            job.getCounters().findCounter(Counter.FILES_FAILED).getValue();
+          long filesNoAction =
+            job.getCounters().findCounter(Counter.FILES_NOACTION).getValue();
+          int files = jobIndex.get(job).size();
+          if (job.isSuccessful() && 
+              (filesSucceeded + filesFailed + filesNoAction == 
+               ((long) files))) {
+            // job has processed all files
+            succeedJob(job, filesSucceeded, filesFailed);
+          } else {
+            failJob(job);
+          }
+          jobIter.remove();
+        } else {
+          LOG.info("job " + job.getJobName() + " still running");
+        }
+      } catch (Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+        failJob(job);
+        try {
+          job.killJob();
+        } catch (Exception ee) {
+          LOG.error(StringUtils.stringifyException(ee));
+        }
+        jobIter.remove();
+      }
+    }
+    purgeFileIndex();
+  }
+
+  /**
+   * determines which files have failed for a given job
+   */
+  private Set<String> getFailedFiles(Job job) throws IOException {
+    Set<String> failedFiles = new HashSet<String>();
+
+    Path outDir = SequenceFileOutputFormat.getOutputPath(job);
+    FileSystem fs  = outDir.getFileSystem(getConf());
+    if (!fs.getFileStatus(outDir).isDir()) {
+      throw new IOException(outDir.toString() + " is not a directory");
+    }
+
+    FileStatus[] files = fs.listStatus(outDir);
+
+    for (FileStatus f: files) {
+      Path fPath = f.getPath();
+      if ((!f.isDir()) && (fPath.getName().startsWith(PART_PREFIX))) {
+        LOG.info("opening " + fPath.toString());
+        SequenceFile.Reader reader = 
+          new SequenceFile.Reader(fs, fPath, getConf());
+
+        Text key = new Text();
+        Text value = new Text();
+        while (reader.next(key, value)) {
+          failedFiles.add(key.toString());
+        }
+        reader.close();
+      }
+    }
+    return failedFiles;
+  }
+
+  /**
+   * purge expired jobs from the file index
+   */
+  private void purgeFileIndex() {
+    Iterator<String> fileIter = fileIndex.keySet().iterator();
+    while(fileIter.hasNext()) {
+      String file = fileIter.next();
+      if (fileIndex.get(file).isExpired()) {
+        fileIter.remove();
+      }
+    }
+    
+  }
+
+  /**
+   * creates and submits a job, updates file index and job index
+   */
+  private Job startJob(String jobName, List<Path> corruptFiles) 
+    throws IOException, InterruptedException, ClassNotFoundException {
+    Path inDir = new Path(WORK_DIR_PREFIX + "/in/" + jobName);
+    Path outDir = new Path(WORK_DIR_PREFIX + "/out/" + jobName);
+    List<Path> filesInJob = createInputFile(jobName, inDir, corruptFiles);
+
+    Configuration jobConf = new Configuration(getConf());
+    if (poolName != null) {
+      jobConf.set(MAPRED_POOL, poolName);
+    }
+    Job job = new Job(jobConf, jobName);
+    job.setJarByClass(getClass());
+    job.setMapperClass(DistBlockFixerMapper.class);
+    job.setNumReduceTasks(0);
+    job.setInputFormatClass(DistBlockFixerInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    DistBlockFixerInputFormat.setInputPaths(job, inDir);
+    SequenceFileOutputFormat.setOutputPath(job, outDir);
+
+    job.submit();
+    // submit the job before inserting it into the index
+    // this way, if submit fails, we won't have added anything to the index
+    insertJob(job, filesInJob);
+    return job;
+  }
+
+  /**
+   * inserts new job into file index and job index
+   */
+  private void insertJob(Job job, List<Path> corruptFiles) {
+    List<CorruptFileInfo> fileInfos = new LinkedList<CorruptFileInfo>();
+
+    for (Path file: corruptFiles) {
+      CorruptFileInfo fileInfo = new CorruptFileInfo(file, job);
+      fileInfos.add(fileInfo);
+      fileIndex.put(file.toString(), fileInfo);
+    }
+
+    jobIndex.put(job, fileInfos);
+    numJobsRunning++;
+  }
+    
+  /**
+   * creates the input file (containing the names of the files to be fixed),
+   * stopping once the limit on pending files is reached;
+   * returns the list of files actually added
+   */
+  private List<Path> createInputFile(String jobName, Path inDir, 
+                                     List<Path> corruptFiles) 
+    throws IOException {
+
+    Path file = new Path(inDir, jobName + IN_FILE_SUFFIX);
+    FileSystem fs = file.getFileSystem(getConf());
+    SequenceFile.Writer fileOut = SequenceFile.createWriter(fs, getConf(), file,
+                                                            LongWritable.class,
+                                                            Text.class);
+    long index = 0L;
+
+    List<Path> filesAdded = new LinkedList<Path>();
+
+    for (Path corruptFile: corruptFiles) {
+      if (pendingFiles >= maxPendingFiles) {
+        break;
+      }
+
+      String corruptFileName = corruptFile.toString();
+      fileOut.append(new LongWritable(index++), new Text(corruptFileName));
+      filesAdded.add(corruptFile);
+      pendingFiles++;
+
+      if (index % filesPerTask == 0) {
+        fileOut.sync(); // create sync point to make sure we can split here
+      }
+    }
+
+    fileOut.close();
+    return filesAdded;
+  }
+
+  /**
+   * gets a list of corrupt files from the name node
+   * and filters out files that are currently being fixed or 
+   * that were recently fixed
+   */
+  private List<Path> getCorruptFiles() throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem) 
+      (new Path("/")).getFileSystem(getConf());
+
+    String[] files = RaidDFSUtil.getCorruptFiles(dfs);
+    List<Path> corruptFiles = new LinkedList<Path>();
+
+    for (String f: files) {
+      Path p = new Path(f);
+      // filter out files that are being fixed or that were recently fixed
+      if (!fileIndex.containsKey(p.toString())) {
+        corruptFiles.add(p);
+      }
+    }
+    RaidUtils.filterTrash(getConf(), corruptFiles);
+
+    return corruptFiles;
+  }
+
+  /**
+   * returns the number of map reduce jobs running
+   */
+  public int jobsRunning() {
+    return numJobsRunning;
+  }
+
+  /**
+   * holds information about a corrupt file that is being fixed or was recently fixed
+   */
+  class CorruptFileInfo {
+
+    private Path file;
+    private Job job;
+    private boolean done;
+    private long time;
+
+    public CorruptFileInfo(Path file, Job job) {
+      this.file = file;
+      this.job = job;
+      this.done = false;
+      this.time = 0;
+    }
+
+    public boolean isDone() {
+      return done;
+    }
+    
+    public boolean isExpired() {
+      return done && ((System.currentTimeMillis() - time) > historyInterval);
+    }
+
+    public Path getFile() {
+      return file;
+    }
+    
+    /**
+     * updates file index to record a failed attempt at fixing a file,
+     * immediately removes the entry from the file index 
+     * (instead of letting it expire)
+     * so that we can retry right away
+     */
+    public void fail() {
+      // remove this file from the index
+      CorruptFileInfo removed = fileIndex.remove(file.toString());
+      if (removed == null) {
+        LOG.error("trying to remove file not in file index: " +
+                  file.toString());
+      } else {
+        LOG.info("fixing " + file.toString() + " failed");
+      }
+      pendingFiles--;
+    }
+
+    /**
+     * marks a file as fixed successfully
+     * and sets time stamp for expiry after specified interval
+     */
+    public void succeed() {
+      // leave the file in the index,
+      // it will be purged once the history interval expires
+      job = null;
+      done = true;
+      time = System.currentTimeMillis();
+      LOG.info("fixing " + file.toString() + " succeeded");
+      pendingFiles--;
+    }
+  }
+
+  static class DistBlockFixerInputFormat
+    extends SequenceFileInputFormat<LongWritable, Text> {
+
+    protected static final Log LOG = 
+      LogFactory.getLog(DistBlockFixerInputFormat.class);
+    
+    /**
+     * splits the input file into tasks, each handled by a single node;
+     * we have to read the input files to do this, since split boundaries
+     * are based on the number of items in each sequence file
+     */
+    @Override
+    public List <InputSplit> getSplits(JobContext job) 
+      throws IOException {
+      long filesPerTask = DistBlockFixer.filesPerTask(job.getConfiguration());
+
+      Path[] inPaths = getInputPaths(job);
+
+      List<InputSplit> splits = new LinkedList<InputSplit>();
+
+      long fileCounter = 0;
+
+      for (Path inPath: inPaths) {
+        
+        FileSystem fs = inPath.getFileSystem(job.getConfiguration());      
+
+        if (!fs.getFileStatus(inPath).isDir()) {
+          throw new IOException(inPath.toString() + " is not a directory");
+        }
+
+        FileStatus[] inFiles = fs.listStatus(inPath);
+
+        for (FileStatus inFileStatus: inFiles) {
+          Path inFile = inFileStatus.getPath();
+          
+          if (!inFileStatus.isDir() &&
+              (inFile.getName().equals(job.getJobName() + IN_FILE_SUFFIX))) {
+
+            fileCounter++;
+            SequenceFile.Reader inFileReader = 
+              new SequenceFile.Reader(fs, inFile, job.getConfiguration());
+            
+            long startPos = inFileReader.getPosition();
+            long counter = 0;
+            
+            // create an input split every filesPerTask items in the sequence
+            LongWritable key = new LongWritable();
+            Text value = new Text();
+            try {
+              while (inFileReader.next(key, value)) {
+                if (counter % filesPerTask == filesPerTask - 1L) {
+                  splits.add(new FileSplit(inFile, startPos, 
+                                           inFileReader.getPosition() - 
+                                           startPos,
+                                           null));
+                  startPos = inFileReader.getPosition();
+                }
+                counter++;
+              }
+              
+              // create input split for remaining items if necessary
+              // this includes the case where no splits were created by the loop
+              if (startPos != inFileReader.getPosition()) {
+                splits.add(new FileSplit(inFile, startPos,
+                                         inFileReader.getPosition() - startPos,
+                                         null));
+              }
+            } finally {
+              inFileReader.close();
+            }
+          }
+        }
+      }
+
+      LOG.info("created " + splits.size() + " input splits from " +
+               fileCounter + " files");
+      
+      return splits;
+    }
+
+    /**
+     * indicates that the input file can be split
+     */
+    @Override
+    public boolean isSplitable(JobContext job, Path file) {
+      return true;
+    }
+  }
+
+
+  /**
+   * mapper for fixing stripes with corrupt blocks
+   */
+  static class DistBlockFixerMapper
+    extends Mapper<LongWritable, Text, Text, Text> {
+
+    protected static final Log LOG = 
+      LogFactory.getLog(DistBlockFixerMapper.class);
+
+    /**
+     * fix a stripe
+     */
+    @Override
+    public void map(LongWritable key, Text fileText, Context context) 
+      throws IOException, InterruptedException {
+      
+      BlockFixerHelper helper = 
+        new BlockFixerHelper(context.getConfiguration());
+
+      String fileStr = fileText.toString();
+      LOG.info("fixing " + fileStr);
+
+      Path file = new Path(fileStr);
+
+      try {
+        boolean fixed = helper.fixFile(file, context);
+        
+        if (fixed) {
+          context.getCounter(Counter.FILES_SUCCEEDED).increment(1L);
+        } else {
+          context.getCounter(Counter.FILES_NOACTION).increment(1L);
+        }
+      } catch (Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+
+        // report file as failed
+        context.getCounter(Counter.FILES_FAILED).increment(1L);
+        String outkey = fileStr;
+        String outval = "failed";
+        context.write(new Text(outkey), new Text(outval));
+      }
+      
+      context.progress();
+    }
+  }
+
+}
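
The class comment at the top of DistBlockFixer.java lists the knobs this
fixer reads. A minimal sketch of setting them programmatically follows; the
pool name "raid" is a hypothetical value, and the numbers simply restate the
defaults in the code above (10 files per map task, at most 1000 files being
fixed at once).

    // Sketch only: configuration keys read by DistBlockFixer.
    Configuration conf = new Configuration();
    conf.setLong("raid.blockfix.filespertask", 10L);      // corrupt files per map task
    conf.setLong("raid.blockfix.maxpendingfiles", 1000L); // cap on files being fixed at once
    conf.set("raid.blockfix.fairscheduler.pool", "raid"); // hypothetical pool; omit for default pool

With these settings a batch of, say, 27 corrupt files is written to a single
sequence file with a sync point after every 10 records, and
DistBlockFixerInputFormat turns it into three splits of 10, 10 and 7 files,
so each map task fixes at most filesPerTask files.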

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java?rev=1040417&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java Tue Nov 30 06:23:55 2010
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hadoop.net.NetUtils;
+
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.RaidUtils;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
+
+/**
+ * This class fixes source file blocks using the parity file,
+ * and parity file blocks using the source file.
+ * It periodically fetches the list of corrupt files from the namenode,
+ * and figures out the location of the bad block by reading through
+ * the corrupt file.
+ */
+public class LocalBlockFixer extends BlockFixer {
+  public static final Log LOG = LogFactory.getLog(LocalBlockFixer.class);
+
+  private java.util.HashMap<String, java.util.Date> history;
+  
+  private BlockFixerHelper helper;
+
+  public LocalBlockFixer(Configuration conf) throws IOException {
+    super(conf);
+    history = new java.util.HashMap<String, java.util.Date>();
+    helper = new BlockFixerHelper(getConf());
+  }
+
+  public void run() {
+    while (running) {
+      try {
+        LOG.info("LocalBlockFixer continuing to run...");
+        doFix();
+      } catch (Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+      } catch (Error err) {
+        LOG.error("Exiting after encountering " +
+                    StringUtils.stringifyException(err));
+        throw err;
+      }
+    }
+  }
+
+  void doFix() throws InterruptedException, IOException {
+    while (running) {
+      // Sleep before proceeding to fix files.
+      Thread.sleep(blockFixInterval);
+
+      // Purge history older than the history interval.
+      purgeHistory();
+
+      List<Path> corruptFiles = getCorruptFiles();
+
+      filterUnfixableSourceFiles(corruptFiles.iterator());
+
+      if (corruptFiles.isEmpty()) {
+        // If there are no corrupt files, retry after some time.
+        continue;
+      }
+      LOG.info("Found " + corruptFiles.size() + " corrupt files.");
+
+      helper.sortCorruptFiles(corruptFiles);
+
+      for (Path srcPath: corruptFiles) {
+        if (!running) break;
+        try {
+          boolean fixed = helper.fixFile(srcPath);
+          LOG.info("Adding " + srcPath + " to history");
+          history.put(srcPath.toString(), new java.util.Date());
+          if (fixed) {
+            incrFilesFixed();
+          }
+        } catch (IOException ie) {
+          LOG.error("Hit error while processing " + srcPath +
+            ": " + StringUtils.stringifyException(ie));
+          // Do nothing, move on to the next file.
+        }
+      }
+    }
+  }
+
+
+  /**
+   * We maintain history of fixed files because a fixed file may appear in
+   * the list of corrupt files if we loop around too quickly.
+   * This function removes the old items in the history so that we can
+   * recognize files that have actually become corrupt since being fixed.
+   */
+  void purgeHistory() {
+    java.util.Date cutOff = new java.util.Date(System.currentTimeMillis() -
+                                               historyInterval);
+    List<String> toRemove = new java.util.ArrayList<String>();
+
+    for (String key: history.keySet()) {
+      java.util.Date item = history.get(key);
+      if (item.before(cutOff)) {
+        toRemove.add(key);
+      }
+    }
+    for (String key: toRemove) {
+      LOG.info("Removing " + key + " from history");
+      history.remove(key);
+    }
+  }
+
+  /**
+   * @return A list of corrupt files as obtained from the namenode
+   */
+  List<Path> getCorruptFiles() throws IOException {
+    DistributedFileSystem dfs = helper.getDFS(new Path("/"));
+
+    String[] files = RaidDFSUtil.getCorruptFiles(dfs);
+    List<Path> corruptFiles = new LinkedList<Path>();
+    for (String f: files) {
+      Path p = new Path(f);
+      if (!history.containsKey(p.toString())) {
+        corruptFiles.add(p);
+      }
+    }
+    RaidUtils.filterTrash(getConf(), corruptFiles);
+    return corruptFiles;
+  }
+
+}
+
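
LocalBlockFixer is the in-process counterpart of DistBlockFixer: the same
periodic loop, but the reconstruction work is done by the RaidNode itself
through BlockFixerHelper rather than by map tasks. The sketch below shows
how such a fixer can be driven as a daemon thread; it is illustrative only
and mirrors what RaidNode does in the hunk that follows, where the concrete
fixer is chosen by BlockFixer.createBlockFixer.

    // Sketch only: run a LocalBlockFixer in a daemon thread, the way
    // RaidNode drives its blockFixer (Daemon is org.apache.hadoop.util.Daemon).
    Configuration conf = new Configuration();
    BlockFixer fixer = new LocalBlockFixer(conf);  // constructor throws IOException
    Daemon fixerThread = new Daemon(fixer);        // BlockFixer provides run(), so Daemon can wrap it
    fixerThread.start();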

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1040417&r1=1040416&r2=1040417&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Nov 30 06:23:55 2010
@@ -289,7 +289,7 @@ public abstract class RaidNode implement
     running = true;
     this.server.start(); // start RPC server
 
-    this.blockFixer = new BlockFixer(conf);
+    this.blockFixer = BlockFixer.createBlockFixer(conf);
     this.blockFixerThread = new Daemon(this.blockFixer);
     this.blockFixerThread.start();
 

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java?rev=1040417&r1=1040416&r2=1040417&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java Tue Nov 30 06:23:55 2010
@@ -288,7 +288,7 @@ public class RaidShell extends Configure
   public void recoverBlocks(String[] args, int startIndex)
     throws IOException {
     LOG.info("Recovering blocks for " + (args.length - startIndex) + " files");
-    BlockFixer fixer = new BlockFixer(conf);
+    BlockFixer.BlockFixerHelper fixer = new BlockFixer.BlockFixerHelper(conf);
     for (int i = startIndex; i < args.length; i++) {
       String path = args[i];
       fixer.fixFile(new Path(path));
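
The RaidShell change above has recoverBlocks construct the helper class
directly instead of a full BlockFixer daemon, so the files named on the
command line are fixed synchronously in the shell process. A sketch of the
same pattern outside the shell (the path is a placeholder, not from the
patch):

    // Sketch only: fix one file in the calling thread via BlockFixerHelper,
    // as RaidShell.recoverBlocks now does. fixFile throws IOException.
    Configuration conf = new Configuration();
    BlockFixer.BlockFixerHelper helper = new BlockFixer.BlockFixerHelper(conf);
    helper.fixFile(new Path("/user/example/data/part-00000"));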


