hadoop-mapreduce-commits mailing list archives

From: sc...@apache.org
Subject: svn commit: r1021873 [2/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/hdfs/ src/contrib/raid/src/test/org/apache/hadoo...
Date: Tue, 12 Oct 2010 18:23:36 GMT
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Oct 12 18:23:36 2010
@@ -21,10 +21,12 @@ package org.apache.hadoop.raid;
 import java.io.IOException;
 import java.io.FileNotFoundException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.LinkedList;
 import java.util.Iterator;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.HashSet;
@@ -73,7 +75,9 @@ public class RaidNode implements RaidPro
   public static final long SLEEP_TIME = 10000L; // 10 seconds
   public static final int DEFAULT_PORT = 60000;
   public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
+  public static final String STRIPE_LENGTH_KEY = "hdfs.raid.stripeLength";
   public static final String DEFAULT_RAID_LOCATION = "/raid";
+  public static final String RAID_LOCATION_KEY = "hdfs.raid.locations";
   public static final String HAR_SUFFIX = "_raid.har";
   
   /** RPC server */
@@ -101,6 +105,10 @@ public class RaidNode implements RaidPro
   /** Daemon thread to har raid directories */
   Daemon harThread = null;
 
+  /** Daemon thread to monitor distributed raid job progress */
+  JobMonitor jobMonitor = null;
+  Daemon jobMonitorThread = null;
+
   /** Do distributed raiding */
   boolean isRaidLocal = false;
   
@@ -168,6 +176,7 @@ public class RaidNode implements RaidPro
     try {
       initialize(conf);
     } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
       this.stop();
       throw e;
     } catch (Exception e) {
@@ -193,6 +202,7 @@ public class RaidNode implements RaidPro
     try {
       if (server != null) server.join();
       if (triggerThread != null) triggerThread.join();
+      if (jobMonitorThread != null) jobMonitorThread.join();
       if (purgeThread != null) purgeThread.join();
     } catch (InterruptedException ie) {
       // do nothing
@@ -210,6 +220,8 @@ public class RaidNode implements RaidPro
     running = false;
     if (server != null) server.stop();
     if (triggerThread != null) triggerThread.interrupt();
+    if (jobMonitor != null) jobMonitor.running = false;
+    if (jobMonitorThread != null) jobMonitorThread.interrupt();
     if (purgeThread != null) purgeThread.interrupt();
   }
 
@@ -252,6 +264,10 @@ public class RaidNode implements RaidPro
     running = true;
     this.server.start(); // start RPC server
 
+    this.jobMonitor = new JobMonitor(conf);
+    this.jobMonitorThread = new Daemon(this.jobMonitor);
+    this.jobMonitorThread.start();
+
     // start the daemon thread to fire policies appropriately
     this.triggerThread = new Daemon(new TriggerMonitor());
     this.triggerThread.start();
@@ -282,22 +298,15 @@ public class RaidNode implements RaidPro
     LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset);
     Path inputPath = new Path(inStr);
     Path srcPath = inputPath.makeQualified(inputPath.getFileSystem(conf));
-    PolicyInfo info = findMatchingPolicy(srcPath);
-    if (info != null) {
-
-      // find stripe length from config
-      int stripeLength = getStripeLength(conf, info);
+    // find stripe length from config
+    int stripeLength = getStripeLength(conf);
 
-      // create destination path prefix
-      String destPrefix = getDestinationPath(conf, info);
-      Path destPath = new Path(destPrefix.trim());
-      FileSystem fs = FileSystem.get(destPath.toUri(), conf);
-      destPath = destPath.makeQualified(fs);
-
-      Path unraided = unRaid(conf, srcPath, destPath, stripeLength, corruptOffset);
-      if (unraided != null) {
-        return unraided.toString();
-      }
+    Path destPref = getDestinationPath(conf);
+    Decoder decoder = new XORDecoder(conf, RaidNode.getStripeLength(conf));
+    Path unraided = unRaid(conf, srcPath, destPref, decoder,
+        stripeLength, corruptOffset);
+    if (unraided != null) {
+      return unraided.toString();
     }
     return null;
   }
@@ -306,6 +315,11 @@ public class RaidNode implements RaidPro
    * Periodically checks to see which policies should be fired.
    */
   class TriggerMonitor implements Runnable {
+
+    private Map<String, Long> scanTimes = new HashMap<String, Long>();
+    private Map<String, DirectoryTraversal> scanState =
+      new HashMap<String, DirectoryTraversal>();
+
     /**
      */
     public void run() {
@@ -320,6 +334,109 @@ public class RaidNode implements RaidPro
       }
     }
 
+    /**
+     * Should we select more files for a policy.
+     */
+    private boolean shouldSelectFiles(PolicyInfo info) {
+      String policyName = info.getName();
+      int runningJobsCount = jobMonitor.runningJobsCount(policyName);
+      // Is there a scan in progress for this policy?
+      if (scanState.containsKey(policyName)) {
+        int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
+
+        // If there is a scan in progress for this policy, we can have
+        // up to maxJobsPerPolicy running jobs.
+        return (runningJobsCount < maxJobsPerPolicy);
+      } else {
+        // If there isn't a scan in progress for this policy, we don't
+        // want to start a fresh scan if there is even one running job.
+        if (runningJobsCount >= 1) {
+          return false;
+        }
+        // Check the time of the last full traversal before starting a fresh
+        // traversal.
+        if (scanTimes.containsKey(policyName)) {
+          long lastScan = scanTimes.get(policyName);
+          return (now() > lastScan + configMgr.getPeriodicity());
+        } else {
+          return true;
+        }
+      }
+    }
+
+   /**
+    * Returns a list of pathnames that need raiding.
+    * The list of paths could be obtained by resuming a previously suspended
+    * traversal.
+    * The number of paths returned is limited by raid.distraid.max.jobs.
+    */
+    private List<FileStatus> selectFiles(PolicyInfo info) throws IOException {
+      Path destPrefix = getDestinationPath(conf);
+      String policyName = info.getName();
+      Path srcPath = info.getSrcPath();
+      long modTimePeriod = 0;
+      String str = info.getProperty("modTimePeriod");
+      if (str != null) {
+         modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
+      }
+      short srcReplication = 0;
+      str = info.getProperty("srcReplication");
+      if (str != null) {
+        srcReplication = Short.parseShort(info.getProperty("srcReplication"));
+      }
+
+      // Max number of files returned.
+      int selectLimit = configMgr.getMaxFilesPerJob();
+      int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+
+      // If we have a pending traversal, resume it.
+      if (scanState.containsKey(policyName)) {
+        DirectoryTraversal dt = scanState.get(policyName);
+        LOG.info("Resuming traversal for policy " + policyName);
+        List<FileStatus> returnSet = dt.selectFilesToRaid(
+            conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+        if (dt.doneTraversal()) {
+          scanState.remove(policyName);
+        }
+        return returnSet;
+      }
+
+      // Expand destination prefix path.
+      String destpstr = destPrefix.toString();
+      if (!destpstr.endsWith(Path.SEPARATOR)) {
+        destpstr += Path.SEPARATOR;
+      }
+
+      List<FileStatus> returnSet = new LinkedList<FileStatus>();
+
+      FileSystem fs = srcPath.getFileSystem(conf);
+      FileStatus[] gpaths = fs.globStatus(srcPath);
+      if (gpaths != null) {
+        List<FileStatus> selectedPaths = new LinkedList<FileStatus>();
+        for (FileStatus onepath: gpaths) {
+          String pathstr = onepath.getPath().makeQualified(fs).toString();
+          if (!pathstr.endsWith(Path.SEPARATOR)) {
+            pathstr += Path.SEPARATOR;
+          }
+          if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) {
+            LOG.info("Skipping source " + pathstr +
+                     " because it conflicts with raid directory " + destpstr);
+          } else {
+            selectedPaths.add(onepath);
+          }
+        }
+
+        // Set the time for a new traversal.
+        scanTimes.put(policyName, now());
+        DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths);
+        returnSet = dt.selectFilesToRaid(
+            conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+        if (!dt.doneTraversal()) {
+          scanState.put(policyName, dt);
+        }
+      }
+      return returnSet;
+    }
 
     /**
      * Keep processing policies.
@@ -328,18 +445,11 @@ public class RaidNode implements RaidPro
     private void doProcess() throws IOException, InterruptedException {
       PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
 
-      long prevExec = 0;
-      DistRaid dr = null;
       while (running) {
+        Thread.sleep(SLEEP_TIME);
 
-        boolean reload = configMgr.reloadConfigsIfNecessary();
-        while(!reload && now() < prevExec + configMgr.getPeriodicity()){
-          Thread.sleep(SLEEP_TIME);
-          reload = configMgr.reloadConfigsIfNecessary();
-        }
+        configMgr.reloadConfigsIfNecessary();
 
-        prevExec = now();
-        
         // activate all categories
         Collection<PolicyList> all = configMgr.getAllPolicies();
         
@@ -348,35 +458,18 @@ public class RaidNode implements RaidPro
         PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
         Arrays.sort(sorted, lexi);
 
-        if (!isRaidLocal) {
-          dr = new DistRaid(conf);
-        }
-        // paths we have processed so far
-        List<String> processed = new LinkedList<String>();
-        
         for (PolicyList category : sorted) {
           for (PolicyInfo info: category.getAll()) {
 
-            long modTimePeriod = 0;
-            short srcReplication = 0;
-            String str = info.getProperty("modTimePeriod");
-            if (str != null) {
-               modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod")); 
-            }
-            str = info.getProperty("srcReplication");
-            if (str != null) {
-               srcReplication = Short.parseShort(info.getProperty("srcReplication")); 
+            if (!shouldSelectFiles(info)) {
+              continue;
             }
 
             LOG.info("Triggering Policy Filter " + info.getName() +
                      " " + info.getSrcPath());
             List<FileStatus> filteredPaths = null;
-            try { 
-              filteredPaths = selectFiles(conf, info.getSrcPath(), 
-                                          getDestinationPath(conf, info),
-                                          modTimePeriod,
-                                          srcReplication,
-                                          prevExec);
+            try {
+              filteredPaths = selectFiles(info);
             } catch (Exception e) {
               LOG.info("Exception while invoking filter on policy " + info.getName() +
                        " srcPath " + info.getSrcPath() + 
@@ -389,95 +482,41 @@ public class RaidNode implements RaidPro
                continue;
             }
 
-            // If any of the filtered path has already been accepted 
-            // by a previous policy, then skip it.
-            for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
-              String fs = iter.next().getPath().toString() + "/";
-              for (String p : processed) {
-                if (p.startsWith(fs)) {
-                  iter.remove();
-                  break;
-                }
-              }
-            }
-
             // Apply the action on accepted paths
-            LOG.info("Triggering Policy Action " + info.getName());
+            LOG.info("Triggering Policy Action " + info.getName() +
+                     " " + info.getSrcPath());
             try {
-            	if (isRaidLocal){
-            	  doRaid(conf, info, filteredPaths);
-            	}
-            	else{
-            	  //add paths for distributed raiding
-            	  dr.addRaidPaths(info, filteredPaths);
-            	}
+              if (isRaidLocal){
+                doRaid(conf, info, filteredPaths);
+              }
+              else{
+                // We already checked that no job for this policy is running
+                // So we can start a new job.
+                DistRaid dr = new DistRaid(conf);
+                //add paths for distributed raiding
+                dr.addRaidPaths(info, filteredPaths);
+                boolean started = dr.startDistRaid();
+                if (started) {
+                  jobMonitor.monitorJob(info.getName(), dr);
+                }
+              }
             } catch (Exception e) {
               LOG.info("Exception while invoking action on policy " + info.getName() +
                        " srcPath " + info.getSrcPath() + 
                        " exception " + StringUtils.stringifyException(e));
               continue;
             }
-
-            // add these paths to processed paths
-            for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
-              String p = iter.next().getPath().toString() + "/";
-              processed.add(p);
-            }
           }
         }
-        processed.clear(); // free up memory references before yielding
-
-        //do the distributed raiding
-        if (!isRaidLocal) {
-          dr.doDistRaid();
-        } 
       }
     }
   }
 
-  /**
-   * Returns the policy that matches the specified path.
-   * The method below finds the first policy that matches an input path. Since different 
-   * policies with different purposes and destinations might be associated with the same input
-   * path, we should be skeptical about the places using the method and we should try to change
-   * the code to avoid it.
-   */
-  private PolicyInfo findMatchingPolicy(Path inpath) throws IOException {
-    PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
-    Collection<PolicyList> all = configMgr.getAllPolicies();
-        
-    // sort all policies by reverse lexicographical order. This is needed
-    // to make the nearest policy take precedence.
-    PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
-    Arrays.sort(sorted, lexi);
-
-    // loop through all categories of policies.
-    for (PolicyList category : sorted) {
-      PolicyInfo first = category.getAll().iterator().next();
-      if (first != null) {
-        Path[] srcPaths = first.getSrcPathExpanded(); // input src paths unglobbed
-        if (srcPaths == null) {
-          continue;
-        }
-
-        for (Path src: srcPaths) {
-          if (inpath.toString().startsWith(src.toString())) {
-            // if the srcpath is a prefix of the specified path
-            // we have a match! 
-            return first;
-          }
-        }
-      }
-    }
-    return null; // no matching policies
-  }
-
-  
   static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) {
     return new Path(destPathPrefix, makeRelative(srcPath));
   }
   
-  private static class ParityFilePair {
+  static class ParityFilePair {
     private Path path;
     private FileSystem fs;
     
@@ -506,11 +545,19 @@ public class RaidNode implements RaidPro
    * @return Path object representing the parity file of the source
    * @throws IOException
    */
-  static private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
+  static ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
     Path srcParent = srcPath.getParent();
 
     FileSystem fsDest = destPathPrefix.getFileSystem(conf);
-
+    FileSystem fsSrc = srcPath.getFileSystem(conf);
+    
+    FileStatus srcStatus = null;
+    try {
+      srcStatus = fsSrc.getFileStatus(srcPath);
+    } catch (java.io.FileNotFoundException e) {
+      return null;
+    }
+    
     Path outDir = destPathPrefix;
     if (srcParent != null) {
       if (srcParent.getParent() == null) {
@@ -520,36 +567,36 @@ public class RaidNode implements RaidPro
       }
     }
 
+    
+    //CASE 1: CHECK HAR - Must be checked first because the har is created
+    // after the parity file, and returning the parity file could result in
+    // an error while reading it.
+    Path outPath =  getOriginalParityFile(destPathPrefix, srcPath);
     String harDirName = srcParent.getName() + HAR_SUFFIX; 
     Path HarPath = new Path(outDir,harDirName);
-    Path outPath =  getOriginalParityFile(destPathPrefix, srcPath);
-
-    if (!fsDest.exists(HarPath)) {  // case 1: no HAR file
-      return new ParityFilePair(outPath,fsDest);
-    }
-
-    URI HarPathUri = HarPath.toUri();
-    Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
-    FileSystem fsHar = new HarFileSystem(fsDest);
-    fsHar.initialize(inHarPath.toUri(), conf);
-
-    if (!fsHar.exists(inHarPath)) { // case 2: no file inside HAR
-      return new ParityFilePair(outPath,fsDest);
-    }
-
-    if (! fsDest.exists(outPath)) { // case 3: only inside HAR
-      return new ParityFilePair(inHarPath,fsHar);
+    if (fsDest.exists(HarPath)) {  
+      URI HarPathUri = HarPath.toUri();
+      Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
+      FileSystem fsHar = new HarFileSystem(fsDest);
+      fsHar.initialize(inHarPath.toUri(), conf);
+      if (fsHar.exists(inHarPath)) {
+        FileStatus inHar = fsHar.getFileStatus(inHarPath);
+        if (inHar.getModificationTime() == srcStatus.getModificationTime()) {
+          return new ParityFilePair(inHarPath,fsHar);
+        }
+      }
     }
-
-    // both inside and outside HAR. Should return most recent
-    FileStatus inHar = fsHar.getFileStatus(inHarPath);
-    FileStatus outHar = fsDest.getFileStatus(outPath);
-
-    if (inHar.getModificationTime() >= outHar.getModificationTime()) {
-      return new ParityFilePair(inHarPath,fsHar);
+    
+    //CASE 2: CHECK PARITY
+    try {
+      FileStatus outHar = fsDest.getFileStatus(outPath);
+      if (outHar.getModificationTime() == srcStatus.getModificationTime()) {
+        return new ParityFilePair(outPath,fsDest);
+      }
+    } catch (java.io.FileNotFoundException e) {
     }
 
-    return new ParityFilePair(outPath,fsDest);
+    return null; // NULL if no parity file
   }
   
   private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException {
@@ -558,108 +605,6 @@ public class RaidNode implements RaidPro
 	  
   }
   
- /**
-  * Returns a list of pathnames that needs raiding.
-  */
-  List<FileStatus> selectFiles(Configuration conf, Path p, String destPrefix,
-                                       long modTimePeriod, short srcReplication, long now) throws IOException {
-
-    List<FileStatus> returnSet = new LinkedList<FileStatus>();
-
-    // expand destination prefix path
-    Path destp = new Path(destPrefix.trim());
-    FileSystem fs = FileSystem.get(destp.toUri(), conf);
-    destp = destp.makeQualified(fs);
-
-    // Expand destination prefix path.
-    String destpstr = destp.toString();
-    if (!destpstr.endsWith(Path.SEPARATOR)) {
-      destpstr += Path.SEPARATOR;
-    }
-
-    fs = p.getFileSystem(conf);
-    FileStatus[] gpaths = fs.globStatus(p);
-    if (gpaths != null) {
-      for (FileStatus onepath: gpaths) {
-        String pathstr = onepath.getPath().makeQualified(fs).toString();
-        if (!pathstr.endsWith(Path.SEPARATOR)) {
-          pathstr += Path.SEPARATOR;
-        }
-        if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) {
-          LOG.info("Skipping source " + pathstr +
-                   " because it conflicts with raid directory " + destpstr);
-        } else {
-         recurse(fs, conf, destp, onepath, returnSet, modTimePeriod, srcReplication, now);
-        }
-      }
-    }
-    return returnSet;
-  }
-
-  /**
-   * Pick files that need to be RAIDed.
-   */
-  private void recurse(FileSystem srcFs,
-                       Configuration conf,
-                       Path destPathPrefix,
-                       FileStatus src,
-                       List<FileStatus> accept,
-                       long modTimePeriod, 
-                       short srcReplication, 
-                       long now) throws IOException {
-    Path path = src.getPath();
-    FileStatus[] files = null;
-    try {
-      files = srcFs.listStatus(path);
-    } catch (java.io.FileNotFoundException e) {
-      // ignore error because the file could have been deleted by an user
-      LOG.info("FileNotFound " + path + " " + StringUtils.stringifyException(e));
-    } catch (IOException e) {
-      throw e;
-    }
-
-    // If the modTime of the raid file is later than the modtime of the
-    // src file and the src file has not been modified
-    // recently, then that file is a candidate for RAID.
-
-    if (src.isFile()) {
-
-      // if the source file has fewer than or equal to 2 blocks, then no need to RAID
-      long blockSize = src.getBlockSize();
-      if (2 * blockSize >= src.getLen()) {
-        return;
-      }
-
-      // check if destination path already exists. If it does and it's modification time
-      // does not match the modTime of the source file, then recalculate RAID
-      boolean add = false;
-      try {
-        ParityFilePair ppair = getParityFile(destPathPrefix, path);
-        Path outpath =  ppair.getPath();
-        FileSystem outFs = ppair.getFileSystem();
-        FileStatus ostat = outFs.getFileStatus(outpath);
-        if (ostat.getModificationTime() != src.getModificationTime() &&
-            src.getModificationTime() + modTimePeriod < now) {
-          add = true;
-         }
-      } catch (java.io.FileNotFoundException e) {
-        add = true; // destination file does not exist
-      }
-
-      if (add) {
-        accept.add(src);
-      }
-      return;
-
-    } else if (files != null) {
-      for (FileStatus one:files) {
-        if (!one.getPath().getName().endsWith(HAR_SUFFIX)){
-          recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
-        }
-      }
-    }
-  }
-
 
   /**
    * RAID a list of files.
@@ -668,8 +613,8 @@ public class RaidNode implements RaidPro
       throws IOException {
     int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
     int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
-    int stripeLength = getStripeLength(conf, info);
-    String destPrefix = getDestinationPath(conf, info);
+    int stripeLength = getStripeLength(conf);
+    Path destPref = getDestinationPath(conf);
     String simulate = info.getProperty("simulate");
     boolean doSimulate = simulate == null ? false : Boolean
         .parseBoolean(simulate);
@@ -677,13 +622,9 @@ public class RaidNode implements RaidPro
     Statistics statistics = new Statistics();
     int count = 0;
 
-    Path p = new Path(destPrefix.trim());
-    FileSystem fs = FileSystem.get(p.toUri(), conf);
-    p = p.makeQualified(fs);
-
     for (FileStatus s : paths) {
-      doRaid(conf, s, p, statistics, null, doSimulate, targetRepl, metaRepl,
-          stripeLength);
+      doRaid(conf, s, destPref, statistics, Reporter.NULL, doSimulate, targetRepl,
+             metaRepl, stripeLength);
       if (count % 1000 == 0) {
         LOG.info("RAID statistics " + statistics.toString());
       }
@@ -701,23 +642,16 @@ public class RaidNode implements RaidPro
       FileStatus src, Statistics statistics, Reporter reporter) throws IOException {
     int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
     int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
-    int stripeLength = getStripeLength(conf, info);
-    String destPrefix = getDestinationPath(conf, info);
+    int stripeLength = getStripeLength(conf);
+    Path destPref = getDestinationPath(conf);
     String simulate = info.getProperty("simulate");
     boolean doSimulate = simulate == null ? false : Boolean
         .parseBoolean(simulate);
 
-    int count = 0;
-
-    Path p = new Path(destPrefix.trim());
-    FileSystem fs = FileSystem.get(p.toUri(), conf);
-    p = p.makeQualified(fs);
-
-    doRaid(conf, src, p, statistics, reporter, doSimulate, targetRepl, metaRepl,
-        stripeLength);
+    doRaid(conf, src, destPref, statistics, reporter, doSimulate,
+           targetRepl, metaRepl, stripeLength);
   }
-  
-  
+
   /**
    * RAID an individual file
    */
@@ -784,25 +718,11 @@ public class RaidNode implements RaidPro
                                   Path destPathPrefix, BlockLocation[] locations,
                                   int metaRepl, int stripeLength) throws IOException {
 
-    // two buffers for generating parity
-    Random rand = new Random();
-    int bufSize = 5 * 1024 * 1024; // 5 MB
-    byte[] bufs = new byte[bufSize];
-    byte[] xor = new byte[bufSize];
-
     Path inpath = stat.getPath();
-    long blockSize = stat.getBlockSize();
-    long fileSize = stat.getLen();
-
-    // create output tmp path
     Path outpath =  getOriginalParityFile(destPathPrefix, inpath);
     FileSystem outFs = outpath.getFileSystem(conf);
-   
-    Path tmppath =  new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + 
-                             outpath.toUri().getPath() + "." + 
-                             rand.nextLong() + ".tmp");
 
-    // if the parity file is already upto-date, then nothing to do
+    // If the parity file is already up-to-date, then nothing to do
     try {
       FileStatus stmp = outFs.getFileStatus(outpath);
       if (stmp.getModificationTime() == stat.getModificationTime()) {
@@ -812,66 +732,11 @@ public class RaidNode implements RaidPro
       }
     } catch (IOException e) {
       // ignore errors because the raid file might not exist yet.
-    } 
-
-    LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath);
-    FSDataOutputStream out = outFs.create(tmppath, 
-                                          true, 
-                                          conf.getInt("io.file.buffer.size", 64 * 1024), 
-                                          (short)metaRepl, 
-                                          blockSize);
-
-    try {
-
-      // loop once for every stripe length
-      for (int startBlock = 0; startBlock < locations.length;) {
-
-        // report progress to Map-reduce framework
-        if (reporter != null) {
-          reporter.progress();
-        }
-        int blocksLeft = locations.length - startBlock;
-        int stripe = Math.min(stripeLength, blocksLeft);
-        LOG.info(" startBlock " + startBlock + " stripe " + stripe);
-
-        // open a new file descriptor for each block in this stripe.
-        // make each fd point to the beginning of each block in this stripe.
-        FSDataInputStream[] ins = new FSDataInputStream[stripe];
-        for (int i = 0; i < stripe; i++) {
-          ins[i] = inFs.open(inpath, bufSize);
-          ins[i].seek(blockSize * (startBlock + i));
-        }
-
-        generateParity(ins,out,blockSize,bufs,xor, reporter);
-        
-        // close input file handles
-        for (int i = 0; i < ins.length; i++) {
-          ins[i].close();
-        }
-
-        // increment startBlock to point to the first block to be processed
-        // in the next iteration
-        startBlock += stripe;
-      }
-      out.close();
-      out = null;
-
-      // delete destination if exists
-      if (outFs.exists(outpath)){
-        outFs.delete(outpath, false);
-      }
-      // rename tmppath to the real parity filename
-      outFs.mkdirs(outpath.getParent());
-      if (!outFs.rename(tmppath, outpath)) {
-        String msg = "Unable to rename tmp file " + tmppath + " to " + outpath;
-        LOG.warn(msg);
-        throw new IOException (msg);
-      }
-    } finally {
-      // remove the tmp file if it still exists
-      outFs.delete(tmppath, false);  
     }
 
+    XOREncoder encoder = new XOREncoder(conf, stripeLength);
+    encoder.encodeFile(inFs, inpath, outpath, (short)metaRepl, reporter);
+
     // set the modification time of the RAID file. This is done so that the modTime of the
     // RAID file reflects the contents of the source file that it has RAIDed. This should
     // also work for files that are being appended to. This is necessary because the time on
@@ -880,255 +745,39 @@ public class RaidNode implements RaidPro
     outFs.setTimes(outpath, stat.getModificationTime(), -1);
 
     FileStatus outstat = outFs.getFileStatus(outpath);
-    LOG.info("Source file " + inpath + " of size " + fileSize +
+    FileStatus inStat = inFs.getFileStatus(inpath);
+    LOG.info("Source file " + inpath + " of size " + inStat.getLen() +
              " Parity file " + outpath + " of size " + outstat.getLen() +
              " src mtime " + stat.getModificationTime()  +
              " parity mtime " + outstat.getModificationTime());
   }
 
-  private static int readInputUntilEnd(FSDataInputStream ins, byte[] bufs, int toRead) 
-      throws IOException {
-
-    int tread = 0;
-    
-    while (tread < toRead) {
-      int read = ins.read(bufs, tread, toRead - tread);
-      if (read == -1) {
-        return tread;
-      } else {
-        tread += read;
-      }
-    }
-    
-    return tread;
-  }
-  
-  private static void generateParity(FSDataInputStream[] ins, FSDataOutputStream fout, 
-      long parityBlockSize, byte[] bufs, byte[] xor, Reporter reporter) throws IOException {
-    
-    int bufSize;
-    if ((bufs == null) || (bufs.length == 0)){
-      bufSize = 5 * 1024 * 1024; // 5 MB
-      bufs = new byte[bufSize];
-    } else {
-      bufSize = bufs.length;
-    }
-    if ((xor == null) || (xor.length != bufs.length)){
-      xor = new byte[bufSize];
-    }
-
-    int xorlen = 0;
-      
-    // this loop processes all good blocks in selected stripe
-    long remaining = parityBlockSize;
-    
-    while (remaining > 0) {
-      int toRead = (int)Math.min(remaining, bufSize);
-
-      if (ins.length > 0) {
-        xorlen = readInputUntilEnd(ins[0], xor, toRead);
-      }
-
-      // read all remaining blocks and xor them into the buffer
-      for (int i = 1; i < ins.length; i++) {
-
-        // report progress to Map-reduce framework
-        if (reporter != null) {
-          reporter.progress();
-        }
-        
-        int actualRead = readInputUntilEnd(ins[i], bufs, toRead);
-        
-        int j;
-        int xorlimit = (int) Math.min(xorlen,actualRead);
-        for (j = 0; j < xorlimit; j++) {
-          xor[j] ^= bufs[j];
-        }
-        if ( actualRead > xorlen ){
-          for (; j < actualRead; j++) {
-            xor[j] = bufs[j];
-          }
-          xorlen = actualRead;
-        }
-        
-      }
-
-      if (xorlen < toRead) {
-        Arrays.fill(bufs, xorlen, toRead, (byte) 0);
-      }
-      
-      // write this to the tmp file
-      fout.write(xor, 0, toRead);
-      remaining -= toRead;
-    }
-  
-  }
-  
   /**
-   * Extract a good block from the parity block. This assumes that the corruption
-   * is in the main file and the parity file is always good.
+   * Extract a good block from the parity block. This assumes that the
+   * corruption is in the main file and the parity file is always good.
    */
-  public static Path unRaid(Configuration conf, Path srcPath, Path destPathPrefix, 
-                            int stripeLength, long corruptOffset) throws IOException {
-
-    // extract block locations, size etc from source file
-    Random rand = new Random();
-    FileSystem srcFs = srcPath.getFileSystem(conf);
-    FileStatus srcStat = srcFs.getFileStatus(srcPath);
-    long blockSize = srcStat.getBlockSize();
-    long fileSize = srcStat.getLen();
-
-    // find the stripe number where the corrupted offset lies
-    long snum = corruptOffset / (stripeLength * blockSize);
-    long startOffset = snum * stripeLength * blockSize;
-    long corruptBlockInStripe = (corruptOffset - startOffset)/blockSize;
-    long corruptBlockSize = Math.min(blockSize, fileSize - startOffset);
-
-    LOG.info("Start offset of relevent stripe = " + startOffset +
-             " corruptBlockInStripe " + corruptBlockInStripe);
-
-    // open file descriptors to read all good blocks of the file
-    FSDataInputStream[] instmp = new FSDataInputStream[stripeLength];
-    int  numLength = 0;
-    for (int i = 0; i < stripeLength; i++) {
-      if (i == corruptBlockInStripe) {
-        continue;  // do not open corrupt block
-      }
-      if (startOffset + i * blockSize >= fileSize) {
-        LOG.info("Stop offset of relevent stripe = " + 
-                  startOffset + i * blockSize);
-        break;
-      }
-      instmp[numLength] = srcFs.open(srcPath);
-      instmp[numLength].seek(startOffset + i * blockSize);
-      numLength++;
-    }
-
-    // create array of inputstream, allocate one extra slot for 
-    // parity file. numLength could be smaller than stripeLength
-    // if we are processing the last partial stripe on a file.
-    numLength += 1;
-    FSDataInputStream[] ins = new FSDataInputStream[numLength];
-    for (int i = 0; i < numLength-1; i++) {
-      ins[i] = instmp[i];
-    }
-    LOG.info("Decompose a total of " + numLength + " blocks.");
-
-    // open and seek to the appropriate offset in parity file.
-    ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf); 
-    Path parityFile = ppair.getPath();
-    FileSystem parityFs = ppair.getFileSystem();
-    LOG.info("Parity file for " + srcPath + " is " + parityFile);
-    ins[numLength-1] = parityFs.open(parityFile);
-    ins[numLength-1].seek(snum * blockSize);
-    LOG.info("Parity file " + parityFile +
-             " seeking to relevent block at offset " + 
-             ins[numLength-1].getPos());
-
-    // create a temporary filename in the source filesystem
-    // do not overwrite an existing tmp file. Make it fail for now.
-    // We need to generate a unique name for this tmp file later on.
-    Path tmpFile = null;
-    FSDataOutputStream fout = null;
-    FileSystem destFs = destPathPrefix.getFileSystem(conf);
-    int retry = 5;
-    try {
-      tmpFile = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + "/" + 
-          rand.nextInt());
-      fout = destFs.create(tmpFile, false);
-    } catch (IOException e) {
-      if (retry-- <= 0) {
-        LOG.info("Unable to create temporary file " + tmpFile +
-                 " Aborting....");
-        throw e; 
-      }
-      LOG.info("Unable to create temporary file " + tmpFile +
-               "Retrying....");
-    }
-    LOG.info("Created recovered block file " + tmpFile);
-
-    // buffers for generating parity bits
-    int bufSize = 5 * 1024 * 1024; // 5 MB
-    byte[] bufs = new byte[bufSize];
-    byte[] xor = new byte[bufSize];
-   
-    generateParity(ins,fout,corruptBlockSize,bufs,xor,null);
-    
-    // close all files
-    fout.close();
-    for (int i = 0; i < ins.length; i++) {
-      ins[i].close();
+  public static Path unRaid(Configuration conf, Path srcPath,
+      Path destPathPrefix, Decoder decoder, int stripeLength,
+      long corruptOffset) throws IOException {
+
+    // Test if parity file exists
+    ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
+    if (ppair == null) {
+      return null;
     }
 
-    // Now, reopen the source file and the recovered block file
-    // and copy all relevant data to new file
     final Path recoveryDestination = 
       new Path(conf.get("fs.raid.tmpdir", "/tmp/raid"));
+    FileSystem destFs = recoveryDestination.getFileSystem(conf);
     final Path recoveredPrefix = 
       destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
     final Path recoveredPath = 
-      new Path(recoveredPrefix + "." + rand.nextLong() + ".recovered");
+      new Path(recoveredPrefix + "." + new Random().nextLong() + ".recovered");
     LOG.info("Creating recovered file " + recoveredPath);
 
-    FSDataInputStream sin = srcFs.open(srcPath);
-    FSDataOutputStream out = destFs.create(recoveredPath, false, 
-                                             conf.getInt("io.file.buffer.size", 64 * 1024),
-                                             srcStat.getReplication(), 
-                                             srcStat.getBlockSize());
-
-    FSDataInputStream bin = destFs.open(tmpFile);
-    long recoveredSize = 0;
-
-    // copy all the good blocks (upto the corruption)
-    // from source file to output file
-    long remaining = corruptOffset / blockSize * blockSize;
-    while (remaining > 0) {
-      int toRead = (int)Math.min(remaining, bufSize);
-      sin.readFully(bufs, 0, toRead);
-      out.write(bufs, 0, toRead);
-      remaining -= toRead;
-      recoveredSize += toRead;
-    }
-    LOG.info("Copied upto " + recoveredSize + " from src file. ");
-
-    // copy recovered block to output file
-    remaining = corruptBlockSize;
-    while (recoveredSize < fileSize &&
-           remaining > 0) {
-      int toRead = (int)Math.min(remaining, bufSize);
-      bin.readFully(bufs, 0, toRead);
-      out.write(bufs, 0, toRead);
-      remaining -= toRead;
-      recoveredSize += toRead;
-    }
-    LOG.info("Copied upto " + recoveredSize + " from recovered-block file. ");
-
-    // skip bad block in src file
-    if (recoveredSize < fileSize) {
-      sin.seek(sin.getPos() + corruptBlockSize); 
-    }
-
-    // copy remaining good data from src file to output file
-    while (recoveredSize < fileSize) {
-      int toRead = (int)Math.min(fileSize - recoveredSize, bufSize);
-      sin.readFully(bufs, 0, toRead);
-      out.write(bufs, 0, toRead);
-      recoveredSize += toRead;
-    }
-    out.close();
-    LOG.info("Completed writing " + recoveredSize + " bytes into " +
-             recoveredPath);
-              
-    sin.close();
-    bin.close();
-
-    // delete the temporary block file that was created.
-    destFs.delete(tmpFile, false);
-    LOG.info("Deleted temporary file " + tmpFile);
-
-    // copy the meta information from source path to the newly created
-    // recovered path
-    copyMetaInformation(destFs, srcStat, recoveredPath);
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    decoder.decodeFile(srcFs, srcPath, ppair.getFileSystem(),
+        ppair.getPath(), corruptOffset, recoveredPath);
 
     return recoveredPath;
   }
@@ -1179,35 +828,22 @@ public class RaidNode implements RaidPro
         PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
         Arrays.sort(sorted, lexi);
 
-        // paths we have processed so far
-        Set<Path> processed = new HashSet<Path>();
-        
         for (PolicyList category : sorted) {
           for (PolicyInfo info: category.getAll()) {
 
             try {
               // expand destination prefix path
-              String destinationPrefix = getDestinationPath(conf, info);
-              Path destPref = new Path(destinationPrefix.trim());
-              FileSystem destFs = FileSystem.get(destPref.toUri(), conf);
-              destPref = destFs.makeQualified(destPref);
+              Path destPref = getDestinationPath(conf);
+              FileSystem destFs = destPref.getFileSystem(conf);
 
               //get srcPaths
               Path[] srcPaths = info.getSrcPathExpanded();
               
-              if ( srcPaths != null ){
+              if (srcPaths != null) {
                 for (Path srcPath: srcPaths) {
                   // expand destination prefix
                   Path destPath = getOriginalParityFile(destPref, srcPath);
 
-                  // if this destination path has already been processed as part
-                  // of another policy, then nothing more to do
-                  if (processed.contains(destPath)) {
-                    LOG.info("Obsolete parity files for policy " + 
-                            info.getName() + " has already been procesed.");
-                    continue;
-                  }
-
                   FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
                   FileStatus stat = null;
                   try {
@@ -1221,12 +857,8 @@ public class RaidNode implements RaidPro
                     recursePurge(srcFs, destFs, destPref.toUri().getPath(), stat);
                   }
 
-                  // this destination path has already been processed
-                  processed.add(destPath);
-
                 }
               }
-
             } catch (Exception e) {
               LOG.warn("Ignoring Exception while processing policy " + 
                        info.getName() + " " + 
@@ -1342,10 +974,8 @@ public class RaidNode implements RaidPro
             try {
               long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
 
-              String destinationPrefix = getDestinationPath(conf, info);
-              Path destPref = new Path(destinationPrefix.trim());
+              Path destPref = getDestinationPath(conf);
               FileSystem destFs = destPref.getFileSystem(conf); 
-              destPref = destFs.makeQualified(destPref);
 
               //get srcPaths
               Path[] srcPaths = info.getSrcPathExpanded();
@@ -1407,7 +1037,11 @@ public class RaidNode implements RaidPro
           recurseHar(info, destFs, one, destPrefix, srcFs, cutoff, tmpHarPath);
           shouldHar = false;
         } else if (one.getModificationTime() > cutoff ) {
-          shouldHar = false;
+          if (shouldHar) {
+            LOG.info("Cannot archive " + destPath + 
+                   " because " + one.getPath() + " was modified after cutoff");
+            shouldHar = false;
+          }
         }
       }
 
@@ -1433,6 +1067,7 @@ public class RaidNode implements RaidPro
     }
 
     if ( shouldHar ) {
+      LOG.info("Archiving " + dest.getPath() + " to " + tmpHarPath );
       singleHar(destFs, dest, tmpHarPath);
     }
   } 
@@ -1493,56 +1128,25 @@ public class RaidNode implements RaidPro
       LOG.info("Leaving Har thread.");
     }
     
-
-  }  
-  
-  /**
-   * If the config file has an entry for hdfs.raid.locations, then that overrides
-   * destination path specified in the raid policy file
-   */
-  static private String getDestinationPath(Configuration conf, PolicyInfo info) {
-    String locs = conf.get("hdfs.raid.locations"); 
-    if (locs != null) {
-      return locs;
-    }
-    locs = info.getDestinationPath();
-    if (locs == null) {
-      return DEFAULT_RAID_LOCATION;
-    }
-    return locs;
   }
 
   /**
-   * If the config file has an entry for hdfs.raid.stripeLength, then use that
-   * specified in the raid policy file
+   * Return the path prefix that stores the parity files
    */
-  static private int getStripeLength(Configuration conf, PolicyInfo info)
-    throws IOException {
-    int len = conf.getInt("hdfs.raid.stripeLength", 0); 
-    if (len != 0) {
-      return len;
-    }
-    String str = info.getProperty("stripeLength");
-    if (str == null) {
-      String msg = "hdfs.raid.stripeLength is not defined." +
-                   " Using a default " + DEFAULT_STRIPE_LENGTH;
-      LOG.info(msg);
-      return DEFAULT_STRIPE_LENGTH;
-    }
-    return Integer.parseInt(str);
+  static Path getDestinationPath(Configuration conf)
+      throws IOException {
+    String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
+    Path p = new Path(loc.trim());
+    FileSystem fs = FileSystem.get(p.toUri(), conf);
+    p = p.makeQualified(fs);
+    return p;
   }
 
   /**
-   * Copy the file owner, modtime, etc from srcPath to the recovered Path.
-   * It is possiible that we might have to retrieve file persmissions,
-   * quotas, etc too in future.
+   * Obtain stripe length from configuration
    */
-  static private void copyMetaInformation(FileSystem fs, FileStatus stat, 
-                                          Path recoveredPath) 
-    throws IOException {
-    fs.setOwner(recoveredPath, stat.getOwner(), stat.getGroup());
-    fs.setPermission(recoveredPath, stat.getPermission());
-    fs.setTimes(recoveredPath, stat.getModificationTime(), stat.getAccessTime());
+  public static int getStripeLength(Configuration conf) {
+    return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH);
   }
 
   /**
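
The RaidNode changes above replace the per-policy stripeLength and destination lookups with the cluster-wide keys hdfs.raid.stripeLength (STRIPE_LENGTH_KEY) and hdfs.raid.locations (RAID_LOCATION_KEY), read through the new static helpers getStripeLength(conf) and getDestinationPath(conf). A minimal sketch of exercising those helpers follows; the class is illustrative only (not part of this commit), it lives in org.apache.hadoop.raid because getDestinationPath is package-private, and the key values shown are example settings.

package org.apache.hadoop.raid;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Illustrative sketch, not part of this commit.
public class RaidConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Example values; unset keys fall back to DEFAULT_STRIPE_LENGTH (5)
    // and DEFAULT_RAID_LOCATION ("/raid").
    conf.setInt(RaidNode.STRIPE_LENGTH_KEY, 10);        // "hdfs.raid.stripeLength"
    conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");  // "hdfs.raid.locations"

    int stripeLength = RaidNode.getStripeLength(conf);
    Path destPrefix = RaidNode.getDestinationPath(conf); // qualified against its FileSystem

    System.out.println("stripeLength=" + stripeLength +
                       ", parity destination=" + destPrefix);
  }
}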

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.IOUtils;
+
+public class RaidUtils {
+  public static void readTillEnd(InputStream in, byte[] buf, boolean eofOK)
+    throws IOException {
+    int toRead = buf.length;
+    int numRead = 0;
+    while (numRead < toRead) {
+      int nread = in.read(buf, numRead, toRead - numRead);
+      if (nread < 0) {
+        if (eofOK) {
+          // EOF hit, fill with zeros
+          Arrays.fill(buf, numRead, toRead, (byte)0);
+          numRead = toRead;
+        } else {
+          // EOF hit, throw.
+          throw new IOException("Premature EOF");
+        }
+      } else {
+        numRead += nread;
+      }
+    }
+  }
+
+  public static void copyBytes(
+    InputStream in, OutputStream out, byte[] buf, long count)
+    throws IOException {
+    for (long bytesRead = 0; bytesRead < count; ) {
+      int toRead = Math.min(buf.length, (int)(count - bytesRead));
+      IOUtils.readFully(in, buf, 0, toRead);
+      bytesRead += toRead;
+      out.write(buf, 0, toRead);
+    }
+  }
+
+  public static class ZeroInputStream extends InputStream
+	    implements Seekable, PositionedReadable {
+    private long endOffset;
+    private long pos;
+
+    public ZeroInputStream(long endOffset) {
+      this.endOffset = endOffset;
+      this.pos = 0;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (pos < endOffset) {
+        pos++;
+        return 0;
+      }
+      return -1;
+    }
+
+    @Override
+    public int available() throws IOException {
+      return (int)(endOffset - pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public void seek(long seekOffset) throws IOException {
+      if (seekOffset < endOffset) {
+        pos = seekOffset;
+      } else {
+        throw new IOException("Illegal Offset" + pos);
+      }
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      int count = 0;
+      for (; position < endOffset && count < length; position++) {
+        buffer[offset + count] = 0;
+        count++;
+      }
+      return count;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      int count = 0;
+      for (; position < endOffset && count < length; position++) {
+        buffer[offset + count] = 0;
+        count++;
+      }
+      if (count < length) {
+        throw new IOException("Premature EOF");
+      }
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      readFully(position, buffer, 0, buffer.length);
+    }
+  }
+}
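
The helpers above are small but load-bearing for the encoder and decoder classes that follow: readTillEnd fills a buffer and, when eofOK is true, zero-pads past EOF; copyBytes moves an exact byte count between streams through a scratch buffer; and ZeroInputStream supplies a seekable stream of zeros that can stand in for missing data. A hedged usage sketch follows (the class name, buffer sizes and lengths are arbitrary example values, not part of this commit):

import java.io.ByteArrayOutputStream;

import org.apache.hadoop.raid.RaidUtils;

// Illustrative sketch, not part of this commit.
public class RaidUtilsExample {
  public static void main(String[] args) throws Exception {
    // A seekable stream that yields 16 zero bytes, then EOF.
    RaidUtils.ZeroInputStream zeros = new RaidUtils.ZeroInputStream(16);

    // Fill a 32-byte buffer; eofOK=true zero-pads the remainder instead of
    // throwing "Premature EOF".
    byte[] buf = new byte[32];
    RaidUtils.readTillEnd(zeros, buf, true);

    // Copy exactly 8 bytes between streams through a 4-byte scratch buffer.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    RaidUtils.copyBytes(new RaidUtils.ZeroInputStream(8), out, new byte[4], 8);
    System.out.println("copied " + out.size() + " bytes");  // prints: copied 8 bytes
  }
}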

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+public class XORDecoder extends Decoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.XORDecoder");
+
+  public XORDecoder(
+    Configuration conf, int stripeSize) {
+    super(conf, stripeSize, 1);
+  }
+
+  @Override
+  protected void fixErasedBlock(
+      FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+      long blockSize, long errorOffset, long bytesToSkip, long limit,
+      OutputStream out) throws IOException {
+    LOG.info("Fixing block at " + srcFile + ":" + errorOffset +
+             ", skipping " + bytesToSkip + ", limit " + limit);
+    FileStatus srcStat = fs.getFileStatus(srcFile);
+    ArrayList<FSDataInputStream> xorinputs = new ArrayList<FSDataInputStream>();
+
+    FSDataInputStream parityFileIn = parityFs.open(parityFile);
+    parityFileIn.seek(parityOffset(errorOffset, blockSize));
+    xorinputs.add(parityFileIn);
+
+    long errorBlockOffset = (errorOffset / blockSize) * blockSize;
+    long[] srcOffsets = stripeOffsets(errorOffset, blockSize);
+    for (int i = 0; i < srcOffsets.length; i++) {
+      if (srcOffsets[i] == errorBlockOffset) {
+        LOG.info("Skipping block at " + srcFile + ":" + errorBlockOffset);
+        continue;
+      }
+      if (srcOffsets[i] < srcStat.getLen()) {
+        FSDataInputStream in = fs.open(srcFile);
+        in.seek(srcOffsets[i]);
+        xorinputs.add(in);
+      }
+    }
+    FSDataInputStream[] inputs = xorinputs.toArray(
+                                    new FSDataInputStream[]{null});
+    ParityInputStream recovered =
+      new ParityInputStream(inputs, limit, readBufs[0], writeBufs[0]);
+    recovered.skip(bytesToSkip);
+    recovered.drain(out, null);
+  }
+
+  protected long[] stripeOffsets(long errorOffset, long blockSize) {
+    long[] offsets = new long[stripeSize];
+    long stripeIdx = errorOffset / (blockSize * stripeSize);
+    long startOffsetOfStripe = stripeIdx * stripeSize * blockSize;
+    for (int i = 0; i < stripeSize; i++) {
+      offsets[i] = startOffsetOfStripe + i * blockSize;
+    }
+    return offsets;
+  }
+
+  protected long parityOffset(long errorOffset, long blockSize) {
+    long stripeIdx = errorOffset / (blockSize * stripeSize);
+    return stripeIdx * blockSize;
+  }
+
+}
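
The offset arithmetic in stripeOffsets and parityOffset decides which source blocks and which parity block take part in a reconstruction. The sketch below restates that arithmetic with concrete example numbers (8 KB blocks, a stripe of 3, corruption at byte 30000); the class is illustrative only, not part of this commit.

// Illustrative sketch, not part of this commit.
public class XorOffsetExample {
  public static void main(String[] args) {
    long blockSize = 8192;     // example block size
    int stripeSize = 3;        // example stripe length
    long errorOffset = 30000;  // example corrupt byte offset in the source file

    long stripeIdx = errorOffset / (blockSize * stripeSize);  // 30000 / 24576 = 1
    long stripeStart = stripeIdx * stripeSize * blockSize;    // 24576
    long parityOffset = stripeIdx * blockSize;                // 8192

    // The damaged stripe's source blocks start at 24576, 32768 and 40960;
    // the decoder opens every one except the block containing errorOffset,
    // plus the parity block at offset 8192 of the parity file.
    for (int i = 0; i < stripeSize; i++) {
      System.out.println("source block offset: " + (stripeStart + i * blockSize));
    }
    System.out.println("parity block offset: " + parityOffset);
  }
}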

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+public class XOREncoder extends Encoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.XOREncoder");
+  public XOREncoder(
+    Configuration conf, int stripeSize) {
+    super(conf, stripeSize, 1);
+  }
+
+  @Override
+  protected void encodeStripe(
+    InputStream[] blocks,
+    long stripeStartOffset,
+    long blockSize,
+    OutputStream[] outs,
+    Progressable reporter) throws IOException {
+    LOG.info("Performing XOR ");
+    ParityInputStream parityIn =
+      new ParityInputStream(blocks, blockSize, readBufs[0], writeBufs[0]);
+    try {
+      parityIn.drain(outs[0], reporter);
+    } finally {
+      parityIn.close();
+    }
+  }
+}
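
Taken together with the RaidNode changes above, parity generation now goes through XOREncoder.encodeFile (called from doRaid) and block reconstruction goes through an XORDecoder handed to RaidNode.unRaid. The sketch below condenses that wiring; it is not part of this commit, it sits in org.apache.hadoop.raid, the paths and replication value are caller-supplied placeholders, and encodeFile's signature is assumed from its call site in the diff above.

package org.apache.hadoop.raid;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch, not part of this commit.
public class XorCoderWiring {
  /** Encode a parity file for src, then recover data around a corrupt offset. */
  static Path raidAndRecover(Configuration conf, FileSystem fs, Path src,
      Path parity, Path destPrefix, short metaRepl, long corruptOffset)
      throws Exception {
    int stripeLength = RaidNode.getStripeLength(conf);

    // Parity generation, mirroring the doRaid changes above.
    XOREncoder encoder = new XOREncoder(conf, stripeLength);
    encoder.encodeFile(fs, src, parity, metaRepl, Reporter.NULL);

    // Reconstruction around corruptOffset, mirroring the recovery path above.
    Decoder decoder = new XORDecoder(conf, stripeLength);
    return RaidNode.unRaid(conf, src, destPrefix, decoder,
                           stripeLength, corruptOffset);
  }
}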

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java Tue Oct 12 18:23:36 2010
@@ -47,11 +47,14 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
 import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
 
 public class TestRaidDfs extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -59,8 +62,7 @@ public class TestRaidDfs extends TestCas
   final static String CONFIG_FILE = new File(TEST_DIR, 
       "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
-  final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidNode");
-  final Random rand = new Random();
+  final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs");
   final static int NUM_DATANODES = 3;
 
   Configuration conf;
@@ -83,6 +85,9 @@ public class TestRaidDfs extends TestCas
     // scan all policies once every 5 second
     conf.setLong("raid.policy.rescan.interval", 5000);
 
+    // make all deletions not go through Trash
+    conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
     // do not use map-reduce cluster for Raiding
     conf.setBoolean("fs.raidnode.local", true);
     conf.set("raid.server.address", "localhost:0");
@@ -133,80 +138,148 @@ public class TestRaidDfs extends TestCas
     if (cnode != null) { cnode.stop(); cnode.join(); }
     if (dfs != null) { dfs.shutdown(); }
   }
+  
+  private LocatedBlocks getBlockLocations(Path file, long length)
+    throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+    return dfs.getClient().namenode.getBlockLocations(file.toString(), 0, length);
+  }
 
-  /**
-   * Test DFS Raid
-   */
-  public void testRaidDfs() throws Exception {
-    LOG.info("Test testRaidDfs started.");
-    long blockSize = 8192L;
-    int stripeLength = 3;
-    mySetup();
-    Path file1 = new Path("/user/dhruba/raidtest/file1");
-    Path destPath = new Path("/destraid/user/dhruba/raidtest");
-    long crc1 = createOldFile(fileSys, file1, 1, 7, blockSize);
-    LOG.info("Test testPathFilter created test files");
+  private LocatedBlocks getBlockLocations(Path file)
+    throws IOException {
+    FileStatus stat = fileSys.getFileStatus(file);
+    return getBlockLocations(file, stat.getLen());
+  }
 
-    // create an instance of the RaidNode
-    cnode = RaidNode.createRaidNode(null, conf);
-    
-    try {
-      FileStatus[] listPaths = null;
+  private DistributedRaidFileSystem getRaidFS() throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+    Configuration clientConf = new Configuration(conf);
+    clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
+    clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+    clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+    URI dfsUri = dfs.getUri();
+    return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
+  }
 
-      // wait till file is raided
-      while (listPaths == null || listPaths.length != 1) {
-        LOG.info("Test testPathFilter waiting for files to be raided.");
-        try {
-          listPaths = fileSys.listStatus(destPath);
-        } catch (FileNotFoundException e) {
-          //ignore
+  public static void waitForFileRaided(
+    Log logger, FileSystem fileSys, Path file, Path destPath)
+  throws IOException, InterruptedException {
+    FileStatus parityStat = null;
+    String fileName = file.getName().toString();
+    // wait till file is raided
+    while (parityStat == null) {
+      logger.info("Waiting for files to be raided.");
+      try {
+        FileStatus[] listPaths = fileSys.listStatus(destPath);
+        if (listPaths != null) {
+          for (FileStatus f : listPaths) {
+            logger.info("File raided so far : " + f.getPath());
+            String found = f.getPath().getName().toString();
+            if (fileName.equals(found)) {
+              parityStat = f;
+              break;
+            }
+          }
         }
-        Thread.sleep(1000);                  // keep waiting
+      } catch (FileNotFoundException e) {
+        //ignore
       }
-      assertEquals(listPaths.length, 1); // all files raided
-      LOG.info("Files raided so far : " + listPaths[0].getPath());
+      Thread.sleep(1000);                  // keep waiting
+    }
 
-      // extract block locations from File system. Wait till file is closed.
+    while (true) {
       LocatedBlocks locations = null;
       DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
-      while (true) {
-        locations = dfs.getClient().getNamenode().getBlockLocations(file1.toString(),
-                                                               0, listPaths[0].getLen());
-        if (!locations.isUnderConstruction()) {
-          break;
-        }
-        Thread.sleep(1000);
+      locations = dfs.getClient().namenode.getBlockLocations(
+                    file.toString(), 0, parityStat.getLen());
+      if (!locations.isUnderConstruction()) {
+        break;
       }
+      Thread.sleep(1000);
+    }
 
-      // filter all filesystem calls from client
-      Configuration clientConf = new Configuration(conf);
-      clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
-      clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-      URI dfsUri = dfs.getUri();
-      FileSystem.closeAll();
-      FileSystem raidfs = FileSystem.get(dfsUri, clientConf);
-      
-      assertTrue("raidfs not an instance of DistributedRaidFileSystem",raidfs instanceof DistributedRaidFileSystem);
-      
-      LOG.info("Corrupt first block of file");
-      corruptBlock(file1, locations.get(0).getBlock(), NUM_DATANODES, false);
-      validateFile(raidfs, file1, file1, crc1);
+    while (true) {
+      FileStatus stat = fileSys.getFileStatus(file);
+      if (stat.getReplication() == 1) break;
+      Thread.sleep(1000);
+    }
+  }
+
+  private void corruptBlockAndValidate(Path srcFile, Path destPath,
+    int[] listBlockNumToCorrupt, long blockSize, int numBlocks)
+  throws IOException, InterruptedException {
+    int repl = 1;
+    long crc = createTestFilePartialLastBlock(fileSys, srcFile, repl,
+                  numBlocks, blockSize);
+    long length = fileSys.getFileStatus(srcFile).getLen();
+
+    waitForFileRaided(LOG, fileSys, srcFile, destPath);
+
+    // Delete the blocks specified in listBlockNumToCorrupt
+    for (int blockNumToCorrupt : listBlockNumToCorrupt) {
+      LOG.info("Corrupt block " + blockNumToCorrupt + " of file " + srcFile);
+      LocatedBlocks locations = getBlockLocations(srcFile);
+      corruptBlock(srcFile, locations.get(blockNumToCorrupt).getBlock(),
+            NUM_DATANODES, true);
+    }
+
+    // Validate
+    DistributedRaidFileSystem raidfs = getRaidFS();
+    assertTrue(validateFile(raidfs, srcFile, length, crc));
+  }
+
+  /**
+   * Create a file, corrupt a block in it and ensure that the file can be
+   * read through DistributedRaidFileSystem.
+   */
+  public void testRaidDfs() throws Exception {
+    LOG.info("Test testRaidDfs started.");
+
+    long blockSize = 8192L;
+    int numBlocks = 8;
+    int repl = 1;
+    mySetup();
+
+    // Create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    cnode = RaidNode.createRaidNode(null, localConf);
+
+    Path file = new Path("/user/dhruba/raidtest/file");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    int[][] corrupt = {{0}, {4}, {7}}; // first, middle and last block
+    try {
+      long crc = createTestFilePartialLastBlock(fileSys, file, repl,
+                  numBlocks, blockSize);
+      long length = fileSys.getFileStatus(file).getLen();
+      waitForFileRaided(LOG, fileSys, file, destPath);
+      LocatedBlocks locations = getBlockLocations(file);
+
+      for (int i = 0; i < corrupt.length; i++) {
+        int blockNumToCorrupt = corrupt[i][0];
+        LOG.info("Corrupt block " + blockNumToCorrupt + " of file");
+        corruptBlock(file, locations.get(blockNumToCorrupt).getBlock(),
+          NUM_DATANODES, false);
+        validateFile(getRaidFS(), file, length, crc);
+      }
 
       // Corrupt one more block. This is expected to fail.
-      LOG.info("Corrupt second block of file");
-      corruptBlock(file1, locations.get(1).getBlock(), NUM_DATANODES, false);
+      LOG.info("Corrupt one more block of file");
+      corruptBlock(file, locations.get(1).getBlock(), NUM_DATANODES, false);
       try {
-        validateFile(raidfs, file1, file1, crc1);
+        validateFile(getRaidFS(), file, length, crc);
         fail("Expected exception ChecksumException not thrown!");
       } catch (org.apache.hadoop.fs.ChecksumException e) {
       }
     } catch (Exception e) {
-      LOG.info("testPathFilter Exception " + e + StringUtils.stringifyException(e));
+      LOG.info("testRaidDfs Exception " + e +
+                StringUtils.stringifyException(e));
       throw e;
     } finally {
+      if (cnode != null) { cnode.stop(); cnode.join(); }
       myTearDown();
     }
-    LOG.info("Test testPathFilter completed.");
+    LOG.info("Test testRaidDfs completed.");
   }
 
   /**
@@ -217,7 +290,7 @@ public class TestRaidDfs extends TestCas
 
     try {
       Path file = new Path("/user/raid/raidtest/file1");
-      createOldFile(fileSys, file, 1, 7, 8192L);
+      createTestFile(fileSys, file, 1, 7, 8192L);
 
       // filter all filesystem calls from client
       Configuration clientConf = new Configuration(conf);
@@ -242,13 +315,15 @@ public class TestRaidDfs extends TestCas
       myTearDown();
     }
   }
-  
+
   //
   // creates a file and populate it with random data. Returns its crc.
   //
-  private long createOldFile(FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
+  public static long createTestFile(FileSystem fileSys, Path name, int repl,
+                        int numBlocks, long blocksize)
     throws IOException {
     CRC32 crc = new CRC32();
+    Random rand = new Random();
     FSDataOutputStream stm = fileSys.create(name, true,
                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
                                             (short)repl, blocksize);
@@ -264,19 +339,43 @@ public class TestRaidDfs extends TestCas
   }
 
   //
-  // validates that file matches the crc.
+  // Creates a file whose last block is only partially full, populates it
+  // with random data, and returns its crc.
   //
-  private void validateFile(FileSystem fileSys, Path name1, Path name2, long crc) 
+  public static long createTestFilePartialLastBlock(
+      FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
     throws IOException {
+    CRC32 crc = new CRC32();
+    Random rand = new Random();
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, blocksize);
+    // Write whole blocks.
+    byte[] b = new byte[(int)blocksize];
+    for (int i = 1; i < numBlocks; i++) {
+      rand.nextBytes(b);
+      stm.write(b);
+      crc.update(b);
+    }
+    // Write partial block.
+    b = new byte[(int)blocksize/2 - 1];
+    rand.nextBytes(b);
+    stm.write(b);
+    crc.update(b);
 
-    FileStatus stat1 = fileSys.getFileStatus(name1);
-    FileStatus stat2 = fileSys.getFileStatus(name2);
-    assertTrue(" Length of file " + name1 + " is " + stat1.getLen() + 
-               " is different from length of file " + name1 + " " + stat2.getLen(),
-               stat1.getLen() == stat2.getLen());
+    stm.close();
+    return crc.getValue();
+  }
+  //
+  // Validates that the file has the given length and matches the crc.
+  //
+  public static boolean validateFile(FileSystem fileSys, Path name, long length,
+                                  long crc) 
+    throws IOException {
 
+    long numRead = 0;
     CRC32 newcrc = new CRC32();
-    FSDataInputStream stm = fileSys.open(name2);
+    FSDataInputStream stm = fileSys.open(name);
     final byte[] b = new byte[4192];
     int num = 0;
     while (num >= 0) {
@@ -284,19 +383,28 @@ public class TestRaidDfs extends TestCas
       if (num < 0) {
         break;
       }
+      numRead += num;
       newcrc.update(b, 0, num);
     }
     stm.close();
+
+    if (numRead != length) {
+      LOG.info("Number of bytes read " + numRead +
+               " does not match file size " + length);
+      return false;
+    }
+
     LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
     if (newcrc.getValue() != crc) {
-      fail("CRC mismatch of files " + name1 + " with file " + name2);
+      LOG.info("CRC mismatch of file " + name + ": " + newcrc + " vs. " + crc);
     }
+    return true;
   }
 
   /*
    * The Data directories for a datanode
    */
-  static private File[] getDataNodeDirs(int i) throws IOException {
+  private static File[] getDataNodeDirs(int i) throws IOException {
     File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
     File data_dir = new File(base_dir, "data");
     File dir1 = new File(data_dir, "data"+(2*i+1));
@@ -353,10 +461,32 @@ public class TestRaidDfs extends TestCas
               (numCorrupted + numDeleted) > 0);
   }
 
-  //
-  // Corrupt specified block of file
-  //
-  void corruptBlock(Path file, Block blockNum) throws IOException {
-    corruptBlock(file, blockNum, NUM_DATANODES, true);
+  public static void corruptBlock(Path file, Block blockNum,
+                    int numDataNodes, long offset) throws IOException {
+    long id = blockNum.getBlockId();
+
+    // Now deliberately corrupt each on-disk replica of the block by
+    // flipping an int at the given offset.
+    for (int i = 0; i < numDataNodes; i++) {
+      File[] dirs = getDataNodeDirs(i);
+      
+      for (int j = 0; j < dirs.length; j++) {
+        File[] blocks = dirs[j].listFiles();
+        assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
+        for (int idx = 0; idx < blocks.length; idx++) {
+          if (blocks[idx].getName().startsWith("blk_" + id) &&
+              !blocks[idx].getName().endsWith(".meta")) {
+            // Corrupt
+            File f = blocks[idx];
+            RandomAccessFile raf = new RandomAccessFile(f, "rw");
+            raf.seek(offset);
+            int data = raf.readInt();
+            raf.seek(offset);
+            raf.writeInt(data+1);
+            LOG.info("Corrupted block " + blocks[idx]);
+          }
+        }
+      }
+    }
   }
 }
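
Several helpers in this test (createTestFile, createTestFilePartialLastBlock, waitForFileRaided, validateFile, corruptBlock) are now public and static so other raid tests can reuse them. A hedged sketch of the intended call pattern from another test; the path, sizes and the fileSys/raidFs/LOG handles are assumed to exist in the caller:

    // Illustrative reuse of the static TestRaidDfs helpers.
    Path src = new Path("/user/test/raidtest/sample");
    long blockSize = 8192L;
    long crc = TestRaidDfs.createTestFilePartialLastBlock(
        fileSys, src, 1, 8, blockSize);
    long length = fileSys.getFileStatus(src).getLen();
    TestRaidDfs.waitForFileRaided(
        LOG, fileSys, src, new Path("/destraid/user/test/raidtest"));
    assertTrue(TestRaidDfs.validateFile(raidFs, src, length, crc));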

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public class TestDirectoryTraversal extends TestCase {
+  final static Log LOG = LogFactory.getLog(
+                            "org.apache.hadoop.raid.TestDirectoryTraversal");
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/raid/test/data")).getAbsolutePath();
+
+  MiniDFSCluster dfs = null;
+  FileSystem fs = null;
+  Configuration conf = null;
+
+  /**
+   * Test basic enumeration.
+   */
+  public void testEnumeration() throws IOException {
+    mySetup();
+
+    try {
+      Path topDir = new Path(TEST_DIR + "/testenumeration");
+
+      createTestTree(topDir);
+
+      LOG.info("Enumerating files");
+      List<FileStatus> startPaths = new LinkedList<FileStatus>();
+      startPaths.add(fs.getFileStatus(topDir));
+      DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+
+      List<FileStatus> selected = new LinkedList<FileStatus>();
+      while (true) {
+        FileStatus f = dt.getNextFile();
+        if (f == null) break;
+        assertEquals(false, f.isDir());
+        LOG.info(f.getPath());
+        selected.add(f);
+      }
+      assertEquals(5, selected.size());
+
+      LOG.info("Enumerating directories");
+      startPaths.clear();
+      startPaths.add(fs.getFileStatus(topDir));
+      dt = new DirectoryTraversal(fs, startPaths);
+      selected.clear();
+      while (true) {
+        FileStatus dir = dt.getNextDirectory();
+        if (dir == null) break;
+        assertEquals(true, dir.isDir());
+        LOG.info(dir.getPath());
+        selected.add(dir);
+      }
+      assertEquals(4, selected.size());
+    } finally {
+      myTearDown();
+    }
+  }
+
+  public void testSuspension() throws IOException {
+    mySetup();
+
+    try {
+      Path topDir = new Path(TEST_DIR + "/testenumeration");
+
+      createTestTree(topDir);
+
+      String top = topDir.toString();
+      List<FileStatus> startPaths = new LinkedList<FileStatus>();
+      startPaths.add(fs.getFileStatus(new Path(top + "/a")));
+      startPaths.add(fs.getFileStatus(new Path(top + "/b")));
+      DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+
+      int limit = 2;
+      short targetRepl = 1;
+      Path raid = new Path("/raid");
+      List<FileStatus> selected = dt.selectFilesToRaid(conf, targetRepl, raid,
+                                                        0, limit);
+      for (FileStatus f: selected) {
+        LOG.info(f.getPath());
+      }
+      assertEquals(limit, selected.size());
+
+      selected = dt.selectFilesToRaid(conf, targetRepl, raid, 0, limit);
+      for (FileStatus f: selected) {
+        LOG.info(f.getPath());
+      }
+      assertEquals(limit, selected.size());
+    } finally {
+      myTearDown();
+    }
+  }
+
+  /**
+   * Creates a test directory tree.
+   *            top
+   *           / | \
+   *          /  |  f5
+   *         a   b____
+   *        / \  | \  \
+   *       f1 f2 f3 f4 c
+   */
+  private void createTestTree(Path topDir) throws IOException {
+    String top = topDir.toString();
+    fs.delete(topDir, true);
+
+    fs.mkdirs(topDir);
+    fs.create(new Path(top + "/f5")).close();
+
+    fs.mkdirs(new Path(top + "/a"));
+    createTestFile(new Path(top + "/a/f1"));
+    createTestFile(new Path(top + "/a/f2"));
+
+    fs.mkdirs(new Path(top + "/b"));
+    fs.mkdirs(new Path(top + "/b/c"));
+    createTestFile(new Path(top + "/b/f3"));
+    createTestFile(new Path(top + "/b/f4"));
+  }
+
+  private void createTestFile(Path file) throws IOException {
+    long blockSize = 8192;
+    byte[] bytes = new byte[(int)blockSize];
+    FSDataOutputStream stm = fs.create(file, false, 4096, (short)1, blockSize);
+    stm.write(bytes);
+    stm.write(bytes);
+    stm.write(bytes);
+    stm.close();
+    FileStatus stat = fs.getFileStatus(file);
+    assertEquals(blockSize, stat.getBlockSize());
+  }
+
+  private void mySetup() throws IOException {
+    conf = new Configuration();
+    dfs = new MiniDFSCluster(conf, 6, true, null);
+    dfs.waitActive();
+    fs = dfs.getFileSystem();
+  }
+
+  private void myTearDown() {
+    if (dfs != null) { dfs.shutdown(); }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Tue Oct 12 18:23:36 2010
@@ -82,6 +82,7 @@ public class TestRaidHar extends TestCas
     conf.setBoolean("fs.raidnode.local", local);
 
     conf.set("raid.server.address", "localhost:0");
+    conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
 
     // create a dfs and map-reduce cluster
     final int taskTrackers = 4;
@@ -101,12 +102,12 @@ public class TestRaidHar extends TestCas
   /**
    * create raid.xml file for RaidNode
    */
-  private void mySetup(String srcPath, long targetReplication,
-                long metaReplication, long stripeLength ) throws Exception {
+  private void mySetup(long targetReplication,
+                long metaReplication, long stripeLength) throws Exception {
     FileWriter fileWriter = new FileWriter(CONFIG_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     String str = "<configuration> " +
-                   "<srcPath prefix=\"" + srcPath + "\"> " +
+                   "<srcPath prefix=\"/user/test/raidtest\"> " +
                      "<policy name = \"RaidTest1\"> " +
                         "<destPath> /destraid</destPath> " +
                         "<property> " +
@@ -162,7 +163,6 @@ public class TestRaidHar extends TestCas
   public void testRaidHar() throws Exception {
     LOG.info("Test testRaidHar  started.");
 
-    String srcPaths    []  = { "/user/test/raidtest", "/user/test/raid*" };
     long blockSizes    []  = {1024L};
     long stripeLengths []  = {5};
     long targetReplication = 1;
@@ -172,13 +172,11 @@ public class TestRaidHar extends TestCas
 
     createClusters(true);
     try {
-      for (String srcPath : srcPaths) {
-        for (long blockSize : blockSizes) {
-          for (long stripeLength : stripeLengths) {
-            doTestHar(iter, srcPath, targetReplication, metaReplication,
-                         stripeLength, blockSize, numBlock);
-            iter++;
-          }
+      for (long blockSize : blockSizes) {
+        for (long stripeLength : stripeLengths) {
+           doTestHar(iter, targetReplication, metaReplication,
+                       stripeLength, blockSize, numBlock);
+           iter++;
         }
       }
     } finally {
@@ -191,14 +189,14 @@ public class TestRaidHar extends TestCas
    * Create parity file, delete original file and then validate that
    * parity file is automatically deleted.
    */
-  private void doTestHar(int iter, String srcPath, long targetReplication,
+  private void doTestHar(int iter, long targetReplication,
                           long metaReplication, long stripeLength,
                           long blockSize, int numBlock) throws Exception {
     LOG.info("doTestHar started---------------------------:" +  " iter " + iter +
              " blockSize=" + blockSize + " stripeLength=" + stripeLength);
-    mySetup(srcPath, targetReplication, metaReplication, stripeLength);
-    RaidShell shell = null;
+    mySetup(targetReplication, metaReplication, stripeLength);
     Path dir = new Path("/user/test/raidtest/subdir/");
+    Path file1 = new Path(dir + "/file" + iter);
     RaidNode cnode = null;
     try {
       Path destPath = new Path("/destraid/user/test/raidtest/subdir");
@@ -211,21 +209,9 @@ public class TestRaidHar extends TestCas
       LOG.info("doTestHar created test files for iteration " + iter);
 
       // create an instance of the RaidNode
-      cnode = RaidNode.createRaidNode(null, conf);
-      int times = 10;
-
-      while (times-- > 0) {
-        try {
-          shell = new RaidShell(conf, cnode.getListenerAddress());
-        } catch (Exception e) {
-          LOG.info("doTestHar unable to connect to " + 
-              cnode.getListenerAddress() + " retrying....");
-          Thread.sleep(1000);
-          continue;
-        }
-        break;
-      }
-      LOG.info("doTestHar created RaidShell.");
+      Configuration localConf = new Configuration(conf);
+      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+      cnode = RaidNode.createRaidNode(null, localConf);
       FileStatus[] listPaths = null;
 
       int maxFilesFound = 0;
@@ -234,6 +220,7 @@ public class TestRaidHar extends TestCas
         try {
           listPaths = fileSys.listStatus(destPath);
           int count = 0;
+          Path harPath = null;
           int filesFound = 0;
           if (listPaths != null) {
             for (FileStatus s : listPaths) {
@@ -250,6 +237,7 @@ public class TestRaidHar extends TestCas
                 // files since some parity files might get deleted by the
                 // purge thread.
                 assertEquals(10, maxFilesFound);
+                harPath = s.getPath();
                 count++;
               }
             }
@@ -260,11 +248,12 @@ public class TestRaidHar extends TestCas
         } catch (FileNotFoundException e) {
           //ignore
         }
-        LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " + 
+        LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " +
                  (listPaths == null ? "none" : listPaths.length));
         Thread.sleep(1000);                  // keep waiting
+
       }
-      
+
       fileSys.delete(dir, true);
       // wait till raid file is deleted
       int count = 1;
@@ -291,7 +280,6 @@ public class TestRaidHar extends TestCas
                                           StringUtils.stringifyException(e));
       throw e;
     } finally {
-      shell.close();
       if (cnode != null) { cnode.stop(); cnode.join(); }
     }
     LOG.info("doTestHar completed:" + " blockSize=" + blockSize +


