hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r555060 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/LocalJobRunner.java
Date Tue, 10 Jul 2007 19:53:23 GMT
Author: tomwhite
Date: Tue Jul 10 12:53:22 2007
New Revision: 555060

URL: http://svn.apache.org/viewvc?view=rev&rev=555060
Log:
HADOOP-1556.  Make LocalJobRunner delete working files at end of job run.  Contributed by
Devaraj Das.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=555060&r1=555059&r2=555060
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 10 12:53:22 2007
@@ -277,6 +277,9 @@
  85. HADOOP-1546.  Remove spurious column from HDFS web UI.
      (Dhruba Borthakur via cutting)
 
+ 86. HADOOP-1556.  Make LocalJobRunner delete working files at end of
+     job run.  (Devaraj Das via tomwhite)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=555060&r1=555059&r2=555060
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jul 10 12:53:22 2007
@@ -134,38 +134,46 @@
           map_tasks -= 1;
           updateCounters(map);
         }
-        if (numReduceTasks > 0) {
-          // move map output to reduce input
-          String reduceId = "reduce_" + newId();
+        String reduceId = "reduce_" + newId();
+        try {
+          if (numReduceTasks > 0) {
+            // move map output to reduce input  
+            for (int i = 0; i < mapIds.size(); i++) {
+              String mapId = mapIds.get(i);
+              Path mapOut = this.mapoutputFile.getOutputFile(mapId);
+              Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId,
+                  localFs.getLength(mapOut));
+              if (!localFs.mkdirs(reduceIn.getParent())) {
+                throw new IOException("Mkdirs failed to create "
+                    + reduceIn.getParent().toString());
+              }
+              if (!localFs.rename(mapOut, reduceIn))
+                throw new IOException("Couldn't rename " + mapOut);
+            }
+
+            {
+              ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001",
+                  reduceId, 0, mapIds.size());
+              JobConf localConf = new JobConf(job);
+              reduce.localizeConfiguration(localConf);
+              reduce.setConf(localConf);
+              reduce_tasks += 1;
+              myMetrics.launchReduce();
+              reduce.run(localConf, this);
+              reduce.saveTaskOutput();
+              myMetrics.completeReduce();
+              reduce_tasks -= 1;
+              updateCounters(reduce);
+            }
+          }
+        } finally {
           for (int i = 0; i < mapIds.size(); i++) {
             String mapId = mapIds.get(i);
-            Path mapOut = this.mapoutputFile.getOutputFile(mapId);
-            Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId,
-                localFs.getLength(mapOut));
-            if (!localFs.mkdirs(reduceIn.getParent())) {
-              throw new IOException("Mkdirs failed to create "
-                  + reduceIn.getParent().toString());
-            }
-            if (!localFs.rename(mapOut, reduceIn))
-              throw new IOException("Couldn't rename " + mapOut);
             this.mapoutputFile.removeAll(mapId);
           }
-
-          {
-            ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001",
-                reduceId, 0, mapIds.size());
-            JobConf localConf = new JobConf(job);
-            reduce.localizeConfiguration(localConf);
-            reduce.setConf(localConf);
-            reduce_tasks += 1;
-            myMetrics.launchReduce();
-            reduce.run(localConf, this);
-            reduce.saveTaskOutput();
-            myMetrics.completeReduce();
-            reduce_tasks -= 1;
-            updateCounters(reduce);
+          if (numReduceTasks == 1) {
+            this.mapoutputFile.removeAll(reduceId);
           }
-          this.mapoutputFile.removeAll(reduceId);
         }
         this.status.setRunState(JobStatus.SUCCEEDED);
 



Mime
View raw message