hadoop-mapreduce-commits mailing list archives

From sc...@apache.org
Subject svn commit: r1021873 [1/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/hdfs/ src/contrib/raid/src/test/org/apache/hadoo...
Date Tue, 12 Oct 2010 18:23:36 GMT
Author: schen
Date: Tue Oct 12 18:23:36 2010
New Revision: 1021873

URL: http://svn.apache.org/viewvc?rev=1021873&view=rev
Log:
MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
Vadali via schen)


Added:
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Oct 12 18:23:36 2010
@@ -133,6 +133,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1517. Supports streaming job to run in the background. (Bochun Bai
     via amareshwari)
 
+    MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
+    Vadali via schen)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and
@@ -325,7 +328,7 @@ Trunk (unreleased changes)
     MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. (Ranjit
     Mathew via amareshwari)
 
-    MAPREDUCE-1980. DistributedRaidFileSystem now handles ChecksumException
+    MAPREDUCE-1908. DistributedRaidFileSystem now handles ChecksumException
     correctly. (Ramkumar Vadali via schen)
 
 Release 0.21.1 - Unreleased

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java Tue Oct 12 18:23:36 2010
@@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.raid.Decoder;
 import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.XORDecoder;
 import org.apache.hadoop.hdfs.BlockMissingException;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
@@ -94,7 +96,7 @@ public class DistributedRaidFileSystem e
     }
 
     // find stripe length configured
-    stripeLength = conf.getInt("hdfs.raid.stripeLength", RaidNode.DEFAULT_STRIPE_LENGTH);
+    stripeLength = RaidNode.getStripeLength(conf);
     if (stripeLength == 0) {
       LOG.info("dfs.raid.stripeLength is incorrectly defined to be " + 
                stripeLength + " Ignoring...");
@@ -343,9 +345,10 @@ public class DistributedRaidFileSystem e
             clientConf.set("fs.hdfs.impl", clazz.getName());
             // Disable caching so that a previously cached RaidDfs is not used.
             clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
-            Path npath = RaidNode.unRaid(clientConf, path,
-                         alternates[idx], stripeLength,
-                         corruptOffset);
+            Decoder decoder =
+              new XORDecoder(clientConf, RaidNode.getStripeLength(clientConf));
+            Path npath = RaidNode.unRaid(clientConf, path, alternates[idx],
+                              decoder, stripeLength, corruptOffset);
             FileSystem fs1 = getUnderlyingFileSystem(conf);
             fs1.initialize(npath.toUri(), conf);
             LOG.info("Opening alternate path " + npath + " at offset " + curpos);

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java Tue Oct 12 18:23:36 2010
@@ -56,6 +56,10 @@ class ConfigManager {
 
   public static final long HAR_PARTFILE_SIZE = 10 * 1024 * 1024 * 1024l;
   
+  public static final int DISTRAID_MAX_JOBS = 10;
+
+  public static final int DISTRAID_MAX_FILES = 10000;
+
   /**
    * Time to wait after the config file has been modified before reloading it
    * (this is done to prevent loading a file that hasn't been fully written).
@@ -71,6 +75,9 @@ class ConfigManager {
   private long reloadInterval = RELOAD_INTERVAL;
   private long periodicity; // time between runs of all policies
   private long harPartfileSize;
+  private int maxJobsPerPolicy; // Max no. of jobs running simultaneously for
+                                // a policy.
+  private int maxFilesPerJob; // Max no. of files raided by a job.
 
   // Reload the configuration
   private boolean doReload;
@@ -88,6 +95,10 @@ class ConfigManager {
     this.reloadInterval = conf.getLong("raid.config.reload.interval", RELOAD_INTERVAL);
     this.periodicity = conf.getLong("raid.policy.rescan.interval",  RESCAN_INTERVAL);
     this.harPartfileSize = conf.getLong("raid.har.partfile.size", HAR_PARTFILE_SIZE);
+    this.maxJobsPerPolicy = conf.getInt("raid.distraid.max.jobs",
+                                        DISTRAID_MAX_JOBS);
+    this.maxFilesPerJob = conf.getInt("raid.distraid.max.files",
+                                      DISTRAID_MAX_FILES);
     if (configFileName == null) {
       String msg = "No raid.config.file given in conf - " +
                    "the Hadoop Raid utility cannot run. Aborting....";
@@ -306,6 +317,14 @@ class ConfigManager {
     return harPartfileSize;
   }
   
+  public synchronized int getMaxJobsPerPolicy() {
+    return maxJobsPerPolicy;
+  }
+
+  public synchronized int getMaxFilesPerJob() {
+    return maxFilesPerJob;
+  }
+
   /**
    * Get a collection of all policies
    */
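
The ConfigManager changes above introduce two new keys, raid.distraid.max.jobs and raid.distraid.max.files. A minimal sketch (not part of the commit) of setting and reading them the way ConfigManager does; the key names and defaults come from the diff, the values chosen here are only illustrative:

    import org.apache.hadoop.conf.Configuration;

    public class RaidLimitsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cap simultaneous raid jobs per policy (default DISTRAID_MAX_JOBS = 10).
        conf.setInt("raid.distraid.max.jobs", 5);
        // Cap files raided by a single job (default DISTRAID_MAX_FILES = 10000).
        conf.setInt("raid.distraid.max.files", 5000);

        // ConfigManager reads the limits back like this.
        int maxJobsPerPolicy = conf.getInt("raid.distraid.max.jobs", 10);
        int maxFilesPerJob = conf.getInt("raid.distraid.max.files", 10000);
        System.out.println(maxJobsPerPolicy + " jobs, " + maxFilesPerJob + " files");
      }
    }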

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Represents a generic decoder that can be used to read a file with
+ * corrupt blocks by using the parity file.
+ * This is an abstract class, concrete subclasses need to implement
+ * fixErasedBlock.
+ */
+public abstract class Decoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.Decoder");
+  protected Configuration conf;
+  protected int stripeSize;
+  protected int paritySize;
+  protected Random rand;
+  protected int bufSize;
+  protected byte[][] readBufs;
+  protected byte[][] writeBufs;
+
+  Decoder(Configuration conf, int stripeSize, int paritySize) {
+    this.conf = conf;
+    this.stripeSize = stripeSize;
+    this.paritySize = paritySize;
+    this.rand = new Random();
+    this.bufSize = conf.getInt("raid.decoder.bufsize", 1024 * 1024);
+    this.readBufs = new byte[stripeSize + paritySize][];
+    this.writeBufs = new byte[paritySize][];
+    allocateBuffers();
+  }
+
+  private void allocateBuffers() {
+    for (int i = 0; i < stripeSize + paritySize; i++) {
+      readBufs[i] = new byte[bufSize];
+    }
+    for (int i = 0; i < paritySize; i++) {
+      writeBufs[i] = new byte[bufSize];
+    }
+  }
+
+  private void configureBuffers(long blockSize) {
+    if ((long)bufSize > blockSize) {
+      bufSize = (int)blockSize;
+      allocateBuffers();
+    } else if (blockSize % bufSize != 0) {
+      bufSize = (int)(blockSize / 256L); // heuristic.
+      if (bufSize == 0) {
+        bufSize = 1024;
+      }
+      bufSize = Math.min(bufSize, 1024 * 1024);
+      allocateBuffers();
+    }
+  }
+
+  /**
+   * The interface to generate a decoded file using the good portion of the
+   * source file and the parity file.
+   * @param fs The filesystem containing the source file.
+   * @param srcFile The damaged source file.
+   * @param parityFs The filesystem containing the parity file. This could be
+   *        different from fs in case the parity file is part of a HAR archive.
+   * @param parityFile The parity file.
+   * @param errorOffset Known location of error in the source file. There could
+   *        be additional errors in the source file that are discovered during
+   *        the decode process.
+   * @param decodedFile The decoded file. This will have the exact same contents
+   *        as the source file on success.
+   */
+  public void decodeFile(
+    FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+    long errorOffset, Path decodedFile) throws IOException {
+
+    LOG.info("Create " + decodedFile + " for error at " +
+            srcFile + ":" + errorOffset);
+    FileStatus srcStat = fs.getFileStatus(srcFile);
+    long blockSize = srcStat.getBlockSize();
+    configureBuffers(blockSize);
+    // Move the offset to the start of the block.
+    errorOffset = (errorOffset / blockSize) * blockSize;
+
+    // Create the decoded file.
+    FSDataOutputStream out = fs.create(
+      decodedFile, false, conf.getInt("io.file.buffer.size", 64 * 1024),
+      srcStat.getReplication(), srcStat.getBlockSize());
+
+    // Open the source file.
+    FSDataInputStream in = fs.open(
+      srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+
+    // Start copying data block-by-block.
+    for (long offset = 0; offset < srcStat.getLen(); offset += blockSize) {
+      long limit = Math.min(blockSize, srcStat.getLen() - offset);
+      long bytesAlreadyCopied = 0;
+      if (offset != errorOffset) {
+        try {
+          in = fs.open(
+            srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+          in.seek(offset);
+          RaidUtils.copyBytes(in, out, readBufs[0], limit);
+          assert(out.getPos() == offset + limit);
+          LOG.info("Copied till " + out.getPos() + " from " + srcFile);
+          continue;
+        } catch (BlockMissingException e) {
+          LOG.info("Encountered BME at " + srcFile + ":" + offset);
+          bytesAlreadyCopied = out.getPos() - offset;
+        } catch (ChecksumException e) {
+          LOG.info("Encountered CE at " + srcFile + ":" + offset);
+          bytesAlreadyCopied = out.getPos() - offset;
+        }
+      }
+      // If we are here offset == errorOffset or we got an exception.
+      // Recover the block starting at offset.
+      fixErasedBlock(fs, srcFile, parityFs, parityFile, blockSize, offset,
+        bytesAlreadyCopied, limit, out);
+    }
+    out.close();
+
+    try {
+      fs.setOwner(decodedFile, srcStat.getOwner(), srcStat.getGroup());
+      fs.setPermission(decodedFile, srcStat.getPermission());
+      fs.setTimes(decodedFile, srcStat.getModificationTime(),
+                  srcStat.getAccessTime());
+    } catch (Exception exc) {
+      LOG.info("Didn't manage to copy meta information because of " + exc +
+               " Ignoring...");
+    }
+
+  }
+
+  /**
+   * Recovers a corrupt block to a local file.
+   *
+   * @param srcFs The filesystem containing the source file.
+   * @param srcPath The damaged source file.
+   * @param parityFs The filesystem containing the parity file. This could be
+   *        different from srcFs in case the parity file is part of a HAR
+   *        archive.
+   * @param parityPath The parity file.
+   * @param blockSize The block size of the file.
+   * @param blockOffset The start offset of the corrupt block in the source
+   *        file.
+   * @param localBlockFile The file to write the block to.
+   * @param limit The maximum number of bytes to be written out.
+   *              This is to prevent writing beyond the end of the file.
+   */
+  public void recoverBlockToFile(
+    FileSystem srcFs, Path srcPath, FileSystem parityFs, Path parityPath,
+    long blockSize, long blockOffset, File localBlockFile, long limit)
+    throws IOException {
+    OutputStream out = new FileOutputStream(localBlockFile);
+    fixErasedBlock(srcFs, srcPath, parityFs, parityPath,
+                  blockSize, blockOffset, 0, limit, out);
+    out.close();
+  }
+
+  /**
+   * Implementation-specific mechanism of writing a fixed block.
+   * @param fs The filesystem containing the source file.
+   * @param srcFile The damaged source file.
+   * @param parityFs The filesystem containing the parity file. This could be
+   *        different from fs in case the parity file is part of a HAR archive.
+   * @param parityFile The parity file.
+   * @param blockSize The maximum size of a block.
+   * @param errorOffset Known location of error in the source file. There could
+   *        be additional errors in the source file that are discovered during
+   *        the decode process.
+   * @param bytesToSkip After the block is generated, this many bytes should be
+   *       skipped before writing to the output. This is needed because the
+   *       output may have a portion of the block written from the source file
+   *       before a new corruption is discovered in the block.
+   * @param limit The maximum number of bytes to be written out, including
+   *       bytesToSkip. This is to prevent writing beyond the end of the file.
+   * @param out The output.
+   */
+  protected abstract void fixErasedBlock(
+      FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+      long blockSize, long errorOffset, long bytesToSkip, long limit,
+      OutputStream out) throws IOException;
+}
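
A minimal usage sketch (not part of the commit) showing how a concrete Decoder might be driven, following the pattern DistributedRaidFileSystem now uses above; XORDecoder, RaidNode.getStripeLength and Decoder.decodeFile come from this patch, while the paths and the error offset below are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.raid.Decoder;
    import org.apache.hadoop.raid.RaidNode;
    import org.apache.hadoop.raid.XORDecoder;

    public class DecoderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical source, parity and output paths.
        Path src = new Path("/user/foo/data/part-00000");
        Path parity = new Path("/raid/user/foo/data/part-00000");
        Path decoded = new Path("/tmp/raid/part-00000.recovered");
        long errorOffset = 128L * 1024 * 1024;  // offset of the corrupt block
        Decoder decoder =
            new XORDecoder(conf, RaidNode.getStripeLength(conf));
        // Copies good blocks verbatim and rebuilds the damaged one from parity.
        decoder.decodeFile(fs, src, fs, parity, errorOffset, decoded);
      }
    }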

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Implements depth-first traversal using a Stack object. The traversal
+ * can be stopped at any time and the state of traversal is saved.
+ */
+public class DirectoryTraversal {
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.raid.DirectoryTraversal");
+
+  private FileSystem fs;
+  private List<FileStatus> paths;
+  private int pathIdx = 0;  // Next path to process.
+  private Stack<Node> stack = new Stack<Node>();
+
+  /**
+   * Represents a directory node in directory traversal.
+   */
+  static class Node {
+    private FileStatus path;  // Path that this node represents.
+    private FileStatus[] elements;  // Elements in the node.
+    private int idx = 0;
+
+    public Node(FileStatus path, FileStatus[] elements) {
+      this.path = path;
+      this.elements = elements;
+    }
+
+    public boolean hasNext() {
+      return idx < elements.length;
+    }
+
+    public FileStatus next() {
+      return elements[idx++];
+    }
+
+    public FileStatus path() {
+      return this.path;
+    }
+  }
+
+  /**
+   * Constructor.
+   * @param fs The filesystem to use.
+   * @param startPaths A list of paths that need to be traversed
+   */
+  public DirectoryTraversal(FileSystem fs, List<FileStatus> startPaths) {
+    this.fs = fs;
+    paths = startPaths;
+    pathIdx = 0;
+  }
+
+  /**
+   * Choose some files to RAID.
+   * @param conf Configuration to use.
+   * @param targetRepl Replication expected of source files that already have
+   *        a parity file.
+   * @param raidDestPrefix Prefix of the path to RAID to.
+   * @param modTimePeriod Time that must have passed since the last
+   *        modification before a file is RAIDed.
+   * @param limit Limit on the number of files to choose.
+   * @return list of files to RAID.
+   * @throws IOException
+   */
+  public List<FileStatus> selectFilesToRaid(
+      Configuration conf, int targetRepl, Path raidDestPrefix,
+      long modTimePeriod, int limit) throws IOException {
+    List<FileStatus> selected = new LinkedList<FileStatus>();
+    int numSelected = 0;
+
+    long now = System.currentTimeMillis();
+    while (numSelected < limit) {
+      FileStatus next = getNextFile();
+      if (next == null) {
+        break;
+      }
+      // We have the next file, do we want to select it?
+      // If the source file has two or fewer blocks, skip it.
+      long blockSize = next.getBlockSize();
+      if (2 * blockSize >= next.getLen()) {
+        continue;
+      }
+
+      boolean select = false;
+      try {
+        Object ppair = RaidNode.getParityFile(
+            raidDestPrefix, next.getPath(), conf);
+        // Is there a valid parity file?
+        if (ppair != null) {
+          // Is the source at the target replication?
+          if (next.getReplication() != targetRepl) {
+            // Select the file so that its replication can be set.
+            select = true;
+          } else {
+            // Nothing to do, don't select the file.
+            select = false;
+          }
+        } else if (next.getModificationTime() + modTimePeriod < now) {
+          // No valid parity file exists and the file is old enough, so select it.
+          select = true;
+        }
+      } catch (java.io.FileNotFoundException e) {
+        select = true; // destination file does not exist
+      }
+      if (select) {
+        selected.add(next);
+        numSelected++;
+      }
+    }
+
+    return selected;
+  }
+
+  /**
+   * Return the next file.
+   * @throws IOException
+   */
+  public FileStatus getNextFile() throws IOException {
+    // Check if traversal is done.
+    while (!doneTraversal()) {
+      // If traversal is not done, check if the stack is not empty.
+      while (!stack.isEmpty()) {
+        // If the stack is not empty, look at the top node.
+        Node node = stack.peek();
+        // Check if the top node has an element.
+        if (node.hasNext()) {
+          FileStatus element = node.next();
+          // Is the next element a directory.
+          if (!element.isDir()) {
+            // It is a file, return it.
+            return element;
+          }
+          // Next element is a directory, push it on to the stack and
+          // continue
+          try {
+            pushNewNode(element);
+          } catch (FileNotFoundException e) {
+            // Ignore and move to the next element.
+          }
+          continue;
+        } else {
+          // Top node has no next element, pop it and continue.
+          stack.pop();
+          continue;
+        }
+      }
+      // If the stack is empty, do we have more paths?
+      while (!paths.isEmpty()) {
+        FileStatus next = paths.remove(0);
+        pathIdx++;
+        if (!next.isDir()) {
+          return next;
+        }
+        try {
+          pushNewNode(next);
+        } catch (FileNotFoundException e) {
+          continue;
+        }
+        break;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gets the next directory in the tree. The algorithm returns deeper directories
+   * first.
+   * @return A FileStatus representing the directory.
+   * @throws IOException
+   */
+  public FileStatus getNextDirectory() throws IOException {
+    // Check if traversal is done.
+    while (!doneTraversal()) {
+      // If traversal is not done, check if the stack is not empty.
+      while (!stack.isEmpty()) {
+        // If the stack is not empty, look at the top node.
+        Node node = stack.peek();
+        // Check if the top node has an element.
+        if (node.hasNext()) {
+          FileStatus element = node.next();
+          // Is the next element a directory.
+          if (element.isDir()) {
+            // Next element is a directory, push it on to the stack and
+            // continue
+            try {
+              pushNewNode(element);
+            } catch (FileNotFoundException e) {
+              // Ignore and move to the next element.
+            }
+            continue;
+          }
+        } else {
+          stack.pop();
+          return node.path;
+        }
+      }
+      // If the stack is empty, do we have more paths?
+      while (!paths.isEmpty()) {
+        FileStatus next = paths.remove(0);
+        pathIdx++;
+        if (next.isDir()) {
+          try {
+            pushNewNode(next);
+          } catch (FileNotFoundException e) {
+            continue;
+          }
+          break;
+        }
+      }
+    }
+    return null;
+  }
+
+  private void pushNewNode(FileStatus stat) throws IOException {
+    if (!stat.isDir()) {
+      return;
+    }
+    Path p = stat.getPath();
+    LOG.info("Traversing to directory " + p);
+    FileStatus[] elements = fs.listStatus(p);
+    Node newNode = new Node(stat, (elements == null? new FileStatus[0]: elements));
+    stack.push(newNode);
+  }
+
+  public boolean doneTraversal() {
+    return paths.isEmpty() && stack.isEmpty();
+  }
+}
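
A short sketch (not part of the commit) of how DirectoryTraversal might be used to pick files for raiding; the start path, parity prefix, target replication, age and limit below are made up, the API calls come from the class above:

    import java.util.LinkedList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.raid.DirectoryTraversal;

    public class TraversalSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        List<FileStatus> start = new LinkedList<FileStatus>();
        start.add(fs.getFileStatus(new Path("/user/foo/data")));
        DirectoryTraversal traversal = new DirectoryTraversal(fs, start);
        // Select up to 5000 files that are at least a day old and either
        // lack a parity file or are not yet at the target replication of 2.
        List<FileStatus> toRaid = traversal.selectFilesToRaid(
            conf, 2, new Path("/raid"), 24L * 3600 * 1000, 5000);
        System.out.println("Selected " + toRaid.size() + " files, done=" +
            traversal.doneTraversal());
      }
    }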

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java Tue Oct 12 18:23:36 2010
@@ -34,17 +34,21 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.raid.RaidNode.Statistics;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.util.StringUtils;
@@ -111,6 +115,11 @@ public class DistRaid {
 
   List<RaidPolicyPathPair> raidPolicyPathPairList = new ArrayList<RaidPolicyPathPair>();
 
+  private JobClient jobClient;
+  private RunningJob runningJob;
+  private int jobEventCounter = 0;
+  private String lastReport = null;
+
   /** Responsible for generating splits of the src file list. */
   static class DistRaidInputFormat implements InputFormat<Text, PolicyInfo> {
     /** Do nothing. */
@@ -184,6 +193,7 @@ public class DistRaid {
     private int failcount = 0;
     private int succeedcount = 0;
     private Statistics st = null;
+    private Reporter reporter = null;
 
     private String getCountString() {
       return "Succeeded: " + succeedcount + " Failed: " + failcount;
@@ -200,6 +210,7 @@ public class DistRaid {
     public void map(Text key, PolicyInfo policy,
         OutputCollector<WritableComparable, Text> out, Reporter reporter)
         throws IOException {
+      this.reporter = reporter;
       try {
         LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
         Path p = new Path(key.toString());
@@ -268,29 +279,70 @@ public class DistRaid {
   private static int getMapCount(int srcCount, int numNodes) {
     int numMaps = (int) (srcCount / OP_PER_MAP);
     numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
-    return Math.max(numMaps, 1);
+    return Math.max(numMaps, MAX_MAPS_PER_NODE);
   }
 
-  /** invokes mapred job do parallel raiding */
-  public void doDistRaid() throws IOException {
-    if (raidPolicyPathPairList.size() == 0) {
-      LOG.info("DistRaid has no paths to raid.");
-      return;
-    }
-    try {
-      if (setup()) {
-        JobClient.runJob(jobconf);
-      }
-    } finally {
-      // delete job directory
-      final String jobdir = jobconf.get(JOB_DIR_LABEL);
-      if (jobdir != null) {
-        final Path jobpath = new Path(jobdir);
-        jobpath.getFileSystem(jobconf).delete(jobpath, true);
-      }
-    }
-    raidPolicyPathPairList.clear();
-  }
+  /** Invokes a map-reduce job to do parallel raiding.
+   *  @return true if the job was started, false otherwise
+   */
+  public boolean startDistRaid() throws IOException {
+    assert(raidPolicyPathPairList.size() > 0);
+    if (setup()) {
+      this.jobClient = new JobClient(jobconf);
+      this.runningJob = this.jobClient.submitJob(jobconf);
+      LOG.info("Job Started: " + runningJob.getID());
+      return true;
+    }
+    return false;
+  }
+
+  /** Checks if the map-reduce job has completed.
+   *
+   * @return true if the job completed, false otherwise.
+   * @throws IOException
+   */
+  public boolean checkComplete() throws IOException {
+    JobID jobID = runningJob.getID();
+    if (runningJob.isComplete()) {
+      // delete job directory
+      final String jobdir = jobconf.get(JOB_DIR_LABEL);
+      if (jobdir != null) {
+        final Path jobpath = new Path(jobdir);
+        jobpath.getFileSystem(jobconf).delete(jobpath, true);
+      }
+      if (runningJob.isSuccessful()) {
+        LOG.info("Job Complete(Succeeded): " + jobID);
+      } else {
+        LOG.info("Job Complete(Failed): " + jobID);
+      }
+      raidPolicyPathPairList.clear();
+      Counters ctrs = runningJob.getCounters();
+      long filesRaided = ctrs.findCounter(Counter.FILES_SUCCEEDED).getValue();
+      long filesFailed = ctrs.findCounter(Counter.FILES_FAILED).getValue();
+      return true;
+    } else {
+      String report = (" job " + jobID +
+        " map " + StringUtils.formatPercent(runningJob.mapProgress(), 0) +
+        " reduce " + StringUtils.formatPercent(runningJob.reduceProgress(), 0));
+      if (!report.equals(lastReport)) {
+        LOG.info(report);
+        lastReport = report;
+      }
+      TaskCompletionEvent[] events =
+        runningJob.getTaskCompletionEvents(jobEventCounter);
+      jobEventCounter += events.length;
+      for (TaskCompletionEvent event : events) {
+        if (event.getTaskStatus() == TaskCompletionEvent.Status.FAILED) {
+          LOG.info(" Job " + jobID + " " + event.toString());
+        }
+      }
+      return false;
+    }
+  }
+
+  public boolean successful() throws IOException {
+    return runningJob.isSuccessful();
+  }
 
   /**
    * set up input file which has the list of input files.

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Represents a generic encoder that can generate a parity file for a source
+ * file.
+ * This is an abstract class, concrete subclasses need to implement
+ * encodeFileImpl.
+ */
+public abstract class Encoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.Encoder");
+  protected Configuration conf;
+  protected int stripeSize;
+  protected int paritySize;
+  protected Random rand;
+  protected int bufSize;
+  protected byte[][] readBufs;
+  protected byte[][] writeBufs;
+
+  /**
+   * A class that acts as a sink for data, similar to /dev/null.
+   */
+  static class NullOutputStream extends OutputStream {
+    public void write(byte[] b) throws IOException {}
+    public void write(int b) throws IOException {}
+    public void write(byte[] b, int off, int len) throws IOException {}
+  }
+
+  Encoder(
+    Configuration conf, int stripeSize, int paritySize) {
+    this.conf = conf;
+    this.stripeSize = stripeSize;
+    this.paritySize = paritySize;
+    this.rand = new Random();
+    this.bufSize = conf.getInt("raid.encoder.bufsize", 1024 * 1024);
+    this.readBufs = new byte[stripeSize][];
+    this.writeBufs = new byte[paritySize][];
+    allocateBuffers();
+  }
+
+  private void allocateBuffers() {
+    for (int i = 0; i < stripeSize; i++) {
+      readBufs[i] = new byte[bufSize];
+    }
+    for (int i = 0; i < paritySize; i++) {
+      writeBufs[i] = new byte[bufSize];
+    }
+  }
+
+  private void configureBuffers(long blockSize) {
+    if ((long)bufSize > blockSize) {
+      bufSize = (int)blockSize;
+      allocateBuffers();
+    } else if (blockSize % bufSize != 0) {
+      bufSize = (int)(blockSize / 256L); // heuristic.
+      if (bufSize == 0) {
+        bufSize = 1024;
+      }
+      bufSize = Math.min(bufSize, 1024 * 1024);
+      allocateBuffers();
+    }
+  }
+
+  /**
+   * The interface to use to generate a parity file.
+   * This method can be called multiple times with the same Encoder object,
+   * thus allowing reuse of the buffers allocated by the Encoder object.
+   *
+   * @param fs The filesystem containing the source file.
+   * @param srcFile The source file.
+   * @param parityFile The parity file to be generated.
+   * @param parityRepl The replication factor to use for the parity file.
+   * @param reporter A Progressable used to report progress.
+   */
+  public void encodeFile(FileSystem fs, Path srcFile, Path parityFile,
+    short parityRepl, Progressable reporter) throws IOException {
+    FileStatus srcStat = fs.getFileStatus(srcFile);
+    long srcSize = srcStat.getLen();
+    long blockSize = srcStat.getBlockSize();
+
+    configureBuffers(blockSize);
+
+    // Create a tmp file to which we will write first.
+    Path parityTmp = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") +
+                              parityFile.toUri().getPath() +
+                              "." + rand.nextLong() + ".tmp");
+    FSDataOutputStream out = fs.create(
+                               parityTmp,
+                               true,
+                               conf.getInt("io.file.buffer.size", 64 * 1024),
+                               parityRepl,
+                               blockSize);
+
+    try {
+      encodeFileToStream(fs, srcFile, srcSize, blockSize, out, reporter);
+      out.close();
+      out = null;
+      LOG.info("Wrote temp parity file " + parityTmp);
+
+      // delete destination if exists
+      if (fs.exists(parityFile)){
+        fs.delete(parityFile, false);
+      }
+      fs.mkdirs(parityFile.getParent());
+      if (!fs.rename(parityTmp, parityFile)) {
+        String msg = "Unable to rename file " + parityTmp + " to " + parityFile;
+        throw new IOException (msg);
+      }
+      LOG.info("Wrote parity file " + parityFile);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+      fs.delete(parityTmp, false);
+    }
+  }
+
+  /**
+   * Recovers a corrupt block in a parity file to a local file.
+   *
+   * The encoder generates paritySize parity blocks for a source file stripe.
+   * Since we want only one of the parity blocks, this function creates
+   * null outputs for the blocks to be discarded.
+   *
+   * @param fs The filesystem in which both srcFile and parityFile reside.
+   * @param srcFile The source file.
+   * @param srcSize The size of the source file.
+   * @param blockSize The block size for the source/parity files.
+   * @param corruptOffset The location of corruption in the parity file.
+   * @param localBlockFile The destination for the recovered block.
+   */
+  public void recoverParityBlockToFile(
+    FileSystem fs,
+    Path srcFile, long srcSize, long blockSize,
+    Path parityFile, long corruptOffset,
+    File localBlockFile) throws IOException {
+    OutputStream out = new FileOutputStream(localBlockFile);
+    try {
+      recoverParityBlockToStream(fs, srcFile, srcSize, blockSize, parityFile,
+        corruptOffset, out);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Recovers a corrupt block in a parity file to a local file.
+   *
+   * The encoder generates paritySize parity blocks for a source file stripe.
+   * Since we want only one of the parity blocks, this function creates
+   * null outputs for the blocks to be discarded.
+   *
+   * @param fs The filesystem in which both srcFile and parityFile reside.
+   * @param srcFile The source file.
+   * @param srcSize The size of the source file.
+   * @param blockSize The block size for the source/parity files.
+   * @param corruptOffset The location of corruption in the parity file.
+   * @param out The destination for the recovered block.
+   */
+  public void recoverParityBlockToStream(
+    FileSystem fs,
+    Path srcFile, long srcSize, long blockSize,
+    Path parityFile, long corruptOffset,
+    OutputStream out) throws IOException {
+    LOG.info("Recovering parity block " + parityFile + ":" + corruptOffset);
+    // Get the start offset of the corrupt block.
+    corruptOffset = (corruptOffset / blockSize) * blockSize;
+    // Output streams to each block in the parity file stripe.
+    OutputStream[] outs = new OutputStream[paritySize];
+    long indexOfCorruptBlockInParityStripe =
+      (corruptOffset / blockSize) % paritySize;
+    LOG.info("Index of corrupt block in parity stripe: " +
+              indexOfCorruptBlockInParityStripe);
+    // Create a real output stream for the block we want to recover,
+    // and create null streams for the rest.
+    for (int i = 0; i < paritySize; i++) {
+      if (indexOfCorruptBlockInParityStripe == i) {
+        outs[i] = out;
+      } else {
+        outs[i] = new NullOutputStream();
+      }
+    }
+    // Get the stripe index and start offset of stripe.
+    long stripeIdx = corruptOffset / (paritySize * blockSize);
+    long stripeStart = stripeIdx * blockSize * stripeSize;
+
+    // Get input streams to each block in the source file stripe.
+    InputStream[] blocks = stripeInputs(fs, srcFile, stripeStart,
+        srcSize, blockSize);
+    LOG.info("Starting recovery by using source stripe " +
+              srcFile + ":" + stripeStart);
+    // Read the data from the blocks and write to the parity file.
+    encodeStripe(blocks, stripeStart, blockSize, outs, Reporter.NULL);
+  }
+
+  /**
+   * Encodes the source file and writes the generated parity to an output
+   * stream.
+   *
+   * The encoder generates paritySize parity blocks for each source file
+   * stripe. Since there is only one output provided, all but one of the
+   * parity blocks are written to temporary local files before being copied
+   * to the output.
+   *
+   * @param fs The filesystem in which srcFile resides.
+   * @param srcFile The source file.
+   * @param srcSize The size of the source file.
+   * @param blockSize The block size for the source/parity files.
+   * @param out The destination for the generated parity data.
+   * @param reporter A Progressable used to report progress.
+   */
+  private void encodeFileToStream(FileSystem fs, Path srcFile, long srcSize,
+    long blockSize, OutputStream out, Progressable reporter) throws IOException {
+    OutputStream[] tmpOuts = new OutputStream[paritySize];
+    // One parity block can be written directly to out, rest to local files.
+    tmpOuts[0] = out;
+    File[] tmpFiles = new File[paritySize - 1];
+    for (int i = 0; i < paritySize - 1; i++) {
+      tmpFiles[i] = File.createTempFile("parity", "_" + i);
+      LOG.info("Created tmp file " + tmpFiles[i]);
+      tmpFiles[i].deleteOnExit();
+    }
+    try {
+      // Loop over stripes in the file.
+      for (long stripeStart = 0; stripeStart < srcSize;
+          stripeStart += blockSize * stripeSize) {
+        reporter.progress();
+        LOG.info("Starting encoding of stripe " + srcFile + ":" + stripeStart);
+        // Create input streams for blocks in the stripe.
+        InputStream[] blocks = stripeInputs(fs, srcFile, stripeStart,
+          srcSize, blockSize);
+        // Create output streams to the temp files.
+        for (int i = 0; i < paritySize - 1; i++) {
+          tmpOuts[i + 1] = new FileOutputStream(tmpFiles[i]);
+        }
+        // Call the implementation of encoding.
+        encodeStripe(blocks, stripeStart, blockSize, tmpOuts, reporter);
+        // Close output streams to the temp files and write the temp files
+        // to the output provided.
+        for (int i = 0; i < paritySize - 1; i++) {
+          tmpOuts[i + 1].close();
+          tmpOuts[i + 1] = null;
+          InputStream in  = new FileInputStream(tmpFiles[i]);
+          RaidUtils.copyBytes(in, out, writeBufs[i], blockSize);
+          reporter.progress();
+        }
+      }
+    } finally {
+      for (int i = 0; i < paritySize - 1; i++) {
+        if (tmpOuts[i + 1] != null) {
+          tmpOuts[i + 1].close();
+        }
+        tmpFiles[i].delete();
+        LOG.info("Deleted tmp file " + tmpFiles[i]);
+      }
+    }
+  }
+
+  /**
+   * Return input streams for each block in a source file's stripe.
+   * @param fs The filesystem where the file resides.
+   * @param srcFile The source file.
+   * @param stripeStartOffset The start offset of the stripe.
+   * @param srcSize The size of the source file.
+   * @param blockSize The block size for the source file.
+   */
+  protected InputStream[] stripeInputs(
+    FileSystem fs,
+    Path srcFile,
+    long stripeStartOffset,
+    long srcSize,
+    long blockSize
+    ) throws IOException {
+    InputStream[] blocks = new InputStream[stripeSize];
+    for (int i = 0; i < stripeSize; i++) {
+      long seekOffset = stripeStartOffset + i * blockSize;
+      if (seekOffset < srcSize) {
+        FSDataInputStream in = fs.open(
+                   srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+        in.seek(seekOffset);
+        LOG.info("Opening stream at " + srcFile + ":" + seekOffset);
+        blocks[i] = in;
+      } else {
+        LOG.info("Using zeros at offset " + seekOffset);
+        // We have no src data at this offset.
+        blocks[i] = new RaidUtils.ZeroInputStream(
+                          seekOffset + blockSize);
+      }
+    }
+    return blocks;
+  }
+
+  /**
+   * The implementation of generating parity data for a stripe.
+   *
+   * @param blocks The streams to blocks in the stripe.
+   * @param srcFile The source file.
+   * @param stripeStartOffset The start offset of the stripe
+   * @param blockSize The maximum size of a block.
+   * @param outs output streams to the parity blocks.
+   * @param reporter progress indicator.
+   */
+  protected abstract void encodeStripe(
+    InputStream[] blocks,
+    long stripeStartOffset,
+    long blockSize,
+    OutputStream[] outs,
+    Progressable reporter) throws IOException;
+}
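
Encoder handles the buffer management and stripe plumbing; subclasses supply encodeStripe. A hypothetical single-parity subclass (not part of the commit; the real implementation added by this patch is XOREncoder) could look like the sketch below. Encoder's constructor is package-private, so the sketch lives in org.apache.hadoop.raid; the class name ToyXorEncoder and the zero-padding past EOF are assumptions made for illustration.

    package org.apache.hadoop.raid;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Progressable;

    // Toy single-parity encoder: XORs the stripe's blocks into outs[0].
    public class ToyXorEncoder extends Encoder {
      public ToyXorEncoder(Configuration conf, int stripeSize) {
        super(conf, stripeSize, 1);  // one parity block per stripe
      }

      @Override
      protected void encodeStripe(InputStream[] blocks, long stripeStartOffset,
          long blockSize, OutputStream[] outs, Progressable reporter)
          throws IOException {
        for (long done = 0; done < blockSize; done += bufSize) {
          int toRead = (int) Math.min(bufSize, blockSize - done);
          // Read the same range of every block; pad with zeros past EOF.
          for (int i = 0; i < blocks.length; i++) {
            readFully(blocks[i], readBufs[i], toRead);
          }
          // XOR the blocks byte by byte into the single parity buffer.
          for (int j = 0; j < toRead; j++) {
            byte b = 0;
            for (int i = 0; i < blocks.length; i++) {
              b ^= readBufs[i][j];
            }
            writeBufs[0][j] = b;
          }
          outs[0].write(writeBufs[0], 0, toRead);
          reporter.progress();
        }
      }

      private static void readFully(InputStream in, byte[] buf, int len)
          throws IOException {
        int n = 0;
        while (n < len) {
          int r = in.read(buf, n, len - n);
          if (r < 0) {
            Arrays.fill(buf, n, len, (byte) 0);
            return;
          }
          n += r;
        }
      }
    }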

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Periodically monitors the status of jobs registered with it.
+ *
+ * Jobs that are submitted for the same policy name are kept in the same list,
+ * and the list itself is kept in a map that has the policy name as the key and
+ * the list as value.
+ */
+class JobMonitor implements Runnable {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.JobMonitor");
+
+  volatile boolean running = true;
+
+  private Map<String, List<DistRaid>> jobs;
+  private long jobMonitorInterval;
+  private volatile long jobsMonitored = 0;
+  private volatile long jobsSucceeded = 0;
+
+  public JobMonitor(Configuration conf) {
+    jobMonitorInterval = conf.getLong("raid.jobmonitor.interval", 60000);
+    jobs = new java.util.HashMap<String, List<DistRaid>>();
+  }
+
+  public void run() {
+    while (running) {
+      try {
+        LOG.info("JobMonitor thread continuing to run...");
+        doMonitor();
+      } catch (Throwable e) {
+        LOG.error("JobMonitor encountered exception " +
+          StringUtils.stringifyException(e));
+        // All expected exceptions are caught by doMonitor(). It is better
+        // to exit now, this will prevent RaidNode from submitting more jobs
+        // since the number of running jobs will never decrease.
+        return;
+      }
+    }
+  }
+
+  /**
+   * Periodically checks status of running map-reduce jobs.
+   */
+  public void doMonitor() {
+    while (running) {
+      String[] keys = null;
+      // Make a copy of the names of the current jobs.
+      synchronized(jobs) {
+        keys = jobs.keySet().toArray(new String[0]);
+      }
+
+      // Check all the jobs. We do not want to block access to `jobs`
+      // because that will prevent new jobs from being added.
+      // This is safe because JobMonitor.run is the only code that can
+      // remove a job from `jobs`. Thus all elements in `keys` will have
+      // valid values.
+      Map<String, List<DistRaid>> finishedJobs =
+        new HashMap<String, List<DistRaid>>();
+
+      for (String key: keys) {
+        // For each policy being monitored, get the list of jobs running.
+        DistRaid[] jobListCopy = null;
+        synchronized(jobs) {
+          List<DistRaid> jobList = jobs.get(key);
+          synchronized(jobList) {
+            jobListCopy = jobList.toArray(new DistRaid[jobList.size()]);
+          }
+        }
+        // The code that actually contacts the JobTracker is not synchronized,
+        // it uses copies of the list of jobs.
+        for (DistRaid job: jobListCopy) {
+          // Check each running job.
+          try {
+            boolean complete = job.checkComplete();
+            if (complete) {
+              addJob(finishedJobs, key, job);
+              if (job.successful()) {
+                jobsSucceeded++;
+              }
+            }
+          } catch (IOException ioe) {
+            // If there was an error, consider the job finished.
+            addJob(finishedJobs, key, job);
+          }
+        }
+      }
+
+      if (finishedJobs.size() > 0) {
+        for (String key: finishedJobs.keySet()) {
+          List<DistRaid> finishedJobList = finishedJobs.get(key);
+          // Iterate through finished jobs and remove from jobs.
+          // removeJob takes care of locking.
+          for (DistRaid job: finishedJobList) {
+            removeJob(jobs, key, job);
+          }
+        }
+      }
+
+      try {
+        Thread.sleep(jobMonitorInterval);
+      } catch (InterruptedException ie) {
+      }
+    }
+  }
+
+  public int runningJobsCount(String key) {
+    int count = 0;
+    synchronized(jobs) {
+      if (jobs.containsKey(key)) {
+        List<DistRaid> jobList = jobs.get(key);
+        synchronized(jobList) {
+          count = jobList.size();
+        }
+      }
+    }
+    return count;
+  }
+
+  public void monitorJob(String key, DistRaid job) {
+    addJob(jobs, key, job);
+    jobsMonitored++;
+  }
+
+  public long jobsMonitored() {
+    return this.jobsMonitored;
+  }
+
+  public long jobsSucceeded() {
+    return this.jobsSucceeded;
+  }
+
+  private static void addJob(Map<String, List<DistRaid>> jobsMap,
+                              String jobName, DistRaid job) {
+    synchronized(jobsMap) {
+      List<DistRaid> list = null;
+      if (jobsMap.containsKey(jobName)) {
+        list = jobsMap.get(jobName);
+      } else {
+        list = new LinkedList<DistRaid>();
+        jobsMap.put(jobName, list);
+      }
+      synchronized(list) {
+        list.add(job);
+      }
+    }
+  }
+
+  private static void removeJob(Map<String, List<DistRaid>> jobsMap,
+                                  String jobName, DistRaid job) {
+    synchronized(jobsMap) {
+      if (jobsMap.containsKey(jobName)) {
+        List<DistRaid> list = jobsMap.get(jobName);
+        synchronized(list) {
+          for (Iterator<DistRaid> it = list.iterator(); it.hasNext(); ) {
+            DistRaid val = it.next();
+            if (val == job) {
+              it.remove();
+            }
+          }
+          if (list.size() == 0) {
+            jobsMap.remove(jobName);
+          }
+        }
+      }
+    }
+  }
+}
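
A brief sketch (not part of the commit) of how a JobMonitor could be wired up; RaidNode is the real client, and the helper below is hypothetical. JobMonitor is package-private, so the sketch sits in org.apache.hadoop.raid.

    package org.apache.hadoop.raid;

    import org.apache.hadoop.conf.Configuration;

    public class JobMonitorSketch {
      // Starts the monitor thread and registers a submitted DistRaid job
      // under its policy name, roughly as RaidNode would.
      public static JobMonitor monitor(Configuration conf, String policyName,
          DistRaid submittedJob) {
        JobMonitor jobMonitor = new JobMonitor(conf);
        Thread monitorThread = new Thread(jobMonitor, "JobMonitor");
        monitorThread.setDaemon(true);
        monitorThread.start();
        jobMonitor.monitorJob(policyName, submittedJob);
        System.out.println("Jobs running for " + policyName + ": " +
            jobMonitor.runningJobsCount(policyName));
        return jobMonitor;
      }
    }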

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Wraps over multiple input streams and provides an input stream that is
+ * an XOR of the streams.
+ */
+class ParityInputStream extends InputStream {
+  private static final int DEFAULT_BUFSIZE = 5*1024*1024;
+  private InputStream[] streams;
+  private byte[] xor;
+  private byte[] buf;
+  private int bufSize;
+  private long remaining;
+  private int available = 0;
+  private int readPos = 0;
+
+  public ParityInputStream(
+      InputStream[] streams, long parityBlockSize, byte[] buf, byte[] xor) {
+    assert buf.length == xor.length;
+    bufSize = buf.length;
+    this.streams = streams;
+    remaining = parityBlockSize;
+    this.buf = buf;
+    this.xor = xor;
+  }
+  
+  @Override
+  public int read() throws IOException {
+    makeAvailable();
+    if (available == 0) {
+      return -1;
+    }
+    int ret = xor[readPos] & 0xff; // mask to keep the InputStream contract
+    readPos++;
+    available--;
+    return ret;
+  }
+  
+  @Override
+  public int read(byte b[], int off, int len) throws IOException {
+    makeAvailable();
+    if (available == 0) {
+      return -1;
+    }
+    int ret = Math.min(len, available);
+    for (int i = 0; i < ret; ++i) {
+      b[off+i] = xor[readPos+i];
+    }
+    readPos += ret;
+    available -= ret;
+    return ret;
+  }
+
+  public void close() throws IOException {
+    for (InputStream i: streams) {
+      i.close();
+    }
+  }
+  
+  /**
+   * Send the contents of the stream to the sink.
+   * @param sink
+   * @param reporter
+   * @throws IOException
+   */
+  public void drain(OutputStream sink, Progressable reporter)
+          throws IOException {
+    
+    while (true) {
+      makeAvailable();
+      if (available == 0) {
+        break;
+      }
+      sink.write(xor, readPos, available);
+      available = 0;
+      if (reporter != null) {
+        reporter.progress();
+      }
+    }
+  }
+
+  /**
+   * Make some bytes available for reading in the internal buffer.
+   * @throws IOException
+   */
+  private void makeAvailable() throws IOException {
+    if (available > 0 || remaining <= 0) {
+      return;
+    }
+    // Read some bytes from the first stream.
+    int xorlen = (int)Math.min(remaining, bufSize);
+    readExact(streams[0], xor, xorlen);
+
+    // Read bytes from all the other streams and xor them.
+    for (int i = 1; i < streams.length; i++) {
+      readExact(streams[i], buf, xorlen);
+
+      for (int j = 0; j < xorlen; j++) {
+        xor[j] ^= buf[j];
+      }
+    }
+    
+    remaining -= xorlen;
+    available = xorlen;
+    readPos = 0;
+  }
+
+  private static void readExact(InputStream in, byte[] bufs, int toRead)
+  throws IOException {
+    int tread = 0;
+    while (tread < toRead) {
+      int read = in.read(bufs, tread, toRead - tread);
+      if (read == -1) {
+        // If the stream ends, fill in zeros.
+        Arrays.fill(bufs, tread, toRead, (byte)0);
+        tread = toRead;
+      } else {
+        tread += read;
+      }
+    }
+    assert tread == toRead;
+  }
+
+}
+
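
A tiny self-contained sketch (not part of the commit) of what ParityInputStream computes: draining it yields the byte-wise XOR of the wrapped streams. The class is package-private, so the sketch lives in org.apache.hadoop.raid; the in-memory blocks are made up.

    package org.apache.hadoop.raid;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import java.util.Arrays;

    public class ParityStreamSketch {
      public static void main(String[] args) throws Exception {
        byte[] blockA = {1, 2, 3, 4};
        byte[] blockB = {5, 6, 7, 8};
        InputStream[] streams = {
            new ByteArrayInputStream(blockA), new ByteArrayInputStream(blockB)};
        int bufSize = 4;
        ParityInputStream parity = new ParityInputStream(
            streams, blockA.length, new byte[bufSize], new byte[bufSize]);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        parity.drain(sink, null);  // drain() tolerates a null Progressable
        parity.close();
        // sink now holds blockA XOR blockB, byte by byte.
        System.out.println(Arrays.toString(sink.toByteArray()));
      }
    }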


