hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r936159 - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/ src/contrib/raid/src/test/org/apache/hadoop/raid/
Date Wed, 21 Apr 2010 05:18:30 GMT
Author: dhruba
Date: Wed Apr 21 05:18:30 2010
New Revision: 936159

URL: http://svn.apache.org/viewvc?rev=936159&view=rev
Log:
MAPREDUCE-1659. RaidNode writes temp files in the configured tmp directory and
adds random numbers to their names to avoid conflicts
(Rodrigo Schmidt via dhruba)


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=936159&r1=936158&r2=936159&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Apr 21 05:18:30 2010
@@ -254,6 +254,10 @@ Trunk (unreleased changes)
     MAPREDUCE-1673. Scripts to start and stop RaidNode.
     (Rodrigo Schmidt via dhruba)
 
+    MAPREDUCE-1659. RaidNode writes temp files on configured tmp directory and
+    add random numbers to their names to avoid conflicts
+    (Rodrigo Schmidt via dhruba)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=936159&r1=936158&r2=936159&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
Wed Apr 21 05:18:30 2010
@@ -277,7 +277,7 @@ public class RaidNode implements RaidPro
   }
 
   /** {@inheritDoc} */
-  public ReturnStatus recoverFile(String inStr, long corruptOffset) throws IOException {
+  public String recoverFile(String inStr, long corruptOffset) throws IOException {
 
     LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset);
     Path inputPath = new Path(inStr);
@@ -294,9 +294,12 @@ public class RaidNode implements RaidPro
       FileSystem fs = FileSystem.get(destPath.toUri(), conf);
       destPath = destPath.makeQualified(fs);
 
-      unRaid(conf, srcPath, destPath, stripeLength, corruptOffset);
+      Path unraided = unRaid(conf, srcPath, destPath, stripeLength, corruptOffset);
+      if (unraided != null) {
+        return unraided.toString();
+      }
     }
-    return ReturnStatus.SUCCESS;
+    return null;
   }
 
   /**
@@ -767,10 +770,10 @@ public class RaidNode implements RaidPro
                                   int metaRepl, int stripeLength) throws IOException {
 
     // two buffers for generating parity
+    Random rand = new Random();
     int bufSize = 5 * 1024 * 1024; // 5 MB
     byte[] bufs = new byte[bufSize];
     byte[] xor = new byte[bufSize];
-    byte zero = 0;
 
     Path inpath = stat.getPath();
     long blockSize = stat.getBlockSize();
@@ -780,7 +783,9 @@ public class RaidNode implements RaidPro
     Path outpath =  getOriginalParityFile(destPathPrefix, inpath);
     FileSystem outFs = outpath.getFileSystem(conf);
    
-    Path tmppath =  new Path(outpath + ".tmp");
+    Path tmppath =  new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + 
+                             outpath.toUri().getPath() + "." + 
+                             rand.nextLong() + ".tmp");
 
     // if the parity file is already upto-date, then nothing to do
     try {
@@ -841,6 +846,7 @@ public class RaidNode implements RaidPro
         outFs.delete(outpath, false);
       }
       // rename tmppath to the real parity filename
+      outFs.mkdirs(outpath.getParent());
       if (!outFs.rename(tmppath, outpath)) {
         String msg = "Unable to rename tmp file " + tmppath + " to " + outpath;
         LOG.warn(msg);
@@ -954,10 +960,8 @@ public class RaidNode implements RaidPro
     Random rand = new Random();
     FileSystem srcFs = srcPath.getFileSystem(conf);
     FileStatus srcStat = srcFs.getFileStatus(srcPath);
-    BlockLocation[] locations = srcFs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
     long blockSize = srcStat.getBlockSize();
     long fileSize = srcStat.getLen();
-    int totalBlocks = locations.length;
 
     // find the stripe number where the corrupted offset lies
     long snum = corruptOffset / (stripeLength * blockSize);
@@ -1014,7 +1018,8 @@ public class RaidNode implements RaidPro
     FileSystem destFs = destPathPrefix.getFileSystem(conf);
     int retry = 5;
     try {
-      tmpFile = new Path("/tmp/dhruba/" + rand.nextInt());
+      tmpFile = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + "/" + 
+          rand.nextInt());
       fout = destFs.create(tmpFile, false);
     } catch (IOException e) {
       if (retry-- <= 0) {
@@ -1042,8 +1047,12 @@ public class RaidNode implements RaidPro
 
     // Now, reopen the source file and the recovered block file
     // and copy all relevant data to new file
-    Path recoveredPath = getOriginalParityFile(destPathPrefix, srcPath);
-    recoveredPath = new Path(recoveredPath + ".recovered");
+    final Path recoveryDestination = 
+      new Path(conf.get("fs.raid.tmpdir", "/tmp/raid"));
+    final Path recoveredPrefix = 
+      destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
+    final Path recoveredPath = 
+      new Path(recoveredPrefix + "." + rand.nextLong() + ".recovered");
     LOG.info("Creating recovered file " + recoveredPath);
 
     FSDataInputStream sin = srcFs.open(srcPath);
@@ -1296,7 +1305,7 @@ public class RaidNode implements RaidPro
           String str = info.getProperty("time_before_har");
           String tmpHarPath = info.getProperty("har_tmp_dir");
           if (tmpHarPath == null) {
-            tmpHarPath = "/raid_har";
+            tmpHarPath = "/tmp/raid_har";
           }
           if (str != null) {
             try {
@@ -1371,13 +1380,16 @@ public class RaidNode implements RaidPro
   
   private void singleHar(FileSystem destFs, FileStatus dest, String tmpHarPath) throws IOException
{
     
+    Random rand = new Random();
     Path root = new Path("/");
     Path qualifiedPath = dest.getPath().makeQualified(destFs);
-    String harFile = qualifiedPath.getName() + HAR_SUFFIX;
+    String harFileDst = qualifiedPath.getName() + HAR_SUFFIX;
+    String harFileSrc = qualifiedPath.getName() + "-" + 
+                                rand.nextLong() + "-" + HAR_SUFFIX;
     HadoopArchives har = new HadoopArchives(conf);
     String[] args = new String[6];
     args[0] = "-archiveName";
-    args[1] = harFile;
+    args[1] = harFileSrc;
     args[2] = "-p"; 
     args[3] = root.makeQualified(destFs).toString();
     args[4] = qualifiedPath.toUri().getPath().substring(1);
@@ -1385,10 +1397,10 @@ public class RaidNode implements RaidPro
     int ret = 0;
     try {
       ret = ToolRunner.run(har, args);
-      if (ret == 0 && !destFs.rename(new Path(tmpHarPath+"/"+harFile), 
-                                     new Path(qualifiedPath, harFile))){
-        LOG.info("HAR rename didn't succeed from " + tmpHarPath+"/"+harFile +
-            " to " + qualifiedPath+"/"+harFile);
+      if (ret == 0 && !destFs.rename(new Path(tmpHarPath+"/"+harFileSrc), 
+                                     new Path(qualifiedPath, harFileDst))) {
+        LOG.info("HAR rename didn't succeed from " + tmpHarPath+"/"+harFileSrc +
+            " to " + qualifiedPath + "/" + harFileDst);
         ret = -2;
       }
     } catch (Exception exc) {
@@ -1416,6 +1428,7 @@ public class RaidNode implements RaidPro
           LOG.info("Har parity files thread continuing to run...");
         }
       }
+      LOG.info("Leaving Har thread.");
     }
     
 

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java?rev=936159&r1=936158&r2=936159&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
Wed Apr 21 05:18:30 2010
@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
 import org.apache.hadoop.raid.protocol.RaidProtocol;
-import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
 
 /**
  * A {@link RaidShell} that allows browsing configured raid policies.
@@ -198,7 +197,7 @@ public class RaidShell extends Configure
       if ("-showConfig".equals(cmd)) {
         exitCode = showConfig(cmd, argv, i);
       } else if ("-recover".equals(cmd)) {
-        exitCode = recover(cmd, argv, i);
+        exitCode = recoverAndPrint(cmd, argv, i);
       } else {
         exitCode = -1;
         System.err.println(cmd.substring(1) + ": Unknown command");
@@ -256,19 +255,30 @@ public class RaidShell extends Configure
   /**
    * Recovers the specified path from the parity file
    */
-  public int recover(String cmd, String argv[], int startindex)
+  public Path[] recover(String cmd, String argv[], int startindex)
     throws IOException {
-    int exitCode = 0;
-    String[] paths = new String[argv.length - startindex];
+    Path[] paths = new Path[(argv.length - startindex) / 2];
+    int j = 0;
     for (int i = startindex; i < argv.length; i = i + 2) {
       String path = argv[i];
       long corruptOffset = Long.parseLong(argv[i+1]);
       LOG.info("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
-      raidnode.recoverFile(path, corruptOffset);
+      paths[j] = new Path(raidnode.recoverFile(path, corruptOffset));
+      LOG.info("Raidshell created recovery file " + paths[j]);
+      j++;
     }
-    return 0;
+    return paths;
   }
 
+  public int recoverAndPrint(String cmd, String argv[], int startindex)
+    throws IOException {
+    int exitCode = 0;
+    for (Path p : recover(cmd,argv,startindex)) {
+      System.out.println(p);
+    }
+    return exitCode;
+  }
+  
   /**
    * main() has some simple utility methods
    */

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java?rev=936159&r1=936158&r2=936159&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
Wed Apr 21 05:18:30 2010
@@ -21,20 +21,15 @@ package org.apache.hadoop.raid.protocol;
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.util.Date;
-import java.util.List;
 import java.util.Properties;
 import java.util.Enumeration;
 import java.lang.Math;
-import java.lang.Class;
 import java.text.SimpleDateFormat;
-import java.util.StringTokenizer;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
@@ -42,7 +37,6 @@ import org.apache.hadoop.io.WritableFact
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
 
 /**
  * Maintains information about one policy

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java?rev=936159&r1=936158&r2=936159&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
Wed Apr 21 05:18:30 2010
@@ -39,21 +39,6 @@ public interface RaidProtocol extends Ve
   public static final long versionID = 1L;
 
   /**
-   * A set of codes returned by RPC calls.
-   */
-  public enum ReturnStatus {
-    SUCCESS ((int)0x01),
-    FAILURE ((int)0x02),
-    RETRY   ((int)0x03);
-    private int code;
-
-    private ReturnStatus(int code) {
-      this.code = code;
-    }
-    int getReturnStatus() {return code;}
-  }
-
-  /**
    * Get a listing of all configured policies
    * @throws IOException
    * return all categories of configured policies
@@ -68,6 +53,6 @@ public interface RaidProtocol extends Ve
    * @param inputPath The absolute pathname of the file to be recovered.
    * @param corruptOffset The offset that has the corruption
    */
-  public ReturnStatus recoverFile(String inputPath, long corruptOffset) throws IOException;
+  public String recoverFile(String inputPath, long corruptOffset) throws IOException;
 
 }

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java?rev=936159&r1=936158&r2=936159&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
Wed Apr 21 05:18:30 2010
@@ -198,7 +198,6 @@ public class TestRaidNode extends TestCa
     Path file1 = new Path(dir + "/file" + iter);
     RaidNode cnode = null;
     try {
-      Path recover1 = new Path("/destraid/" + file1 + ".recovered");
       Path destPath = new Path("/destraid/user/dhruba/raidtest");
       fileSys.delete(dir, true);
       fileSys.delete(destPath, true);
@@ -252,26 +251,25 @@ public class TestRaidNode extends TestCa
       // check for error at beginning of file
       if (numBlock >= 1) {
         LOG.info("Check error at beginning of file.");
-        simulateError(shell, fileSys, file1, recover1, crc1, 0);
+        simulateError(shell, fileSys, file1, crc1, 0);
       }
 
       // check for error at the beginning of second block
       if (numBlock >= 2) {
         LOG.info("Check error at beginning of second block.");
-        simulateError(shell, fileSys, file1, recover1, crc1, blockSize + 1);
+        simulateError(shell, fileSys, file1, crc1, blockSize + 1);
       }
 
       // check for error at the middle of third block
       if (numBlock >= 3) {
         LOG.info("Check error at middle of third block.");
-        simulateError(shell, fileSys, file1, recover1, crc1,
-                                                        2 * blockSize + 10);
+        simulateError(shell, fileSys, file1, crc1, 2 * blockSize + 10);
       }
 
       // check for error at the middle of second stripe
       if (numBlock >= stripeLength + 1) {
         LOG.info("Check error at middle of second stripe.");
-        simulateError(shell, fileSys, file1, recover1, crc1,
+        simulateError(shell, fileSys, file1, crc1,
                                             stripeLength * blockSize + 100);
       }
 
@@ -455,14 +453,14 @@ public class TestRaidNode extends TestCa
   //
  // simulate a corruption at specified offset and verify that everything is good
   //
-  void simulateError(RaidShell shell, FileSystem fileSys, Path file1, Path recover1, 
+  void simulateError(RaidShell shell, FileSystem fileSys, Path file1, 
                      long crc, long corruptOffset) throws IOException {
     // recover the file assuming that we encountered a corruption at offset 0
     String[] args = new String[3];
     args[0] = "recover";
     args[1] = file1.toString();
     args[2] = Long.toString(corruptOffset);
-    shell.recover(args[0], args, 1);
+    Path recover1 = shell.recover(args[0], args, 1)[0];
 
     // compare that the recovered file is identical to the original one
     LOG.info("Comparing file " + file1 + " with recovered file " + recover1);



Mime
View raw message