hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r627741 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Thu, 14 Feb 2008 13:41:02 GMT
Author: ddas
Date: Thu Feb 14 05:41:01 2008
New Revision: 627741

URL: http://svn.apache.org/viewvc?rev=627741&view=rev
Log:
HADOOP-2391. Cleanup job output directory before declaring a job as SUCCESSFUL. Contributed
by Amareshwari Sri Ramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Feb 14 05:41:01 2008
@@ -76,6 +76,9 @@
     the destination after encountering an error. (Tsz Wo (Nicholas), SZE
     via cdouglas)
 
+    HADOOP-2391. Cleanup job output directory before declaring a job as
+    SUCCESSFUL. (Amareshwari Sri Ramadasu via ddas)
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Feb 14 05:41:01 2008
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobHistory.Values;
@@ -275,6 +276,18 @@
                                       jobtracker, conf, this);
     }
 
+    // create job specific temporary directory in output path
+    Path outputPath = conf.getOutputPath();
+    if (outputPath != null) {
+      Path tmpDir = new Path(outputPath, "_temporary");
+      FileSystem fileSys = tmpDir.getFileSystem(conf);
+      if (!fileSys.mkdirs(tmpDir)) {
+        LOG.error("Mkdirs failed to create " + tmpDir.toString());
+      }
+    } else {
+      LOG.error("Null Output path");
+    }
+
     this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
     tasksInited = true;
         
@@ -1129,6 +1142,15 @@
       Path tempDir = new Path(conf.getSystemDir(), jobId); 
       fs.delete(tempDir); 
 
+      // delete the temporary directory in output directory
+      Path outputPath = conf.getOutputPath();
+      if (outputPath != null) {
+        Path tmpDir = new Path(outputPath, "_temporary");
+        FileSystem fileSys = tmpDir.getFileSystem(conf);
+        if (fileSys.exists(tmpDir)) {
+          FileUtil.fullyDelete(fileSys, tmpDir);
+        }
+      }
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);
     }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Feb 14 05:41:01 2008
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -112,6 +113,13 @@
           numReduceTasks = 1;
           job.setNumReduceTasks(1);
         }
+        // create job specific temp directory in output path
+        Path tmpDir = new Path(job.getOutputPath(), "_temporary");
+        FileSystem fileSys = tmpDir.getFileSystem(job);
+        if (!fileSys.mkdirs(tmpDir)) {
+          LOG.error("Mkdirs failed to create " + tmpDir.toString());
+        }
+
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
           String mapId = jobId + "_map_" + idFormat.format(i);
@@ -125,6 +133,16 @@
                                     splits[i].getClass().getName(),
                                     split);
           JobConf localConf = new JobConf(job);
+          if (fileSys.exists(tmpDir)) {
+            Path taskTmpDir = new Path(tmpDir, "_" + mapId);
+            if (!fileSys.mkdirs(taskTmpDir)) {
+              throw new IOException("Mkdirs failed to create " 
+                                     + taskTmpDir.toString());
+            }
+          } else {
+            throw new IOException("The directory " + tmpDir.toString()
+                                   + " doesnt exist " );
+          }
           map.localizeConfiguration(localConf);
           map.setConf(localConf);
           map_tasks += 1;
@@ -157,6 +175,16 @@
                                                  "tip_r_0001",
                                                  reduceId, 0, mapIds.size());
               JobConf localConf = new JobConf(job);
+              if (fileSys.exists(tmpDir)) {
+                Path taskTmpDir = new Path(tmpDir, "_" + reduceId);
+                if (!fileSys.mkdirs(taskTmpDir)) {
+                  throw new IOException("Mkdirs failed to create " 
+                                         + taskTmpDir.toString());
+                }
+              } else {
+                throw new IOException("The directory " + tmpDir.toString()
+                                       + " doesnt exist "); 
+              }
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
@@ -177,6 +205,15 @@
             this.mapoutputFile.removeAll(reduceId);
           }
         }
+        // delete the temporary directory in output directory
+        try {
+          if (fileSys.exists(tmpDir)) {
+            FileUtil.fullyDelete(fileSys, tmpDir);
+          }
+        } catch (IOException e) {
+          LOG.error("Exception in deleting " + tmpDir.toString());
+        }
+
         this.status.setRunState(JobStatus.SUCCEEDED);
 
         JobEndNotifier.localRunnerNotification(job, status);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Thu Feb 14 05:41:01 2008
@@ -42,7 +42,12 @@
                                       String name, Progressable progress)
     throws IOException {
 
-    Path file = new Path(job.getOutputPath(), name);
+    Path outputPath = job.getOutputPath();
+    FileSystem fs = outputPath.getFileSystem(job);
+    if (!fs.exists(outputPath)) {
+      throw new IOException("Output directory doesnt exist");
+    }
+    Path file = new Path(outputPath, name);
     
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
@@ -58,7 +63,7 @@
     
     // ignore the progress parameter, since MapFile is local
     final MapFile.Writer out =
-      new MapFile.Writer(job, file.getFileSystem(job), file.toString(),
+      new MapFile.Writer(job, fs, file.toString(),
                          job.getOutputKeyClass(),
                          job.getOutputValueClass(),
                          compressionType, codec,

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Thu Feb 14 05:41:01 2008
@@ -40,8 +40,12 @@
                                       String name, Progressable progress)
     throws IOException {
 
-    Path file = new Path(job.getOutputPath(), name);
-    FileSystem fs = file.getFileSystem(job);
+    Path outputPath = job.getOutputPath();
+    FileSystem fs = outputPath.getFileSystem(job);
+    if (!fs.exists(outputPath)) {
+      throw new IOException("Output directory doesnt exist");
+    }
+    Path file = new Path(outputPath, name);
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(job)) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Feb 14 05:41:01 2008
@@ -190,7 +190,8 @@
   public String toString() { return taskId; }
 
   private Path getTaskOutputPath(JobConf conf) {
-    Path p = new Path(conf.getOutputPath(), ("_" + taskId));
+    Path p = new Path(conf.getOutputPath(), ("_temporary" 
+                      + Path.SEPARATOR + "_" + taskId));
     try {
       FileSystem fs = p.getFileSystem(conf);
       return p.makeQualified(fs);
@@ -420,7 +421,7 @@
     if (taskOutputPath != null) {
       FileSystem fs = taskOutputPath.getFileSystem(conf);
       if (fs.exists(taskOutputPath)) {
-        Path jobOutputPath = taskOutputPath.getParent();
+        Path jobOutputPath = taskOutputPath.getParent().getParent();
 
         // Move the task outputs to their final place
         moveTaskOutputs(fs, jobOutputPath, taskOutputPath);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Feb 14 05:41:01 2008
@@ -1423,7 +1423,25 @@
             
       localJobConf.set("mapred.task.id", task.getTaskId());
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+
+      // create _taskid directory in output path temporary directory.
+      Path outputPath = localJobConf.getOutputPath();
+      if (outputPath != null) {
+        Path jobTmpDir = new Path(outputPath, "_temporary");
+        FileSystem fs = jobTmpDir.getFileSystem(localJobConf);
+        if (fs.exists(jobTmpDir)) {
+          Path taskTmpDir = new Path(jobTmpDir, "_" + task.getTaskId());
+          if (!fs.mkdirs(taskTmpDir)) {
+            throw new IOException("Mkdirs failed to create " 
+                                 + taskTmpDir.toString());
+          }
+        } else {
+          throw new IOException("The directory " + jobTmpDir.toString()
+                                 + " doesnt exist "); 
+        }
+      }
       task.localizeConfiguration(localJobConf);
+      
       OutputStream out = localFs.create(localTaskFile);
       try {
         localJobConf.write(out);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Thu Feb 14 05:41:01 2008
@@ -108,6 +108,9 @@
 
     Path dir = job.getOutputPath();
     FileSystem fs = dir.getFileSystem(job);
+    if (!fs.exists(dir)) {
+      throw new IOException("Output directory doesnt exist");
+    }
     boolean isCompressed = getCompressOutput(job);
     if (!isCompressed) {
       FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=627741&r1=627740&r2=627741&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Thu Feb 14 05:41:01 2008
@@ -44,6 +44,10 @@
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
     job.setOutputPath(workDir);
+    FileSystem fs = workDir.getFileSystem(job);
+    if (!fs.mkdirs(workDir)) {
+      fail("Failed to create output directory");
+    }
     String file = "test.txt";
     
     // A reporter that does nothing



Mime
View raw message