hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r919173 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
Date Thu, 04 Mar 2010 20:50:59 GMT
Author: dhruba
Date: Thu Mar  4 20:50:59 2010
New Revision: 919173

URL: http://svn.apache.org/viewvc?rev=919173&view=rev
Log:
MAPREDUCE-1512. RAID uses HarFileSystem directly instead of
FileSystem.get (Rodrigo Schmidt via dhruba)


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=919173&r1=919172&r2=919173&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Mar  4 20:50:59 2010
@@ -409,6 +409,9 @@
     MAPREDUCE-1520. Fix TestMiniMRLocalFS failure caused by regression in
     getting user working dir. (Amareshwari Sriramadasu via cdouglas)
 
+    MAPREDUCE-1512. RAID uses HarFileSystem directly instead of
+    FileSystem.get (Rodrigo Schmidt via dhruba)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=919173&r1=919172&r2=919173&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Thu Mar  4 20:50:59 2010
@@ -44,6 +44,7 @@
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.HarFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -468,6 +469,31 @@
     return null; // no matching policies
   }
 
+  
+  static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) {
+    return new Path(destPathPrefix, makeRelative(srcPath));
+  }
+  
+  private static class ParityFilePair {
+    private Path path;
+    private FileSystem fs;
+    
+    public ParityFilePair( Path path, FileSystem fs) {
+      this.path = path;
+      this.fs = fs;
+    }
+    
+    public Path getPath() {
+      return this.path;
+    }
+    
+    public FileSystem getFileSystem() {
+      return this.fs;
+    }
+    
+  }
+  
+  
   /**
    * Returns the Path to the parity file of a given file
    * 
@@ -477,7 +503,7 @@
    * @return Path object representing the parity file of the source
    * @throws IOException
    */
-  static private Path getParityFile(Path destPathPrefix, Path srcPath, boolean create, Configuration conf) throws IOException {
+  static private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
     Path srcParent = srcPath.getParent();
 
     FileSystem fsDest = destPathPrefix.getFileSystem(conf);
@@ -493,22 +519,23 @@
 
     String harDirName = srcParent.getName() + HAR_SUFFIX; 
     Path HarPath = new Path(outDir,harDirName);
-    Path outPath =  new Path(destPathPrefix, makeRelative(srcPath));
+    Path outPath =  getOriginalParityFile(destPathPrefix, srcPath);
 
-    if (create || !fsDest.exists(HarPath)) {  // case 1: no HAR file
-      return outPath;
+    if (!fsDest.exists(HarPath)) {  // case 1: no HAR file
+      return new ParityFilePair(outPath,fsDest);
     }
 
     URI HarPathUri = HarPath.toUri();
     Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
-    FileSystem fsHar = inHarPath.getFileSystem(conf);
-    
+    FileSystem fsHar = new HarFileSystem(fsDest);
+    fsHar.initialize(inHarPath.toUri(), conf);
+
     if (!fsHar.exists(inHarPath)) { // case 2: no file inside HAR
-      return outPath;
+      return new ParityFilePair(outPath,fsDest);
     }
 
     if (! fsDest.exists(outPath)) { // case 3: only inside HAR
-      return inHarPath;
+      return new ParityFilePair(inHarPath,fsHar);
     }
 
     // both inside and outside HAR. Should return most recent
@@ -516,15 +543,15 @@
     FileStatus outHar = fsDest.getFileStatus(outPath);
 
     if (inHar.getModificationTime() >= outHar.getModificationTime()) {
-      return inHarPath;
+      return new ParityFilePair(inHarPath,fsHar);
     }
 
-    return outPath;
+    return new ParityFilePair(outPath,fsDest);
   }
   
-  private Path getParityFile(Path destPathPrefix, Path srcPath, boolean create) throws IOException {
+  private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException {
 	  
-	  return getParityFile(destPathPrefix, srcPath, create, conf);
+	  return getParityFile(destPathPrefix, srcPath, conf);
 	  
   }
   
@@ -589,8 +616,9 @@
       // does not match the modTime of the source file, then recalculate RAID
       boolean add = false;
       try {
-        Path outpath =  getParityFile(destPathPrefix, path, false);
-        FileSystem outFs = outpath.getFileSystem(conf);
+        ParityFilePair ppair = getParityFile(destPathPrefix, path);
+        Path outpath =  ppair.getPath();
+        FileSystem outFs = ppair.getFileSystem();
         FileStatus ostat = outFs.getFileStatus(outpath);
         if (ostat.getModificationTime() != src.getModificationTime() &&
             src.getModificationTime() + modTimePeriod < now) {
@@ -749,9 +777,10 @@
     long fileSize = stat.getLen();
 
     // create output tmp path
-    Path outpath =  getParityFile(destPathPrefix, inpath,true,conf);
-    Path tmppath =  new Path(outpath + ".tmp");
+    Path outpath =  getOriginalParityFile(destPathPrefix, inpath);
     FileSystem outFs = outpath.getFileSystem(conf);
+   
+    Path tmppath =  new Path(outpath + ".tmp");
 
     // if the parity file is already upto-date, then nothing to do
     try {
@@ -967,8 +996,9 @@
     LOG.info("Decompose a total of " + numLength + " blocks.");
 
     // open and seek to the appropriate offset in parity file.
-    Path parityFile =  getParityFile(destPathPrefix, srcPath, false, conf);
-    FileSystem parityFs = parityFile.getFileSystem(conf);
+    ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf); 
+    Path parityFile = ppair.getPath();
+    FileSystem parityFs = ppair.getFileSystem();
     LOG.info("Parity file for " + srcPath + " is " + parityFile);
     ins[numLength-1] = parityFs.open(parityFile);
     ins[numLength-1].seek(snum * blockSize);
@@ -1012,7 +1042,7 @@
 
     // Now, reopen the source file and the recovered block file
     // and copy all relevant data to new file
-    Path recoveredPath =  getParityFile(destPathPrefix, srcPath, true, conf);
+    Path recoveredPath = getOriginalParityFile(destPathPrefix, srcPath);
     recoveredPath = new Path(recoveredPath + ".recovered");
     LOG.info("Creating recovered file " + recoveredPath);
 
@@ -1137,7 +1167,7 @@
               FileSystem destFs = FileSystem.get(destp.toUri(), conf);
               destp = destp.makeQualified(destFs);
               destinationPrefix = destp.toUri().getPath(); // strip host:port
-              destp = getParityFile(destp, info.getSrcPath(), true);
+              destp = getOriginalParityFile(destp, info.getSrcPath());
               
               // if this destination path has already been processed as part
               // of another policy, then nothing more to do
@@ -1202,7 +1232,8 @@
       // then delete the parity file
       Path srcPath = new Path(src);
       Path dstPath = (new Path(destPrefix.trim())).makeQualified(destFs);
-      if (!srcFs.exists(srcPath) || !destPath.equals(getParityFile(dstPath,srcPath,false))) {
+      if (!srcFs.exists(srcPath) || 
+          !destPath.equals(getParityFile(dstPath,srcPath).getPath())) {
         boolean done = destFs.delete(destPath, false);
         if (done) {
           LOG.info("Purged path " + destPath );
@@ -1265,7 +1296,7 @@
               FileSystem destFs = FileSystem.get(destp.toUri(), conf);
               destp = destp.makeQualified(destFs);
               destinationPrefix = destp.toUri().getPath(); // strip host:port
-              destp = getParityFile(destp, info.getSrcPath(), true);
+              destp = getOriginalParityFile(destp, info.getSrcPath());
 
               FileStatus stat = null;
               try {
@@ -1274,7 +1305,7 @@
                 // do nothing, leave stat = null;
               }
               if (stat != null) {
-                LOG.info("Purging obsolete parity files for policy " + 
+                LOG.info("Haring parity files for policy " + 
                     info.getName() + " " + destp);
 
                 recurseHar(destFs, stat, cutoff, tmpHarPath);
@@ -1330,14 +1361,14 @@
   private void singleHar(FileSystem destFs, FileStatus dest, String tmpHarPath) throws IOException {
     
     Path root = new Path("/");
-    Path qualifiedPath = dest.getPath().makeQualified(destFs.getUri(),root);
+    Path qualifiedPath = dest.getPath().makeQualified(destFs);
     String harFile = qualifiedPath.getName() + HAR_SUFFIX;
     HadoopArchives har = new HadoopArchives(conf);
     String[] args = new String[6];
     args[0] = "-archiveName";
     args[1] = harFile;
     args[2] = "-p"; 
-    args[3] = root.makeQualified(destFs.getUri(),root).toString();
+    args[3] = root.makeQualified(destFs).toString();
     args[4] = qualifiedPath.toUri().getPath().substring(1);
     args[5] = tmpHarPath.toString();
     int ret = 0;
@@ -1345,7 +1376,8 @@
       ret = ToolRunner.run(har, args);
       if (ret == 0 && !destFs.rename(new Path(tmpHarPath+"/"+harFile), 
                                      new Path(qualifiedPath, harFile))){
-        LOG.info("HAR rename didn't succeed");
+        LOG.info("HAR rename didn't succeed from " + tmpHarPath+"/"+harFile +
+            " to " + qualifiedPath+"/"+harFile);
         ret = -2;
       }
     } catch (Exception exc) {



Mime
View raw message