hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r481430 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/
Date Fri, 01 Dec 2006 22:32:49 GMT
Author: cutting
Date: Fri Dec  1 14:32:48 2006
New Revision: 481430

URL: http://svn.apache.org/viewvc?view=rev&rev=481430
Log:
HADOOP-728.  Fix contrib/streaming issues, including '-reducer NONE'.  Contributed by Sanjay.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec  1 14:32:48 2006
@@ -146,6 +146,9 @@
 43. HADOOP-750.  Fix a potential race condition during mapreduce
     shuffle.  (omalley via cutting)
 
+44. HADOOP-728.  Fix contrib/streaming-related issues, including
+    '-reducer NONE'.  (Sanjay Dahiya via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Dec  1 14:32:48 2006
@@ -36,6 +36,7 @@
 
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.PhasedFileSystem;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
@@ -192,10 +193,6 @@
     }
   }
 
-  String makeUniqueFileSuffix() {
-    return "." + System.currentTimeMillis() + "." + job_.get("mapred.task.id");
-  }
-
   public void configure(JobConf job) {
     try {
       String argv = getPipeCommand(job);
@@ -259,20 +256,21 @@
         // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then: 
         // client has renamed outputPath and saved the argv's original output path as:
         if (useSingleSideOutputURI_) {
-          sideEffectURI_ = new URI(sideOutputURI_);
+          finalOutputURI = new URI(sideOutputURI_);
           sideEffectPathFinal_ = null; // in-place, no renaming to final
         } else {
+          sideFs_ = new PhasedFileSystem(sideFs_, job);
           String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath()

           String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale
           sideEffectPathFinal_ = new Path(sideOutputPath, fileName);
-          sideEffectURI_ = new URI(sideEffectPathFinal_ + makeUniqueFileSuffix()); // implicit dfs: 
+          finalOutputURI = new URI(sideEffectPathFinal_.toString()); // implicit dfs: 
         }
         // apply default scheme
-        if(sideEffectURI_.getScheme() == null) {
-          sideEffectURI_ = new URI("file", sideEffectURI_.getSchemeSpecificPart(), null);
+        if(finalOutputURI.getScheme() == null) {
+          finalOutputURI = new URI("file", finalOutputURI.getSchemeSpecificPart(), null);
         }
         boolean allowSocket = useSingleSideOutputURI_;
-        sideEffectOut_ = getURIOutputStream(sideEffectURI_, allowSocket);
+        sideEffectOut_ = getURIOutputStream(finalOutputURI, allowSocket);
       }
 
       // 
@@ -292,7 +290,7 @@
           f = null;
       }
       logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
-      logprintln("sideEffectURI_=" + sideEffectURI_);
+      logprintln("sideEffectURI_=" + finalOutputURI);
 
       Environment childEnv = (Environment) StreamUtil.env().clone();
       addJobConfToEnvironment(job_, childEnv);
@@ -505,6 +503,7 @@
           if (optSideEffect_) {
             sideEffectOut_.write(answer);
             sideEffectOut_.write('\n');
+            sideEffectOut_.flush();
           } else {
             splitKeyVal(answer, key, val);
             output.collect(key, val);
@@ -576,17 +575,11 @@
       waitOutputThreads();
       try {
         if (optSideEffect_) {
-          logprintln("closing " + sideEffectURI_);
+          logprintln("closing " + finalOutputURI);
           if (sideEffectOut_ != null) sideEffectOut_.close();
-          logprintln("closed  " + sideEffectURI_);
-          if (useSingleSideOutputURI_) {
-            // With sideEffectPath_ we wrote in-place. 
-            // Possibly a named pipe set up by user or a socket.
-          } else {
-            boolean del = sideFs_.delete(sideEffectPathFinal_);
-            logprintln("deleted  (" + del + ") " + sideEffectPathFinal_);
-            sideFs_.rename(new Path(sideEffectURI_.getSchemeSpecificPart()), sideEffectPathFinal_);
-            logprintln("renamed  " + sideEffectPathFinal_);
+          logprintln("closed  " + finalOutputURI);
+          if ( ! useSingleSideOutputURI_) {
+            ((PhasedFileSystem)sideFs_).commit(); 
           }
         }
       } catch (IOException io) {
@@ -725,7 +718,7 @@
   boolean optUseKey_ = true;
 
   private boolean optSideEffect_;
-  private URI sideEffectURI_;
+  private URI finalOutputURI;
   private Path sideEffectPathFinal_;
 
   private boolean useSingleSideOutputURI_;

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Fri Dec  1 14:32:48 2006
@@ -701,8 +701,6 @@
         } catch (URISyntaxException e) {
           throw (IOException) new IOException().initCause(e);
         }
-      } else {
-        mapsideoutURI_ = primary;
       }
       // an empty reduce output named "part-00002" will go here and not collide.
       channel0 = primary + ".NONE";

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Fri Dec  1 14:32:48 2006
@@ -25,7 +25,7 @@
  * better to commit(Path) individual files when done. Otherwise
  * commit() can be used to commit all open files at once. 
  */
-class PhasedFileSystem extends FileSystem {
+public class PhasedFileSystem extends FileSystem {
 
   private FileSystem baseFS ;
   // Map from final file name to temporary file name
@@ -93,7 +93,9 @@
         }catch(IOException ioe){
           // ignore if already closed
         }
-        baseFS.delete( fInfo.getTempPath() ); 
+        if( baseFS.exists(fInfo.getTempPath())){
+          baseFS.delete( fInfo.getTempPath() );
+        }
         finalNameToFileInfo.remove(finalFile); 
       }
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=481430&r1=481429&r2=481430
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Dec  1 14:32:48 2006
@@ -1030,9 +1030,11 @@
             // Delete temp directory in case any task used PhasedFileSystem.
             try{
               String systemDir = task.getConf().get("mapred.system.dir");
-              String taskTempDir = systemDir + "/" + 
-                  task.getJobId() + "/" + task.getTipId();
-              fs.delete(new Path(taskTempDir)) ;
+              Path taskTempDir = new Path(systemDir + "/" + 
+                  task.getJobId() + "/" + task.getTipId());
+              if( fs.exists(taskTempDir)){
+                fs.delete(taskTempDir) ;
+              }
             }catch(IOException e){
               LOG.warn("Error in deleting reduce temporary output",e); 
             }



Mime
View raw message